diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2021-10-05 09:34:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-05 10:31:41 +0000 |
commit | c06be6d51a30e00e3fab6e7baaa95e340cd6352d (patch) | |
tree | 8082baa89fb5b52b03c1f7396e7b77b4c7fea4fe | |
parent | 0b4fd0cc9ab87b0ca59b1046e5ca4e40060988cb (diff) | |
download | mongo-c06be6d51a30e00e3fab6e7baaa95e340cd6352d.tar.gz |
SERVER-59964 Persist data on commands issued by BalancerCommandsScheduler to support crash recovery routine
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_command_document.idl | 69 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp | 217 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h | 136 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp | 251 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager.cpp | 13 |
9 files changed, 538 insertions, 175 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index b57e1054a0f..45d4f231029 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -117,6 +117,10 @@ const NamespaceString NamespaceString::kShardingRenameParticipantsNamespace( const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb, "settings"); + +const NamespaceString NamespaceString::kConfigBalancerCommandsNamespace( + NamespaceString::kConfigDb, "balancerCommandsSchedulerOngoingOperations"); + const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb, "vectorClock"); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index ca1a0bbd4bc..41efc618e8f 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -169,6 +169,9 @@ public: // Namespace for balancer settings and default read and write concerns. static const NamespaceString kConfigSettingsNamespace; + // Namespace with information on commands sent by the balancer (thru BalancerCommandsScheduler). + static const NamespaceString kConfigBalancerCommandsNamespace; + // Namespace for vector clock state. static const NamespaceString kVectorClockNamespace; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d9317dfdbf4..39d418721d5 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -208,6 +208,7 @@ env.Library( 'balancer/balance_stats.cpp', 'balancer/balancer_chunk_selection_policy_impl.cpp', 'balancer/balancer_chunk_selection_policy.cpp', + 'balancer/balancer_command_document.idl', 'balancer/balancer_commands_scheduler_impl.cpp', 'balancer/balancer_policy.cpp', 'balancer/balancer.cpp', @@ -440,7 +441,6 @@ env.CppUnitTest( 'active_migrations_registry_test.cpp', 'auto_split_vector_test.cpp', 'balancer/balance_stats_test.cpp', - 'balancer/balancer_commands_scheduler_test.cpp', 'chunk_split_state_driver_test.cpp', 'collection_metadata_filtering_test.cpp', 'collection_metadata_test.cpp', @@ -535,6 +535,7 @@ env.CppUnitTest( 'balancer/balancer_policy_test.cpp', 'balancer/cluster_statistics_test.cpp', 'balancer/core_options_stub.cpp', + 'balancer/balancer_commands_scheduler_test.cpp', 'balancer/migration_manager_test.cpp', 'balancer/migration_test_fixture.cpp', 'balancer/scoped_migration_request_test.cpp', diff --git a/src/mongo/db/s/balancer/balancer_command_document.idl b/src/mongo/db/s/balancer/balancer_command_document.idl new file mode 100644 index 00000000000..b4d8e23d752 --- /dev/null +++ b/src/mongo/db/s/balancer/balancer_command_document.idl @@ -0,0 +1,69 @@ +# Copyright (C) 2021-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file defines the format of documents stored in +# config.balancerCommandsSchedulerOngoingOperations on the config server. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/s/sharding_types.idl" + +structs: + PersistedBalancerCommand: + description: "Descriptor of a command issued by BalancerCommandsScheduler, + persisted on disk (under NamespaceString::kConfigBalancerCommandsNamespace) + to be re-executed in case of server crash." + strict: false + generate_comparison_operators: false + fields: + requestId: + type: uuid + description: "Original RequestID associated to the command + when this was issued for the first time. For logging purposes." + optional: false + remoteCommand: + type: object_owned + description: "Serialised version of the remote command submitted by the + BalancerCommandsScheduler." + optional: false + target: + type: shard_id + description: "Id of the shard that has to receive and execute the remote command." + optional: false + nss: + type: namespacestring + description: "Namespace string being targeted by the remote command." + optional: false + requiresDistributedLock: + type: bool + description: "When true, the safe execution of the remote command requires + the distributed lock for the targeted namespace to be taken." + optional: false diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h index d979ba7c800..c749f970f68 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h @@ -88,7 +88,7 @@ class DeferredResponse { public: virtual ~DeferredResponse() = default; - virtual uint32_t getRequestId() const = 0; + virtual UUID getRequestId() const = 0; virtual bool hasFinalised() const = 0; @@ -129,10 +129,10 @@ public: virtual ~BalancerCommandsScheduler() = default; /** - * Activates the scheduler, which will start accepting and processing - * request<Command>() invocations. + * Triggers an asynchronous self-initialisation of the component, + * which will start accepting request<Command>() invocations. */ - virtual void start() = 0; + virtual void start(OperationContext* opCtx) = 0; /** * Stops the scheduler and the processing of any outstanding and incoming request @@ -141,27 +141,34 @@ public: virtual void stop() = 0; virtual std::unique_ptr<MoveChunkResponse> requestMoveChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardId& recipient, const MoveChunkSettings& commandSettings) = 0; virtual std::unique_ptr<MergeChunksResponse> requestMergeChunks( - const NamespaceString& nss, const ChunkType& lowerBound, const ChunkType& upperBound) = 0; + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkType& lowerBound, + const ChunkType& upperBound) = 0; virtual std::unique_ptr<SplitVectorResponse> requestSplitVector( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, const SplitVectorSettings& commandSettings) = 0; virtual std::unique_ptr<SplitChunkResponse> requestSplitChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, const std::vector<BSONObj>& splitPoints) = 0; virtual std::unique_ptr<ChunkDataSizeResponse> requestChunkDataSize( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp index 3e0a2451946..602423605b5 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp @@ -31,6 +31,7 @@ #include "mongo/db/s/balancer/balancer_commands_scheduler_impl.h" #include "mongo/db/client.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/grid.h" @@ -77,24 +78,24 @@ BalancerCommandsSchedulerImpl::~BalancerCommandsSchedulerImpl() { stop(); } -void BalancerCommandsSchedulerImpl::_setState(WithLock, SchedulerState state) { - _state = state; - _stateUpdatedCV.notify_all(); -} - -void BalancerCommandsSchedulerImpl::start() { +void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) { LOGV2(5847200, "Balancer command scheduler start requested"); stdx::lock_guard<Latch> lgss(_startStopMutex); stdx::lock_guard<Latch> lg(_mutex); - if (_state == SchedulerState::Running) { + if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) { return; } - // TODO init _requestIdCounter here based on the stored running requests from a past invocation? invariant(!_workerThreadHandle.joinable()); _incompleteRequests.reserve(_maxRunningRequests * 10); _runningRequestIds.reserve(_maxRunningRequests); - _state = SchedulerState::Running; + auto requestsToRecover = _loadRequestsToRecover(opCtx); + _state = requestsToRecover.empty() ? SchedulerState::Running : SchedulerState::Recovering; + + for (auto& requestToRecover : requestsToRecover) { + _enqueueRequest(lg, std::move(requestToRecover)); + } + _workerThreadHandle = stdx::thread([this] { _workerThread(); }); } @@ -108,12 +109,14 @@ void BalancerCommandsSchedulerImpl::stop() { } invariant(_workerThreadHandle.joinable()); - _setState(lg, SchedulerState::Stopping); + _state = SchedulerState::Stopping; + _stateUpdatedCV.notify_all(); } _workerThreadHandle.join(); } std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardId& recipient, @@ -130,17 +133,17 @@ std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChu commandSettings.forceJumbo, chunk.getVersion()); - auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo)); + auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)); return std::make_unique<MoveChunkResponseImpl>(std::move(requestCollectionInfo)); } std::unique_ptr<MergeChunksResponse> BalancerCommandsSchedulerImpl::requestMergeChunks( - const NamespaceString& nss, const ChunkType& lowerBound, const ChunkType& upperBound) { + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkType& lowerBound, + const ChunkType& upperBound) { invariant(lowerBound.getShard() == upperBound.getShard()); invariant(lowerBound.getMax().woCompare(upperBound.getMin()) <= 0); - // TODO why this may fail? - // invariant(lowerBound.epoch() == upperBound.epoch() && - // lowerBound.getVersion().majorVersion() == upperBound.getVersion().majorVersion()); auto commandInfo = std::make_shared<MergeChunksCommandInfo>( nss, @@ -150,11 +153,12 @@ std::unique_ptr<MergeChunksResponse> BalancerCommandsSchedulerImpl::requestMerge lowerBound.getVersion().isOlderThan(upperBound.getVersion()) ? upperBound.getVersion() : lowerBound.getVersion()); - auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo)); + auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)); return std::make_unique<MergeChunksResponseImpl>(std::move(requestCollectionInfo)); } std::unique_ptr<SplitVectorResponse> BalancerCommandsSchedulerImpl::requestSplitVector( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, @@ -170,11 +174,12 @@ std::unique_ptr<SplitVectorResponse> BalancerCommandsSchedulerImpl::requestSplit commandSettings.maxChunkSizeBytes, commandSettings.force); - auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo)); + auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)); return std::make_unique<SplitVectorResponseImpl>(std::move(requestCollectionInfo)); } std::unique_ptr<SplitChunkResponse> BalancerCommandsSchedulerImpl::requestSplitChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, @@ -188,11 +193,12 @@ std::unique_ptr<SplitChunkResponse> BalancerCommandsSchedulerImpl::requestSplitC chunk.getVersion(), splitPoints); - auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo)); + auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)); return std::make_unique<SplitChunkResponseImpl>(std::move(requestCollectionInfo)); } std::unique_ptr<ChunkDataSizeResponse> BalancerCommandsSchedulerImpl::requestChunkDataSize( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, @@ -205,36 +211,73 @@ std::unique_ptr<ChunkDataSizeResponse> BalancerCommandsSchedulerImpl::requestChu estimatedValue, chunk.getVersion()); - auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo)); + auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)); return std::make_unique<ChunkDataSizeResponseImpl>(std::move(requestCollectionInfo)); } -ResponseHandle BalancerCommandsSchedulerImpl::_enqueueNewRequest( - std::shared_ptr<CommandInfo>&& commandInfo) { - const auto newRequestId = _requestIdCounter.fetchAndAddRelaxed(1); +ResponseHandle BalancerCommandsSchedulerImpl::_buildAndEnqueueNewRequest( + OperationContext* opCtx, std::shared_ptr<CommandInfo>&& commandInfo) { + const auto newRequestId = UUID::gen(); LOGV2(5847202, - "Enqueuing new Balancer command request with id {reqId}. Details: {command} ", + "Enqueuing new Balancer command request with id {reqId}. Details: {command}", "Enqueuing new Balancer command request", "reqId"_attr = newRequestId, - "command"_attr = commandInfo->serialise().toString()); + "command"_attr = commandInfo->serialise().toString(), + "recoveryDocRequired"_attr = commandInfo->requiresRecoveryOnCrash()); + + if (commandInfo->requiresRecoveryOnCrash()) { + DBDirectClient dbClient(opCtx); + PersistedBalancerCommand recoveryDoc(newRequestId, + commandInfo->serialise(), + commandInfo->getTarget(), + commandInfo->getNameSpace(), + commandInfo->requiresDistributedLock()); + std::vector<BSONObj> serialisedRecoveryInfo; + serialisedRecoveryInfo.emplace_back(recoveryDoc.toBSON()); + auto reply = dbClient.insertAcknowledged( + NamespaceString::kConfigBalancerCommandsNamespace.toString(), + serialisedRecoveryInfo, + true, + WriteConcernOptions::Majority); + + if (auto writeStatus = getStatusFromWriteCommandReply(reply); !writeStatus.isOK()) { + LOGV2(5847210, + "Failed to persist command document for reqId {reqId}. Error status: {status}", + "Failed to persist request command document", + "reqId"_attr = newRequestId, + "status"_attr = writeStatus); + return ResponseHandle(newRequestId, writeStatus); + } + } RequestData pendingRequest(newRequestId, std::move(commandInfo)); - auto deferredResponseHandle = pendingRequest.getResponseHandle(); - { - stdx::lock_guard<Latch> lg(_mutex); - if (_state == SchedulerState::Running) { - invariant(_workerThreadHandle.joinable()); - _incompleteRequests.emplace(std::make_pair(newRequestId, std::move(pendingRequest))); - _pendingRequestIds.push_back(newRequestId); - _stateUpdatedCV.notify_all(); - } else { - deferredResponseHandle.handle->set(Status( - ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped")); - } + + stdx::unique_lock<Latch> ul(_mutex); + _stateUpdatedCV.wait(ul, [this] { return _state != SchedulerState::Recovering; }); + + return _enqueueRequest(ul, std::move(pendingRequest)); +} + +ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestData&& request) { + auto requestId = request.getId(); + auto deferredResponseHandle = request.getResponseHandle(); + if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) { + _incompleteRequests.emplace(std::make_pair(requestId, std::move(request))); + _pendingRequestIds.push_back(requestId); + _stateUpdatedCV.notify_all(); + } else { + deferredResponseHandle.handle->set(Status( + ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped")); } + return deferredResponseHandle; } +bool BalancerCommandsSchedulerImpl::_canSubmitNewRequests(WithLock) { + return (!_pendingRequestIds.empty() && _runningRequestIds.size() < _maxRunningRequests && + MONGO_likely(!pauseBalancerWorkerThread.shouldFail())); +} + Status BalancerCommandsSchedulerImpl::_acquireDistLock(OperationContext* opCtx, NamespaceString nss) { auto it = _migrationLocks.find(nss); @@ -304,7 +347,7 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit( _applyCommandResponse(opCtx, requestId, args.response); }; - if (handle.commandInfo->getType() == CommandInfo::Type::kMoveChunk) { + if (handle.commandInfo->requiresDistributedLock()) { Status lockAcquisitionResponse = _acquireDistLock(opCtx, handle.commandInfo->getNameSpace()); if (!lockAcquisitionResponse.isOK()) { @@ -336,18 +379,21 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult( * - set the execution context on the RequestData to be able to serve future cancel requests * - update the data structure describing running command requests * If failed, - * - persist the outcome into the deferred response - * - remove the RequestData (and the info on migration) + * - store the outcome into the deferred response + * - remove the RequestData and its persisted info (if any) */ auto& requestToUpdate = requestToUpdateIt->second; if (submissionResult.context.isOK()) { requestToUpdate.addExecutionContext(std::move(submissionResult.context.getValue())); _runningRequestIds.insert(submissionResult.id); } else { + const auto& submittedCommandInfo = requestToUpdate.getCommandInfo(); if (submissionResult.acquiredDistLock) { - const auto& submittedCommandInfo = requestToUpdate.getCommandInfo(); _releaseDistLock(opCtx, submittedCommandInfo.getNameSpace()); } + if (submittedCommandInfo.requiresRecoveryOnCrash()) { + _obsoleteRecoveryDocumentIds.push_back(submissionResult.id); + } requestToUpdate.setOutcome(submissionResult.context.getStatus()); _incompleteRequests.erase(requestToUpdateIt); } @@ -355,24 +401,31 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult( void BalancerCommandsSchedulerImpl::_applyCommandResponse( OperationContext* opCtx, - uint32_t requestId, + UUID requestId, const executor::TaskExecutor::ResponseStatus& response) { { stdx::lock_guard<Latch> lg(_mutex); auto requestToCompleteIt = _incompleteRequests.find(requestId); - if (_state != SchedulerState::Running || requestToCompleteIt == _incompleteRequests.end()) { + if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped || + requestToCompleteIt == _incompleteRequests.end()) { // Drop the response - the request is being cancelled in the worker thread. - // (TODO modify to handle stepdown scenarios). return; } auto& requestToComplete = requestToCompleteIt->second; requestToComplete.setOutcome(response); auto& commandInfo = requestToComplete.getCommandInfo(); - if (commandInfo.getType() == CommandInfo::Type::kMoveChunk) { + if (commandInfo.requiresDistributedLock()) { _releaseDistLock(opCtx, commandInfo.getNameSpace()); } + if (commandInfo.requiresRecoveryOnCrash()) { + _obsoleteRecoveryDocumentIds.push_back(requestId); + } _runningRequestIds.erase(requestId); _incompleteRequests.erase(requestToCompleteIt); + if (_incompleteRequests.empty() && _state == SchedulerState::Recovering) { + LOGV2(5847213, "Balancer scheduler recovery complete. Switching to regular execution"); + _state = SchedulerState::Running; + } _stateUpdatedCV.notify_all(); } LOGV2(5847204, @@ -382,6 +435,59 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( "response"_attr = response.toString()); } +std::vector<RequestData> BalancerCommandsSchedulerImpl::_loadRequestsToRecover( + OperationContext* opCtx) { + std::vector<RequestData> requestsToRecover; + auto documentProcessor = [&requestsToRecover](const BSONObj& commandToRecoverDoc) { + auto originalCommand = PersistedBalancerCommand::parse( + IDLParserErrorContext("BalancerCommandsScheduler"), commandToRecoverDoc); + auto recoveryCommand = std::make_shared<RecoveryCommandInfo>(originalCommand); + LOGV2(5847212, + "Recovered request id {reqId}, which command will be rescheduled. Details: " + "{command}", + "Command request recovered and set for rescheduling", + "reqId"_attr = originalCommand.getRequestId(), + "command"_attr = recoveryCommand->serialise()); + requestsToRecover.emplace_back(originalCommand.getRequestId(), std::move(recoveryCommand)); + }; + DBDirectClient dbClient(opCtx); + dbClient.query(documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj()); + return requestsToRecover; +} + +void BalancerCommandsSchedulerImpl::_cleanUpObsoleteRecoveryInfo(WithLock, + OperationContext* opCtx) { + if (_obsoleteRecoveryDocumentIds.empty()) { + return; + } + BSONObjBuilder queryBuilder; + if (_obsoleteRecoveryDocumentIds.size() == 1) { + _obsoleteRecoveryDocumentIds[0].appendToBuilder( + &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName); + } else { + BSONObjBuilder requestIdClause( + queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName)); + BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in")); + for (const auto& requestId : _obsoleteRecoveryDocumentIds) { + requestId.appendToArrayBuilder(&valuesBuilder); + } + } + + _obsoleteRecoveryDocumentIds.clear(); + + auto query = queryBuilder.obj(); + DBDirectClient dbClient(opCtx); + + auto reply = dbClient.removeAcknowledged( + NamespaceString::kConfigBalancerCommandsNamespace.toString(), query); + + LOGV2(5847211, + "Clean up of obsolete document info performed with outcome {reply}", + "Clean up of obsolete document info performed", + "query"_attr = query, + "reply"_attr = reply); +} + void BalancerCommandsSchedulerImpl::_workerThread() { ON_BLOCK_EXIT([this] { @@ -389,32 +495,38 @@ void BalancerCommandsSchedulerImpl::_workerThread() { "BalancerCommandsScheduler worker thread failed to release all locks on exit"); LOGV2(5847208, "Leaving balancer command scheduler thread"); stdx::lock_guard<Latch> lg(_mutex); - _setState(lg, SchedulerState::Stopped); + _state = SchedulerState::Stopped; + _stateUpdatedCV.notify_all(); }); Client::initThread("BalancerCommandsScheduler"); auto opCtxHolder = cc().makeOperationContext(); - stdx::unordered_map<uint32_t, RequestData> requestsToCleanUpOnExit; + stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit; LOGV2(5847205, "Balancer scheduler thread started"); while (true) { std::vector<CommandSubmissionHandle> commandsToSubmit; { stdx::unique_lock<Latch> ul(_mutex); - _stateUpdatedCV.wait(ul, [this] { - return (_state != SchedulerState::Running || - (!_pendingRequestIds.empty() && - _runningRequestIds.size() < _maxRunningRequests && - MONGO_likely(!pauseBalancerWorkerThread.shouldFail()))); + invariant(_state != SchedulerState::Stopped); + _stateUpdatedCV.wait(ul, [this, &ul] { + return (_state == SchedulerState::Stopping || _canSubmitNewRequests(ul) || + !_obsoleteRecoveryDocumentIds.empty()); }); - if (_state != SchedulerState::Running) { + _cleanUpObsoleteRecoveryInfo(ul, opCtxHolder.get()); + + if (_state == SchedulerState::Stopping) { _runningRequestIds.clear(); _pendingRequestIds.clear(); _incompleteRequests.swap(requestsToCleanUpOnExit); break; } + if (!_canSubmitNewRequests(ul)) { + continue; + } + // 1. Pick up new requests to be submitted from the pending list, if possible. const auto availableSubmissionSlots = static_cast<size_t>(_maxRunningRequests - _runningRequestIds.size()); @@ -450,6 +562,7 @@ void BalancerCommandsSchedulerImpl::_workerThread() { } // In case of clean exit, cancel all the pending/running command requests + // (but keep the related descriptor documents to ensure they will be reissued on recovery). auto executor = Grid::get(opCtxHolder.get())->getExecutorPool()->getFixedExecutor(); for (auto& idAndRequest : requestsToCleanUpOnExit) { idAndRequest.second.setOutcome(Status( diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h index 7dcab8b20fd..6970d5a965c 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h @@ -32,6 +32,7 @@ #include <list> #include <unordered_map> +#include "mongo/db/s/balancer/balancer_command_document_gen.h" #include "mongo/db/s/balancer/balancer_commands_scheduler.h" #include "mongo/db/s/dist_lock_manager.h" #include "mongo/db/service_context.h" @@ -46,13 +47,20 @@ namespace mongo { * Data structure generated from RequestData to support the creation of SchedulerResponse objects. */ struct ResponseHandle { - ResponseHandle(uint32_t requestId, + ResponseHandle(UUID requestId, const std::shared_ptr<Notification<executor::RemoteCommandResponse>>& handle) : requestId(requestId), handle(handle) {} + + ResponseHandle(UUID requestId, Status outcome) + : requestId(requestId), + handle(std::make_shared<Notification<executor::RemoteCommandResponse>>()) { + handle->set(outcome); + } + ResponseHandle(ResponseHandle&& rhs) : requestId(rhs.requestId), handle(std::move(rhs.handle)) {} - const uint32_t requestId; + const UUID requestId; const std::shared_ptr<Notification<executor::RemoteCommandResponse>> handle; }; @@ -69,7 +77,7 @@ public: ~SchedulerResponse() = default; - uint32_t getRequestId() const { + UUID getRequestId() const { return _requestId; } @@ -90,7 +98,7 @@ public: } private: - uint32_t _requestId; + UUID _requestId; std::shared_ptr<Notification<executor::RemoteCommandResponse>> _deferredValue; }; @@ -104,7 +112,7 @@ public: MoveChunkResponseImpl(ResponseHandle&& responseHandle) : SchedulerResponse(std::move(responseHandle)) {} - uint32_t getRequestId() const override { + UUID getRequestId() const override { return SchedulerResponse::getRequestId(); } @@ -122,7 +130,7 @@ public: MergeChunksResponseImpl(ResponseHandle&& responseHandle) : SchedulerResponse(std::move(responseHandle)) {} - uint32_t getRequestId() const override { + UUID getRequestId() const override { return SchedulerResponse::getRequestId(); } @@ -140,7 +148,7 @@ public: SplitVectorResponseImpl(ResponseHandle&& responseHandle) : SchedulerResponse(std::move(responseHandle)) {} - uint32_t getRequestId() const override { + UUID getRequestId() const override { return SchedulerResponse::getRequestId(); } @@ -171,7 +179,7 @@ public: SplitChunkResponseImpl(ResponseHandle&& responseHandle) : SchedulerResponse(std::move(responseHandle)) {} - uint32_t getRequestId() const override { + UUID getRequestId() const override { return SchedulerResponse::getRequestId(); } @@ -189,7 +197,7 @@ public: ChunkDataSizeResponseImpl(ResponseHandle&& responseHandle) : SchedulerResponse(std::move(responseHandle)) {} - uint32_t getRequestId() const override { + UUID getRequestId() const override { return SchedulerResponse::getRequestId(); } @@ -225,17 +233,19 @@ public: */ class CommandInfo { public: - enum class Type { kMoveChunk, kMergeChunks, kSplitVector, kSplitChunk, kDataSize }; - - CommandInfo(Type type, const ShardId& targetShardId, const NamespaceString& nss) - : _type(type), _targetShardId(targetShardId), _nss(nss) {} + CommandInfo(const ShardId& targetShardId, const NamespaceString& nss) + : _targetShardId(targetShardId), _nss(nss) {} virtual ~CommandInfo() {} virtual BSONObj serialise() const = 0; - Type getType() const { - return _type; + virtual bool requiresRecoveryOnCrash() const { + return false; + } + + virtual bool requiresDistributedLock() const { + return false; } const ShardId& getTarget() const { @@ -247,7 +257,6 @@ public: } private: - Type _type; ShardId _targetShardId; NamespaceString _nss; }; @@ -267,7 +276,7 @@ public: bool waitForDelete, MoveChunkRequest::ForceJumbo forceJumbo, const ChunkVersion& version) - : CommandInfo(Type::kMoveChunk, origin, nss), + : CommandInfo(origin, nss), _chunkBoundaries(lowerBoundKey, upperBoundKey), _recipient(recipient), _version(version), @@ -291,6 +300,14 @@ public: return commandBuilder.obj(); } + bool requiresRecoveryOnCrash() const override { + return true; + } + + bool requiresDistributedLock() const override { + return true; + } + private: ChunkRange _chunkBoundaries; ShardId _recipient; @@ -308,7 +325,7 @@ public: const BSONObj& lowerBoundKey, const BSONObj& upperBoundKey, const ChunkVersion& version) - : CommandInfo(Type::kMergeChunks, shardId, nss), + : CommandInfo(shardId, nss), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), _version(version) {} @@ -351,7 +368,7 @@ public: boost::optional<long long> maxChunkObjects, boost::optional<long long> maxChunkSizeBytes, bool force) - : CommandInfo(Type::kSplitVector, shardId, nss), + : CommandInfo(shardId, nss), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), @@ -408,7 +425,7 @@ public: const BSONObj& upperBoundKey, bool estimatedValue, const ChunkVersion& version) - : CommandInfo(Type::kDataSize, shardId, nss), + : CommandInfo(shardId, nss), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), @@ -452,7 +469,7 @@ public: const BSONObj& upperBoundKey, const ChunkVersion& version, const std::vector<BSONObj>& splitPoints) - : CommandInfo(Type::kSplitChunk, shardId, nss), + : CommandInfo(shardId, nss), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), @@ -487,18 +504,42 @@ private: static const std::string kSplitKeys; }; +class RecoveryCommandInfo : public CommandInfo { +public: + RecoveryCommandInfo(const PersistedBalancerCommand& persistedCommand) + : CommandInfo(persistedCommand.getTarget(), persistedCommand.getNss()), + _serialisedCommand(persistedCommand.getRemoteCommand()), + _requiresDistributedLock(persistedCommand.getRequiresDistributedLock()) {} + + BSONObj serialise() const override { + return _serialisedCommand; + } + + bool requiresRecoveryOnCrash() const override { + return true; + } + + bool requiresDistributedLock() const override { + return _requiresDistributedLock; + } + +private: + BSONObj _serialisedCommand; + bool _requiresDistributedLock; +}; + /** * Helper data structure for submitting the shard command associated to a Request to the * BalancerCommandsScheduler. */ struct CommandSubmissionHandle { - CommandSubmissionHandle(uint32_t id, const std::shared_ptr<CommandInfo>& commandInfo) + CommandSubmissionHandle(UUID id, const std::shared_ptr<CommandInfo>& commandInfo) : id(id), commandInfo(commandInfo) {} CommandSubmissionHandle(CommandSubmissionHandle&& rhs) : id(rhs.id), commandInfo(std::move(rhs.commandInfo)) {} - const uint32_t id; + const UUID id; const std::shared_ptr<CommandInfo> commandInfo; }; @@ -509,14 +550,12 @@ using ExecutionContext = executor::TaskExecutor::CallbackHandle; * Helper data structure for storing the outcome of a Command submission. */ struct CommandSubmissionResult { - CommandSubmissionResult(uint32_t id, - bool acquiredDistLock, - StatusWith<ExecutionContext>&& context) + CommandSubmissionResult(UUID id, bool acquiredDistLock, StatusWith<ExecutionContext>&& context) : id(id), acquiredDistLock(acquiredDistLock), context(std::move(context)) {} CommandSubmissionResult(CommandSubmissionResult&& rhs) : id(rhs.id), acquiredDistLock(rhs.acquiredDistLock), context(std::move(rhs.context)) {} CommandSubmissionResult(const CommandSubmissionResult& rhs) = delete; - uint32_t id; + UUID id; bool acquiredDistLock; StatusWith<ExecutionContext> context; }; @@ -528,7 +567,7 @@ struct CommandSubmissionResult { */ class RequestData { public: - RequestData(uint32_t id, std::shared_ptr<CommandInfo>&& commandInfo) + RequestData(UUID id, std::shared_ptr<CommandInfo>&& commandInfo) : _id(id), _commandInfo(std::move(commandInfo)), _deferredResponse(std::make_shared<Notification<executor::RemoteCommandResponse>>()), @@ -544,7 +583,7 @@ public: ~RequestData() = default; - uint32_t getId() { + UUID getId() const { return _id; } @@ -577,7 +616,7 @@ private: RequestData(const RequestData& rhs) = delete; - const uint32_t _id; + const UUID _id; std::shared_ptr<CommandInfo> _commandInfo; @@ -596,40 +635,45 @@ public: ~BalancerCommandsSchedulerImpl(); - void start() override; + void start(OperationContext* opCtx) override; void stop() override; std::unique_ptr<MoveChunkResponse> requestMoveChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardId& destination, const MoveChunkSettings& commandSettings) override; - std::unique_ptr<MergeChunksResponse> requestMergeChunks(const NamespaceString& nss, + std::unique_ptr<MergeChunksResponse> requestMergeChunks(OperationContext* opCtx, + const NamespaceString& nss, const ChunkType& lowerBound, const ChunkType& upperBound) override; std::unique_ptr<SplitVectorResponse> requestSplitVector( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, const SplitVectorSettings& commandSettings) override; std::unique_ptr<SplitChunkResponse> requestSplitChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, const std::vector<BSONObj>& splitPoints) override; std::unique_ptr<ChunkDataSizeResponse> requestChunkDataSize( + OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk, const ShardKeyPattern& shardKeyPattern, bool estimatedValue) override; private: - enum class SchedulerState { Running, Stopping, Stopped }; + enum class SchedulerState { Recovering, Running, Stopping, Stopped }; static const int32_t _maxRunningRequests{10}; @@ -643,25 +687,22 @@ private: stdx::thread _workerThreadHandle; /** - * Generator of unique Request IDs (within the life scope of BalancerCommandsSchedulerImpl) - */ - AtomicWord<uint32_t> _requestIdCounter{0}; - - /** * Collection of all pending + running requests currently managed by * BalancerCommandsSchedulerImpl, organized by ID. */ - stdx::unordered_map<uint32_t, RequestData> _incompleteRequests; + stdx::unordered_map<UUID, RequestData, UUID::Hash> _incompleteRequests; /** * List of request IDs that have not been yet submitted. */ - std::list<uint32_t> _pendingRequestIds; + std::list<UUID> _pendingRequestIds; /** * List of request IDs that are currently running (submitted, but not yet completed). */ - stdx::unordered_set<uint32_t> _runningRequestIds; + stdx::unordered_set<UUID, UUID::Hash> _runningRequestIds; + + std::vector<UUID> _obsoleteRecoveryDocumentIds; /** * State to acquire and release DistLocks on a per namespace basis @@ -674,9 +715,12 @@ private: }; stdx::unordered_map<NamespaceString, Migrations> _migrationLocks; - void _setState(WithLock, SchedulerState state); + ResponseHandle _buildAndEnqueueNewRequest(OperationContext* opCtx, + std::shared_ptr<CommandInfo>&& commandInfo); - ResponseHandle _enqueueNewRequest(std::shared_ptr<CommandInfo>&& commandInfo); + ResponseHandle _enqueueRequest(WithLock, RequestData&& request); + + bool _canSubmitNewRequests(WithLock); Status _acquireDistLock(OperationContext* opCtx, NamespaceString nss); @@ -689,9 +733,13 @@ private: CommandSubmissionResult&& submissionResult); void _applyCommandResponse(OperationContext* opCtx, - uint32_t requestId, + UUID requestId, const executor::TaskExecutor::ResponseStatus& response); + std::vector<RequestData> _loadRequestsToRecover(OperationContext* opCtx); + + void _cleanUpObsoleteRecoveryInfo(WithLock, OperationContext* opCtx); + void _workerThread(); }; diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp index 6127d08634b..e449c829eaa 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp @@ -29,9 +29,10 @@ #include "mongo/platform/basic.h" +#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/s/balancer/balancer_commands_scheduler.h" #include "mongo/db/s/balancer/balancer_commands_scheduler_impl.h" -#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" namespace mongo { @@ -39,49 +40,25 @@ namespace { using unittest::assertGet; -class BalancerCommandsSchedulerTest : public ShardServerTestFixture { +class BalancerCommandsSchedulerTest : public ConfigServerTestFixture { public: - const std::vector<ShardType> kShardList{ShardType("shard0", "Host0:12345"), - ShardType("shard1", "Host1:12345")}; - const NamespaceString kNss{"testDb.testColl"}; - - void setUp() override { - ShardServerTestFixture::setUp(); - for (auto shardType : kShardList) { - auto shard = assertGet( - shardRegistry()->getShard(operationContext(), ShardId(shardType.getName()))); - RemoteCommandTargeterMock::get(shard->getTargeter()) - ->setFindHostReturnValue(HostAndPort::parse(shardType.getHost())); - } - } + const ShardId kShardId0 = ShardId("shard0"); + const ShardId kShardId1 = ShardId("shard1"); + const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345); + const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346); - void tearDown() override { - _scheduler.stop(); - ShardServerTestFixture::tearDown(); - } + const std::vector<ShardType> kShardList{ + ShardType(kShardId0.toString(), kShardHost0.toString()), + ShardType(kShardId1.toString(), kShardHost1.toString())}; - std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override { - class StaticCatalogClient final : public ShardingCatalogClientMock { - public: - StaticCatalogClient(const std::vector<ShardType> kShardList) : _shards(kShardList) {} - - StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { - return repl::OpTimeWith<std::vector<ShardType>>(_shards); - } - - private: - const std::vector<ShardType> _shards; - }; - return std::make_unique<StaticCatalogClient>(kShardList); - } + const NamespaceString kNss{"testDb.testColl"}; - ChunkType makeChunk(long long min, std::string shardName) { + ChunkType makeChunk(long long min, const ShardId& shardId) { ChunkType chunk; chunk.setMin(BSON("x" << min)); chunk.setMax(BSON("x" << min + 10)); chunk.setJumbo(false); - chunk.setShard(ShardId(shardName)); + chunk.setShard(shardId); chunk.setVersion(ChunkVersion(1, 1, OID::gen(), Timestamp(10))); return chunk; } @@ -95,29 +72,65 @@ public: MoveChunkRequest::ForceJumbo::kDoNotForce); } + std::vector<BSONObj> getPersistedCommandDocuments(OperationContext* opCtx) { + auto statusWithPersistedCommandDocs = + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString::kConfigBalancerCommandsNamespace, + BSONObj(), + BSONObj(), + boost::none); + + ASSERT_OK(statusWithPersistedCommandDocs.getStatus()); + return statusWithPersistedCommandDocs.getValue().docs; + } + + protected: + void setUp() override { + setUpAndInitializeConfigDb(); + setupShards(kShardList); + // Scheduler commands target shards that need to be retrieved. + auto opCtx = operationContext(); + configureTargeter(opCtx, kShardId0, kShardHost0); + configureTargeter(opCtx, kShardId1, kShardHost1); + } + + void tearDown() override { + _scheduler.stop(); + ConfigServerTestFixture::tearDown(); + } + + void configureTargeter(OperationContext* opCtx, ShardId shardId, const HostAndPort& host) { + auto targeter = RemoteCommandTargeterMock::get( + uassertStatusOK(shardRegistry()->getShard(opCtx, shardId))->getTargeter()); + targeter->setFindHostReturnValue(kShardHost0); + } + BalancerCommandsSchedulerImpl _scheduler; }; TEST_F(BalancerCommandsSchedulerTest, StartAndStopScheduler) { - _scheduler.start(); + _scheduler.start(operationContext()); _scheduler.stop(); } TEST_F(BalancerCommandsSchedulerTest, ResilientToMultipleStarts) { - _scheduler.start(); - _scheduler.start(); + _scheduler.start(operationContext()); + _scheduler.start(operationContext()); } TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) { - _scheduler.start(); - ChunkType moveChunk = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType moveChunk = makeChunk(0, kShardId0); auto networkResponseFuture = launchAsync([&]() { onCommand( [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); }); }); auto resp = _scheduler.requestMoveChunk( - kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings()); + operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); ASSERT_OK(resp->getOutcome()); networkResponseFuture.default_timed_get(); // Ensure DistLock is released correctly @@ -133,34 +146,34 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) { } TEST_F(BalancerCommandsSchedulerTest, SuccessfulMergeChunkCommand) { - _scheduler.start(); - ChunkType chunk1 = makeChunk(0, "shard0"); - ChunkType chunk2 = makeChunk(10, "shard0"); + _scheduler.start(operationContext()); + ChunkType chunk1 = makeChunk(0, kShardId0); + ChunkType chunk2 = makeChunk(10, kShardId0); auto networkResponseFuture = launchAsync([&]() { onCommand( [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); }); }); - auto resp = _scheduler.requestMergeChunks(kNss, chunk1, chunk2); + auto resp = _scheduler.requestMergeChunks(operationContext(), kNss, chunk1, chunk2); ASSERT_OK(resp->getOutcome()); networkResponseFuture.default_timed_get(); _scheduler.stop(); } TEST_F(BalancerCommandsSchedulerTest, MergeChunkNonexistentShard) { - _scheduler.start(); - ChunkType brokenChunk1 = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType brokenChunk1 = makeChunk(0, kShardId0); brokenChunk1.setShard(ShardId("nonexistent")); - ChunkType brokenChunk2 = makeChunk(10, "shard0"); + ChunkType brokenChunk2 = makeChunk(10, kShardId0); brokenChunk2.setShard(ShardId("nonexistent")); - auto resp = _scheduler.requestMergeChunks(kNss, brokenChunk1, brokenChunk2); + auto resp = _scheduler.requestMergeChunks(operationContext(), kNss, brokenChunk1, brokenChunk2); auto shardNotFoundError = Status{ErrorCodes::ShardNotFound, "Shard nonexistent not found"}; ASSERT_EQ(resp->getOutcome(), shardNotFoundError); _scheduler.stop(); } TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitVectorCommand) { - _scheduler.start(); - ChunkType splitChunk = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType splitChunk = makeChunk(0, kShardId0); BSONObjBuilder splitChunkResponse; splitChunkResponse.append("ok", "1"); BSONArrayBuilder splitKeys(splitChunkResponse.subarrayStart("splitKeys")); @@ -171,8 +184,11 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitVectorCommand) { return splitChunkResponse.obj(); }); }); - auto resp = _scheduler.requestSplitVector( - kNss, splitChunk, ShardKeyPattern(BSON("x" << 1)), SplitVectorSettings()); + auto resp = _scheduler.requestSplitVector(operationContext(), + kNss, + splitChunk, + ShardKeyPattern(BSON("x" << 1)), + SplitVectorSettings()); ASSERT_OK(resp->getOutcome()); ASSERT_OK(resp->getSplitKeys().getStatus()); ASSERT_EQ(resp->getSplitKeys().getValue().size(), 1); @@ -182,22 +198,25 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitVectorCommand) { } TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitChunkCommand) { - _scheduler.start(); - ChunkType splitChunk = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType splitChunk = makeChunk(0, kShardId0); auto networkResponseFuture = launchAsync([&]() { onCommand( [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); }); }); - auto resp = _scheduler.requestSplitChunk( - kNss, splitChunk, ShardKeyPattern(BSON("x" << 1)), std::vector<BSONObj>{BSON("x" << 5)}); + auto resp = _scheduler.requestSplitChunk(operationContext(), + kNss, + splitChunk, + ShardKeyPattern(BSON("x" << 1)), + std::vector<BSONObj>{BSON("x" << 5)}); ASSERT_OK(resp->getOutcome()); networkResponseFuture.default_timed_get(); _scheduler.stop(); } TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) { - _scheduler.start(); - ChunkType chunk = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType chunk = makeChunk(0, kShardId0); BSONObjBuilder chunkSizeResponse; chunkSizeResponse.append("ok", "1"); chunkSizeResponse.append("size", 156); @@ -206,8 +225,8 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) { onCommand( [&](const executor::RemoteCommandRequest& request) { return chunkSizeResponse.obj(); }); }); - auto resp = - _scheduler.requestChunkDataSize(kNss, chunk, ShardKeyPattern(BSON("x" << 1)), false); + auto resp = _scheduler.requestChunkDataSize( + operationContext(), kNss, chunk, ShardKeyPattern(BSON("x" << 1)), false); ASSERT_OK(resp->getOutcome()); ASSERT_OK(resp->getSize().getStatus()); ASSERT_EQ(resp->getSize().getValue(), 156); @@ -218,14 +237,14 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) { } TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) { - _scheduler.start(); - ChunkType moveChunk = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType moveChunk = makeChunk(0, kShardId0); auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"}; auto networkResponseFuture = launchAsync([&]() { onCommand([&](const executor::RemoteCommandRequest& request) { return timeoutError; }); }); auto resp = _scheduler.requestMoveChunk( - kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings()); + operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); ASSERT_EQUALS(resp->getOutcome(), timeoutError); networkResponseFuture.default_timed_get(); // Ensure DistLock is released correctly @@ -241,9 +260,9 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) { } TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) { - ChunkType moveChunk = makeChunk(0, "shard0"); + ChunkType moveChunk = makeChunk(0, kShardId0); auto resp = _scheduler.requestMoveChunk( - kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings()); + operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); ASSERT_EQUALS( resp->getOutcome(), Status(ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped")); @@ -262,10 +281,10 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) { std::unique_ptr<MoveChunkResponse> resp; { FailPointEnableBlock failPoint("pauseBalancerWorkerThread"); - _scheduler.start(); - ChunkType moveChunk = makeChunk(0, "shard0"); + _scheduler.start(operationContext()); + ChunkType moveChunk = makeChunk(0, kShardId0); resp = _scheduler.requestMoveChunk( - kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings()); + operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); _scheduler.stop(); } ASSERT_EQUALS( @@ -282,12 +301,98 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) { } } +TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenRequestIsSubmitted) { + // This prevents the request from being submitted by the scheduler worker thread. + FailPointEnableBlock failPoint("pauseBalancerWorkerThread"); + + auto opCtx = operationContext(); + _scheduler.start(opCtx); + ChunkType moveChunk = makeChunk(0, kShardId0); + auto requestSettings = getDefaultMoveChunkSettings(); + + auto deferredResponse = _scheduler.requestMoveChunk( + operationContext(), kNss, moveChunk, ShardId(kShardId1), requestSettings); + + // The command is persisted... + auto persistedCommandDocs = getPersistedCommandDocuments(opCtx); + ASSERT_EQUALS(1, persistedCommandDocs.size()); + auto persistedCommand = PersistedBalancerCommand::parse( + IDLParserErrorContext("BalancerCommandsSchedulerTest"), persistedCommandDocs[0]); + // ... with the expected info. + ASSERT_EQ(deferredResponse->getRequestId(), persistedCommand.getRequestId()); + ASSERT_EQ(kNss, persistedCommand.getNss()); + ASSERT_EQ(moveChunk.getShard(), persistedCommand.getTarget()); + ASSERT_TRUE(persistedCommand.getRequiresDistributedLock()); + auto originalCommandInfo = MoveChunkCommandInfo(kNss, + moveChunk.getShard(), + kShardId1, + moveChunk.getMin(), + moveChunk.getMax(), + requestSettings.maxChunkSizeBytes, + requestSettings.secondaryThrottle, + requestSettings.waitForDelete, + requestSettings.forceJumbo, + moveChunk.getVersion()); + ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), persistedCommand.getRemoteCommand()); +} + +TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecoveringFromCrash) { + FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread"); + failpoint->setMode(FailPoint::Mode::alwaysOn); + auto opCtx = operationContext(); + _scheduler.start(opCtx); + ChunkType moveChunk = makeChunk(0, kShardId0); + auto requestSettings = getDefaultMoveChunkSettings(); + auto networkResponseFuture = launchAsync([&]() { + onCommand([&](const executor::RemoteCommandRequest& request) { + auto originalCommandInfo = MoveChunkCommandInfo(kNss, + moveChunk.getShard(), + kShardId1, + moveChunk.getMin(), + moveChunk.getMax(), + requestSettings.maxChunkSizeBytes, + requestSettings.secondaryThrottle, + requestSettings.waitForDelete, + requestSettings.forceJumbo, + moveChunk.getVersion()); + // 4. ... Which is inspected here. + ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), request.cmdObj); + + return BSON("ok" << true); + }); + }); + + auto resp = _scheduler.requestMoveChunk( + operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); + _scheduler.stop(); + failpoint->setMode(FailPoint::Mode::off); + + // 1. The original submission is expected to fail... + ASSERT_EQUALS( + resp->getOutcome(), + Status(ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping")); + + // 2. ... And a recovery document to be persisted + auto persistedCommandDocs = getPersistedCommandDocuments(operationContext()); + ASSERT_EQUALS(1, persistedCommandDocs.size()); + + // 3. After restarting, the persisted document should eventually trigger a remote execution... + _scheduler.start(opCtx); + networkResponseFuture.default_timed_get(); + + // 5. Once the recovery is complete, no persisted documents should remain + // (stop() is invoked to ensure that the observed state is stable). + _scheduler.stop(); + persistedCommandDocs = getPersistedCommandDocuments(operationContext()); + ASSERT_EQUALS(0, persistedCommandDocs.size()); +} + TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL) { OperationContext* opCtx; FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread"); failpoint->setMode(FailPoint::Mode::alwaysOn); { - _scheduler.start(); + _scheduler.start(operationContext()); opCtx = Client::getCurrent()->getOperationContext(); const std::string whyMessage(str::stream() << "Test acquisition of distLock for " << kNss.ns()); @@ -295,9 +400,9 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); ASSERT_OK(scopedDistLock.getStatus()); failpoint->setMode(FailPoint::Mode::off); - ChunkType moveChunk = makeChunk(0, "shard0"); + ChunkType moveChunk = makeChunk(0, kShardId0); auto resp = _scheduler.requestMoveChunk( - kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings()); + operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); ASSERT_EQ( resp->getOutcome(), Status(ErrorCodes::LockBusy, "Failed to acquire dist lock testDb.testColl locally")); diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp index 188279a1b7c..c3bdca61a81 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp @@ -45,6 +45,7 @@ #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/query_request_helper.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/balancer/balancer_command_document_gen.h" #include "mongo/db/s/balancer/type_migration.h" #include "mongo/db/s/sharding_util.h" #include "mongo/db/s/type_lockpings.h" @@ -427,6 +428,18 @@ Status ShardingCatalogManager::_initConfigIndexes(OperationContext* opCtx) { return result.withContext("couldn't create ns_1_min_1 index on config.migrations"); } + result = + configShard->createIndexOnConfig(opCtx, + NamespaceString::kConfigBalancerCommandsNamespace, + BSON(PersistedBalancerCommand::kRequestIdFieldName << 1), + unique); + if (!result.isOK()) { + return result.withContext( + "couldn't create requestId_1 index on " + "config.balancerCommandsSchedulerOngoingOperations"); + } + + result = configShard->createIndexOnConfig( opCtx, ShardType::ConfigNS, BSON(ShardType::host() << 1), unique); if (!result.isOK()) { |