Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/balancer/balancer.cpp                          | 68
-rw-r--r--  src/mongo/db/s/balancer/balancer.h                            | 14
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler.h         |  5
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp  | 16
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h    | 34
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp  | 36
-rw-r--r--  src/mongo/db/s/config/configsvr_move_range_command.cpp        | 40
7 files changed, 154 insertions(+), 59 deletions(-)
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 0271eeb5d4c..344e0442eac 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -201,6 +201,17 @@ Status processManualMigrationOutcome(OperationContext* opCtx,
return outcome;
}
+
+uint64_t getMaxChunkSizeBytes(OperationContext* opCtx, const CollectionType& coll) {
+ const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+ uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
+ return coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes());
+}
+
+int64_t getMaxChunkSizeMB(OperationContext* opCtx, const CollectionType& coll) {
+    return getMaxChunkSizeBytes(opCtx, coll) / (1024 * 1024);
+}
+
const auto _balancerDecoration = ServiceContext::declareDecoration<Balancer>();
const ReplicaSetAwareServiceRegistry::Registerer<Balancer> _balancerRegisterer("Balancer");
@@ -351,16 +362,7 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx,
auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
- auto maxChunkSize = coll.getMaxChunkSizeBytes().value_or(-1);
- if (maxChunkSize <= 0) {
- auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
- Status refreshStatus = balancerConfig->refreshAndCheck(opCtx);
- if (!refreshStatus.isOK()) {
- return refreshStatus;
- }
-
- maxChunkSize = balancerConfig->getMaxChunkSizeBytes();
- }
+ const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll);
MoveChunkSettings settings(maxChunkSize, secondaryThrottle, waitForDelete);
MigrateInfo migrateInfo(newShardId,
@@ -376,6 +378,40 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx,
return processManualMigrationOutcome(opCtx, chunk.getMin(), nss, newShardId, response);
}
+Status Balancer::moveRange(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const MoveRangeRequest& request,
+ bool issuedByRemoteUser) {
+ auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
+ opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
+ const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll);
+
+ const auto chunk = [&]() {
+ const auto cm =
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfo(opCtx, nss);
+ return cm.findIntersectingChunkWithSimpleCollation(request.getMin());
+ }();
+
+ // TODO SERVER-64148 handle `moveRange` with bound(s) not necessarily matching a chunk
+ bool validBounds = request.getMin().woCompare(chunk.getMin()) == 0 &&
+ request.getMax().woCompare(chunk.getMax()) == 0;
+ uassert(ErrorCodes::CommandFailed,
+ "No chunk found with the provided shard key bounds",
+ validBounds);
+
+ ShardsvrMoveRange shardSvrRequest(nss);
+ shardSvrRequest.setDbName(NamespaceString::kAdminDb);
+ shardSvrRequest.setMoveRangeRequest(request);
+ shardSvrRequest.setMaxChunkSizeBytes(maxChunkSize);
+ shardSvrRequest.setFromShard(chunk.getShardId());
+ shardSvrRequest.setEpoch(coll.getEpoch());
+
+ auto response = _commandScheduler->requestMoveRange(opCtx, shardSvrRequest, issuedByRemoteUser)
+ .getNoThrow();
+ return processManualMigrationOutcome(
+ opCtx, chunk.getMin(), nss, shardSvrRequest.getToShard(), std::move(response));
+}
+
void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
balancerConfig->refreshAndCheck(opCtx).ignore();
@@ -957,17 +993,7 @@ BalancerCollectionStatusResponse Balancer::getBalancerStatusForNs(OperationConte
uasserted(ErrorCodes::NamespaceNotSharded, "Collection unsharded or undefined");
}
- const auto maxChunkSizeMB = [&]() -> int64_t {
- int64_t value = 0;
- if (const auto& collOverride = coll.getMaxChunkSizeBytes(); collOverride.is_initialized()) {
- value = *collOverride;
- } else {
- auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
- uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
- value = balancerConfig->getMaxChunkSizeBytes();
- }
- return value / (1024 * 1024);
- }();
+ const auto maxChunkSizeMB = getMaxChunkSizeMB(opCtx, coll);
BalancerCollectionStatusResponse response(maxChunkSizeMB, true /*balancerCompliant*/);
auto setViolationOnResponse = [&response](const StringData& reason,
const boost::optional<BSONObj>& details =
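
The two new helpers centralize a rule that was previously duplicated across moveSingleChunk and getBalancerStatusForNs: use the per-collection maxChunkSizeBytes override when one is set, otherwise refresh the balancer configuration and fall back to the cluster-wide value. A minimal standalone sketch of that fallback rule (plain C++17, not MongoDB code; the names are illustrative):

    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Mirrors coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes()).
    int64_t resolveMaxChunkSizeBytes(std::optional<int64_t> collectionOverride,
                                     int64_t clusterDefault) {
        return collectionOverride.value_or(clusterDefault);
    }

    int main() {
        const int64_t clusterDefault = 128LL * 1024 * 1024;  // e.g. a 128 MiB cluster-wide value
        // No collection override: fall back to the balancer configuration value.
        std::cout << resolveMaxChunkSizeBytes(std::nullopt, clusterDefault) / (1024 * 1024)
                  << " MB\n";  // -> 128 MB
        // Per-collection override set: it wins over the cluster default.
        std::cout << resolveMaxChunkSizeBytes(int64_t{64} * 1024 * 1024, clusterDefault) /
                         (1024 * 1024)
                  << " MB\n";  // -> 64 MB
    }

Note that getMaxChunkSizeMB truncates via integer division, so any value below 1 MiB reports as 0 MB.
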
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index f0710e245b6..54ac2fce63f 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -34,6 +34,7 @@
#include "mongo/db/s/balancer/balancer_random.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/request_types/balancer_collection_status_gen.h"
+#include "mongo/s/request_types/move_range_request_gen.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
@@ -169,6 +170,19 @@ public:
bool forceJumbo);
/**
+ * Blocking call, which requests the balancer to move a range to the specified location
+ * in accordance with the active balancer policy. An error will be returned if the attempt to
+ * move fails for any reason.
+ *
+ * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
+ * move regardless.
+ */
+ Status moveRange(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const MoveRangeRequest& request,
+ bool issuedByRemoteUser);
+
+ /**
* Appends the runtime state of the balancer instance to the specified builder.
*/
void report(OperationContext* opCtx, BSONObjBuilder* builder);
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
index dae383b5723..e3c444161d0 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
@@ -33,6 +33,7 @@
#include "mongo/db/s/balancer/balancer_policy.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/request_types/move_chunk_request.h"
+#include "mongo/s/request_types/move_range_request_gen.h"
#include "mongo/s/shard_id.h"
namespace mongo {
@@ -148,6 +149,10 @@ public:
const ChunkVersion& version,
const KeyPattern& keyPattern,
bool estimatedValue) = 0;
+
+ virtual SemiFuture<void> requestMoveRange(OperationContext* opCtx,
+ ShardsvrMoveRange& request,
+ bool issuedByRemoteUser) = 0;
};
} // namespace mongo
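
requestMoveRange extends the scheduler interface in the same shape as requestMoveChunk: the caller submits the request and receives a SemiFuture<void> that is resolved from the remote command response, and may block on it (as the .getNoThrow() call site in balancer.cpp does) or chain continuations. A rough caller-side sketch of that flow, using std::future as a stand-in for mongo::SemiFuture (illustrative, not the real API):

    #include <future>
    #include <iostream>
    #include <string>

    // Toy stand-in for mongo::Status.
    struct Status {
        bool ok;
        std::string reason;
    };

    // The scheduler enqueues the remote command and hands back a future; the real
    // method resolves it from the remote command response rather than std::async.
    std::future<Status> requestMoveRangeSketch(bool issuedByRemoteUser) {
        return std::async(std::launch::async, [issuedByRemoteUser] {
            // Pretend the donor shard accepted and completed the range migration.
            return Status{true, issuedByRemoteUser ? "remote client" : "balancer round"};
        });
    }

    int main() {
        auto outcome = requestMoveRangeSketch(/*issuedByRemoteUser=*/true);
        // Mirrors the blocking .getNoThrow() style consumption in balancer.cpp.
        std::cout << (outcome.get().ok ? "range moved" : "move failed") << '\n';
    }
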
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index fe8f6561ca3..338734c905e 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -229,6 +229,22 @@ SemiFuture<void> BalancerCommandsSchedulerImpl::requestMoveChunk(
.semi();
}
+SemiFuture<void> BalancerCommandsSchedulerImpl::requestMoveRange(OperationContext* opCtx,
+ ShardsvrMoveRange& request,
+ bool issuedByRemoteUser) {
+ auto externalClientInfo =
+ issuedByRemoteUser ? boost::optional<ExternalClientInfo>(opCtx) : boost::none;
+
+ auto commandInfo = std::make_shared<MoveRangeCommandInfo>(
+ request, opCtx->getWriteConcern(), std::move(externalClientInfo));
+
+ return _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo))
+ .then([](const executor::RemoteCommandResponse& remoteResponse) {
+ return processRemoteResponse(remoteResponse);
+ })
+ .semi();
+}
+
SemiFuture<void> BalancerCommandsSchedulerImpl::requestMergeChunks(OperationContext* opCtx,
const NamespaceString& nss,
const ShardId& shardId,
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index b433018d1ef..7f57527367e 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -113,6 +113,36 @@ private:
boost::optional<ExternalClientInfo> _clientInfo;
};
+class MoveRangeCommandInfo : public CommandInfo {
+public:
+ MoveRangeCommandInfo(const ShardsvrMoveRange& request,
+ const WriteConcernOptions& writeConcern,
+ boost::optional<ExternalClientInfo>&& clientInfo)
+ : CommandInfo(request.getFromShard(), request.getCommandParameter(), std::move(clientInfo)),
+ _request(request),
+ _wc(writeConcern) {}
+
+ const ShardsvrMoveRange& getMoveRangeRequest() {
+ return _request;
+ }
+
+ BSONObj serialise() const override {
+ BSONObjBuilder commandBuilder;
+ _request.serialize(BSON(WriteConcernOptions::kWriteConcernField << _wc.toBSON()),
+ &commandBuilder);
+ appendCommandMetadataTo(&commandBuilder);
+ return commandBuilder.obj();
+ }
+
+ bool requiresDistributedLock() const override {
+ return true;
+ }
+
+private:
+ const ShardsvrMoveRange _request;
+ const WriteConcernOptions _wc;
+};
+
/**
* Set of command-specific subclasses of CommandInfo.
*/
@@ -521,6 +551,10 @@ public:
const MoveChunkSettings& commandSettings,
bool issuedByRemoteUser) override;
+ SemiFuture<void> requestMoveRange(OperationContext* opCtx,
+ ShardsvrMoveRange& request,
+ bool issuedByRemoteUser) override;
+
SemiFuture<void> requestMergeChunks(OperationContext* opCtx,
const NamespaceString& nss,
const ShardId& shardId,
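
MoveRangeCommandInfo follows the scheduler's existing CommandInfo pattern: each subclass captures an immutable copy of its request plus the caller's write concern, and serialise() renders both into the single outgoing command object. A simplified standalone sketch of the pattern (plain C++; FakeRequest and the string-based serialization are stand-ins for the IDL-generated request and BSONObjBuilder):

    #include <iostream>
    #include <memory>
    #include <string>

    // Stand-in for the IDL-generated request type, for illustration only.
    struct FakeRequest {
        std::string toShard, min, max;
        std::string serialize() const {
            return "{toShard: " + toShard + ", min: " + min + ", max: " + max + "}";
        }
    };

    class CommandInfoSketch {
    public:
        virtual ~CommandInfoSketch() = default;
        // Each command type knows how to render itself for the wire.
        virtual std::string serialise() const = 0;
    };

    class MoveRangeCommandInfoSketch : public CommandInfoSketch {
    public:
        MoveRangeCommandInfoSketch(FakeRequest request, std::string writeConcern)
            : _request(std::move(request)), _wc(std::move(writeConcern)) {}

        std::string serialise() const override {
            // Request fields plus the caller's write concern, mirroring how the real
            // class appends WriteConcernOptions::kWriteConcernField to the body.
            return _request.serialize() + " + writeConcern: " + _wc;
        }

    private:
        const FakeRequest _request;  // immutable copy, as in the real class
        const std::string _wc;
    };

    int main() {
        std::unique_ptr<CommandInfoSketch> info = std::make_unique<MoveRangeCommandInfoSketch>(
            FakeRequest{"shard1", "{x: 0}", "{x: 10}"}, "{w: \"majority\"}");
        std::cout << info->serialise() << '\n';
    }

Keeping the request and write concern const inside the CommandInfo means a scheduled command cannot change between enqueueing and submission, which matters because submission happens later on the scheduler's own thread.
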
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index 4968b71ebb4..467be0cf563 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -170,6 +170,42 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
_scheduler.stop();
}
+TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
+ auto deferredCleanupCompletedCheckpoint =
+ globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
+ auto timesEnteredFailPoint =
+ deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
+ _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
+ ShardsvrMoveRange shardsvrRequest(kNss);
+ shardsvrRequest.setDbName(NamespaceString::kAdminDb);
+ shardsvrRequest.setFromShard(kShardId0);
+ auto& moveRangeRequest = shardsvrRequest.getMoveRangeRequest();
+ moveRangeRequest.setToShard(kShardId1);
+ moveRangeRequest.setMin({});
+ moveRangeRequest.setMax({});
+
+ auto networkResponseFuture = launchAsync([&]() {
+ onCommand(
+ [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+ auto futureResponse = _scheduler.requestMoveRange(
+ operationContext(), shardsvrRequest, false /* issuedByRemoteUser */);
+ ASSERT_OK(futureResponse.getNoThrow());
+ networkResponseFuture.default_timed_get();
+ deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
+ // Ensure DistLock is released correctly
+ {
+ auto opCtx = Client::getCurrent()->getOperationContext();
+ const std::string whyMessage(str::stream()
+ << "Test acquisition of distLock for " << kNss.ns());
+ auto scopedDistLock = DistLockManager::get(opCtx)->lock(
+ opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
+ ASSERT_OK(scopedDistLock.getStatus());
+ }
+ deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0);
+ _scheduler.stop();
+}
+
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMergeChunkCommand) {
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
auto networkResponseFuture = launchAsync([&]() {
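
The new test synchronizes through a failpoint counter: setMode returns how many times the failpoint had already been entered, and waitForTimesEntered blocks until the migration's deferred cleanup bumps that count, after which the distributed lock must be re-acquirable. A generic sketch of that counter-checkpoint idiom (plain C++ with a condition variable, not the actual FailPoint API):

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class Checkpoint {
    public:
        // Returns the current entry count, as FailPoint::setMode does.
        long snapshot() {
            std::lock_guard lk(_m);
            return _count;
        }
        // Called by the code under test (stands in for hitting the failpoint).
        void enter() {
            {
                std::lock_guard lk(_m);
                ++_count;
            }
            _cv.notify_all();
        }
        // Called by the test; blocks until the count reaches the target.
        void waitForTimesEntered(long target) {
            std::unique_lock lk(_m);
            _cv.wait(lk, [&] { return _count >= target; });
        }

    private:
        std::mutex _m;
        std::condition_variable _cv;
        long _count = 0;
    };

    int main() {
        Checkpoint cp;
        const long before = cp.snapshot();
        std::thread worker([&] { cp.enter(); });  // stands in for deferred cleanup
        cp.waitForTimesEntered(before + 1);       // test proceeds only after cleanup ran
        worker.join();
        std::cout << "checkpoint reached\n";
    }
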
diff --git a/src/mongo/db/s/config/configsvr_move_range_command.cpp b/src/mongo/db/s/config/configsvr_move_range_command.cpp
index c95b02b6e25..0aba8cc7274 100644
--- a/src/mongo/db/s/config/configsvr_move_range_command.cpp
+++ b/src/mongo/db/s/config/configsvr_move_range_command.cpp
@@ -85,44 +85,8 @@ public:
Grid::get(opCtx)->shardRegistry()->getShard(opCtx, req.getToShard()),
"Could not find destination shard");
- const auto cm =
- Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfo(opCtx, nss);
- const auto chunk = cm.findIntersectingChunkWithSimpleCollation(req.getMin());
-
- bool validBounds = req.getMin().woCompare(chunk.getMin()) == 0 &&
- req.getMax().woCompare(chunk.getMax()) == 0;
- uassert(ErrorCodes::CommandFailed,
- "No chunk found with the provided shard key bounds",
- validBounds);
-
- ChunkType chunkType;
- chunkType.setCollectionUUID(cm.getUUID());
- chunkType.setMin(chunk.getMin());
- chunkType.setMax(chunk.getMax());
- chunkType.setShard(chunk.getShardId());
- chunkType.setVersion(cm.getVersion());
-
- {
- // TODO SERVER-64324 replace this scope calling moveRange instead of moveChunk
- MigrationSecondaryThrottleOptions secondaryThrottle = [&]() {
- if (!req.getSecondaryThrottle()) {
- return MigrationSecondaryThrottleOptions::create(
- MigrationSecondaryThrottleOptions::kOff);
- }
-
- return MigrationSecondaryThrottleOptions::createWithWriteConcern(
- opCtx->getWriteConcern());
- }();
-
- const bool forceJumbo = req.getForceJumbo() != ForceJumbo::kDoNotForce;
- uassertStatusOK(Balancer::get(opCtx)->moveSingleChunk(opCtx,
- nss,
- chunkType,
- req.getToShard(),
- secondaryThrottle,
- req.getWaitForDelete(),
- forceJumbo));
- }
+ uassertStatusOK(Balancer::get(opCtx)->moveRange(
+ opCtx, nss, req.getMoveRangeRequest(), true /* issuedByRemoteUser */));
}
private: