diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/balancer/balancer.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer.h | 14 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h | 34 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_move_range_command.cpp | 40 |
7 files changed, 154 insertions, 59 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 0271eeb5d4c..344e0442eac 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -201,6 +201,17 @@ Status processManualMigrationOutcome(OperationContext* opCtx, return outcome; } + +uint64_t getMaxChunkSizeBytes(OperationContext* opCtx, const CollectionType& coll) { + const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); + uassertStatusOK(balancerConfig->refreshAndCheck(opCtx)); + return coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes()); +} + +const int64_t getMaxChunkSizeMB(OperationContext* opCtx, const CollectionType& coll) { + return getMaxChunkSizeBytes(opCtx, coll) / (1024 * 1024); +}; + const auto _balancerDecoration = ServiceContext::declareDecoration<Balancer>(); const ReplicaSetAwareServiceRegistry::Registerer<Balancer> _balancerRegisterer("Balancer"); @@ -351,16 +362,7 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx, auto coll = Grid::get(opCtx)->catalogClient()->getCollection( opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); - auto maxChunkSize = coll.getMaxChunkSizeBytes().value_or(-1); - if (maxChunkSize <= 0) { - auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - Status refreshStatus = balancerConfig->refreshAndCheck(opCtx); - if (!refreshStatus.isOK()) { - return refreshStatus; - } - - maxChunkSize = balancerConfig->getMaxChunkSizeBytes(); - } + const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll); MoveChunkSettings settings(maxChunkSize, secondaryThrottle, waitForDelete); MigrateInfo migrateInfo(newShardId, @@ -376,6 +378,40 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx, return processManualMigrationOutcome(opCtx, chunk.getMin(), nss, newShardId, response); } +Status Balancer::moveRange(OperationContext* opCtx, + const NamespaceString& nss, + const MoveRangeRequest& request, + bool issuedByRemoteUser) { + auto coll = Grid::get(opCtx)->catalogClient()->getCollection( + opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); + const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll); + + const auto chunk = [&]() { + const auto cm = + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfo(opCtx, nss); + return cm.findIntersectingChunkWithSimpleCollation(request.getMin()); + }(); + + // TODO SERVER-64148 handle `moveRange` with bound(s) not necessarily matching a chunk + bool validBounds = request.getMin().woCompare(chunk.getMin()) == 0 && + request.getMax().woCompare(chunk.getMax()) == 0; + uassert(ErrorCodes::CommandFailed, + "No chunk found with the provided shard key bounds", + validBounds); + + ShardsvrMoveRange shardSvrRequest(nss); + shardSvrRequest.setDbName(NamespaceString::kAdminDb); + shardSvrRequest.setMoveRangeRequest(request); + shardSvrRequest.setMaxChunkSizeBytes(maxChunkSize); + shardSvrRequest.setFromShard(chunk.getShardId()); + shardSvrRequest.setEpoch(coll.getEpoch()); + + auto response = _commandScheduler->requestMoveRange(opCtx, shardSvrRequest, issuedByRemoteUser) + .getNoThrow(); + return processManualMigrationOutcome( + opCtx, chunk.getMin(), nss, shardSvrRequest.getToShard(), std::move(response)); +} + void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) { auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); balancerConfig->refreshAndCheck(opCtx).ignore(); @@ -957,17 +993,7 @@ BalancerCollectionStatusResponse Balancer::getBalancerStatusForNs(OperationConte uasserted(ErrorCodes::NamespaceNotSharded, "Collection unsharded or undefined"); } - const auto maxChunkSizeMB = [&]() -> int64_t { - int64_t value = 0; - if (const auto& collOverride = coll.getMaxChunkSizeBytes(); collOverride.is_initialized()) { - value = *collOverride; - } else { - auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - uassertStatusOK(balancerConfig->refreshAndCheck(opCtx)); - value = balancerConfig->getMaxChunkSizeBytes(); - } - return value / (1024 * 1024); - }(); + const auto maxChunkSizeMB = getMaxChunkSizeMB(opCtx, coll); BalancerCollectionStatusResponse response(maxChunkSizeMB, true /*balancerCompliant*/); auto setViolationOnResponse = [&response](const StringData& reason, const boost::optional<BSONObj>& details = diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index f0710e245b6..54ac2fce63f 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -34,6 +34,7 @@ #include "mongo/db/s/balancer/balancer_random.h" #include "mongo/platform/mutex.h" #include "mongo/s/request_types/balancer_collection_status_gen.h" +#include "mongo/s/request_types/move_range_request_gen.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" @@ -169,6 +170,19 @@ public: bool forceJumbo); /** + * Blocking call, which requests the balancer to move a range to the specified location + * in accordance with the active balancer policy. An error will be returned if the attempt to + * move fails for any reason. + * + * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the + * move regardless. + */ + Status moveRange(OperationContext* opCtx, + const NamespaceString& nss, + const MoveRangeRequest& request, + bool issuedByRemoteUser); + + /** * Appends the runtime state of the balancer instance to the specified builder. */ void report(OperationContext* opCtx, BSONObjBuilder* builder); diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h index dae383b5723..e3c444161d0 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h @@ -33,6 +33,7 @@ #include "mongo/db/s/balancer/balancer_policy.h" #include "mongo/executor/task_executor.h" #include "mongo/s/request_types/move_chunk_request.h" +#include "mongo/s/request_types/move_range_request_gen.h" #include "mongo/s/shard_id.h" namespace mongo { @@ -148,6 +149,10 @@ public: const ChunkVersion& version, const KeyPattern& keyPattern, bool estimatedValue) = 0; + + virtual SemiFuture<void> requestMoveRange(OperationContext* opCtx, + ShardsvrMoveRange& request, + bool issuedByRemoteUser) = 0; }; } // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp index fe8f6561ca3..338734c905e 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp @@ -229,6 +229,22 @@ SemiFuture<void> BalancerCommandsSchedulerImpl::requestMoveChunk( .semi(); } +SemiFuture<void> BalancerCommandsSchedulerImpl::requestMoveRange(OperationContext* opCtx, + ShardsvrMoveRange& request, + bool issuedByRemoteUser) { + auto externalClientInfo = + issuedByRemoteUser ? boost::optional<ExternalClientInfo>(opCtx) : boost::none; + + auto commandInfo = std::make_shared<MoveRangeCommandInfo>( + request, opCtx->getWriteConcern(), std::move(externalClientInfo)); + + return _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)) + .then([](const executor::RemoteCommandResponse& remoteResponse) { + return processRemoteResponse(remoteResponse); + }) + .semi(); +} + SemiFuture<void> BalancerCommandsSchedulerImpl::requestMergeChunks(OperationContext* opCtx, const NamespaceString& nss, const ShardId& shardId, diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h index b433018d1ef..7f57527367e 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h @@ -113,6 +113,36 @@ private: boost::optional<ExternalClientInfo> _clientInfo; }; +class MoveRangeCommandInfo : public CommandInfo { +public: + MoveRangeCommandInfo(const ShardsvrMoveRange& request, + const WriteConcernOptions& writeConcern, + boost::optional<ExternalClientInfo>&& clientInfo) + : CommandInfo(request.getFromShard(), request.getCommandParameter(), std::move(clientInfo)), + _request(request), + _wc(writeConcern) {} + + const ShardsvrMoveRange& getMoveRangeRequest() { + return _request; + } + + BSONObj serialise() const override { + BSONObjBuilder commandBuilder; + _request.serialize(BSON(WriteConcernOptions::kWriteConcernField << _wc.toBSON()), + &commandBuilder); + appendCommandMetadataTo(&commandBuilder); + return commandBuilder.obj(); + } + + bool requiresDistributedLock() const override { + return true; + } + +private: + const ShardsvrMoveRange _request; + const WriteConcernOptions _wc; +}; + /** * Set of command-specific subclasses of CommandInfo. */ @@ -521,6 +551,10 @@ public: const MoveChunkSettings& commandSettings, bool issuedByRemoteUser) override; + SemiFuture<void> requestMoveRange(OperationContext* opCtx, + ShardsvrMoveRange& request, + bool issuedByRemoteUser) override; + SemiFuture<void> requestMergeChunks(OperationContext* opCtx, const NamespaceString& nss, const ShardId& shardId, diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp index 4968b71ebb4..467be0cf563 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp @@ -170,6 +170,42 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) { _scheduler.stop(); } +TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) { + auto deferredCleanupCompletedCheckpoint = + globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint"); + auto timesEnteredFailPoint = + deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0); + _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues()); + ShardsvrMoveRange shardsvrRequest(kNss); + shardsvrRequest.setDbName(NamespaceString::kAdminDb); + shardsvrRequest.setFromShard(kShardId0); + auto& moveRangeRequest = shardsvrRequest.getMoveRangeRequest(); + moveRangeRequest.setToShard(kShardId1); + moveRangeRequest.setMin({}); + moveRangeRequest.setMax({}); + + auto networkResponseFuture = launchAsync([&]() { + onCommand( + [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + auto futureResponse = _scheduler.requestMoveRange( + operationContext(), shardsvrRequest, false /* issuedByRemoteUser */); + ASSERT_OK(futureResponse.getNoThrow()); + networkResponseFuture.default_timed_get(); + deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1); + // Ensure DistLock is released correctly + { + auto opCtx = Client::getCurrent()->getOperationContext(); + const std::string whyMessage(str::stream() + << "Test acquisition of distLock for " << kNss.ns()); + auto scopedDistLock = DistLockManager::get(opCtx)->lock( + opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); + ASSERT_OK(scopedDistLock.getStatus()); + } + deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0); + _scheduler.stop(); +} + TEST_F(BalancerCommandsSchedulerTest, SuccessfulMergeChunkCommand) { _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues()); auto networkResponseFuture = launchAsync([&]() { diff --git a/src/mongo/db/s/config/configsvr_move_range_command.cpp b/src/mongo/db/s/config/configsvr_move_range_command.cpp index c95b02b6e25..0aba8cc7274 100644 --- a/src/mongo/db/s/config/configsvr_move_range_command.cpp +++ b/src/mongo/db/s/config/configsvr_move_range_command.cpp @@ -85,44 +85,8 @@ public: Grid::get(opCtx)->shardRegistry()->getShard(opCtx, req.getToShard()), "Could not find destination shard"); - const auto cm = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfo(opCtx, nss); - const auto chunk = cm.findIntersectingChunkWithSimpleCollation(req.getMin()); - - bool validBounds = req.getMin().woCompare(chunk.getMin()) == 0 && - req.getMax().woCompare(chunk.getMax()) == 0; - uassert(ErrorCodes::CommandFailed, - "No chunk found with the provided shard key bounds", - validBounds); - - ChunkType chunkType; - chunkType.setCollectionUUID(cm.getUUID()); - chunkType.setMin(chunk.getMin()); - chunkType.setMax(chunk.getMax()); - chunkType.setShard(chunk.getShardId()); - chunkType.setVersion(cm.getVersion()); - - { - // TODO SERVER-64324 replace this scope calling moveRange instead of moveChunk - MigrationSecondaryThrottleOptions secondaryThrottle = [&]() { - if (!req.getSecondaryThrottle()) { - return MigrationSecondaryThrottleOptions::create( - MigrationSecondaryThrottleOptions::kOff); - } - - return MigrationSecondaryThrottleOptions::createWithWriteConcern( - opCtx->getWriteConcern()); - }(); - - const bool forceJumbo = req.getForceJumbo() != ForceJumbo::kDoNotForce; - uassertStatusOK(Balancer::get(opCtx)->moveSingleChunk(opCtx, - nss, - chunkType, - req.getToShard(), - secondaryThrottle, - req.getWaitForDelete(), - forceJumbo)); - } + uassertStatusOK(Balancer::get(opCtx)->moveRange( + opCtx, nss, req.getMoveRangeRequest(), true /* issuedByRemoteUser */)); } private: |