author     Janna Golden <janna.golden@mongodb.com>	2019-11-05 15:52:39 +0000
committer  evergreen <evergreen@mongodb.com>	2019-11-05 15:52:39 +0000
commit     c150b588cb4400e0324becd916de2a699988af99
tree       b007619ce25c62cad0e275d01a8f57be0fa36d57 /src/mongo/db/s/balancer
parent     da5b6eff05c710029994ae1e06b47d1974974f6c
SERVER-42273 Introduce 'force' option to 'moveChunk' to allow migrating jumbo chunks
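
For orientation, here is a sketch of roughly what a manual migration of a jumbo chunk could look like once the option is exposed at the command layer. This is an illustration only: the user-facing field name "forceJumbo", the namespace "test.coll", and the shard name are assumptions, not something this commit defines; the commit itself only adds the internal MoveChunkRequest::ForceJumbo plumbing.

    // Hypothetical sketch (C++, MongoDB's BSON builder macro): a manual
    // moveChunk request asking the donor to migrate even a jumbo chunk.
    #include "mongo/db/jsobj.h"

    mongo::BSONObj makeForceJumboMoveChunkCmd() {
        return BSON("moveChunk" << "test.coll"       // made-up namespace
                    << "find" << BSON("x" << 42)     // a key inside the jumbo chunk
                    << "to" << "shard0001"           // made-up recipient shard
                    << "forceJumbo" << true);        // assumed command-level field
    }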
Diffstat (limited to 'src/mongo/db/s/balancer')
 src/mongo/db/s/balancer/balancer.cpp                             | 20
 src/mongo/db/s/balancer/balancer.h                               |  3
 src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp |  7
 src/mongo/db/s/balancer/balancer_policy.cpp                      | 30
 src/mongo/db/s/balancer/balancer_policy.h                        | 12
 src/mongo/db/s/balancer/balancer_policy_test.cpp                 | 71
 src/mongo/db/s/balancer/migration_manager.cpp                    |  4
 src/mongo/db/s/balancer/migration_manager_test.cpp               | 43
 src/mongo/db/s/balancer/scoped_migration_request_test.cpp        |  2
 src/mongo/db/s/balancer/type_migration.cpp                       | 22
 src/mongo/db/s/balancer/type_migration.h                         |  6
 src/mongo/db/s/balancer/type_migration_test.cpp                  |  6
12 files changed, 158 insertions(+), 68 deletions(-)
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 0f1f30a89dc..67bcb23220f 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -273,14 +273,22 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx,
                                  const ShardId& newShardId,
                                  uint64_t maxChunkSizeBytes,
                                  const MigrationSecondaryThrottleOptions& secondaryThrottle,
-                                 bool waitForDelete) {
+                                 bool waitForDelete,
+                                 bool forceJumbo) {
     auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId);
     if (!moveAllowedStatus.isOK()) {
         return moveAllowedStatus;
     }
 
     return _migrationManager.executeManualMigration(
-        opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
+        opCtx,
+        MigrateInfo(newShardId,
+                    chunk,
+                    forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual
+                               : MoveChunkRequest::ForceJumbo::kDoNotForce),
+        maxChunkSizeBytes,
+        secondaryThrottle,
+        waitForDelete);
 }
 
 void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
@@ -603,7 +611,13 @@ int Balancer::_moveChunks(OperationContext* opCtx,
         });
         invariant(requestIt != candidateChunks.end());
 
-        if (status == ErrorCodes::ChunkTooBig) {
+        // ChunkTooBig is returned by the source shard during the cloning phase if the migration
+        // manager finds that the chunk is larger than some calculated size, the source shard is
+        // *not* in draining mode, and the 'forceJumbo' balancer setting is 'kDoNotForce'.
+        // ExceededMemoryLimit is returned when the transfer mods queue surpasses 500MB regardless
+        // of whether the source shard is in draining mode or the value of the 'forceJumbo'
+        // balancer setting.
+        if (status == ErrorCodes::ChunkTooBig || status == ErrorCodes::ExceededMemoryLimit) {
             numChunksProcessed++;
 
             log() << "Performing a split because migration " << redact(requestIt->toString())
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 1df09000920..b4d8c9e02a0 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -136,7 +136,8 @@ public:
                            const ShardId& newShardId,
                            uint64_t maxChunkSizeBytes,
                            const MigrationSecondaryThrottleOptions& secondaryThrottle,
-                           bool waitForDelete);
+                           bool waitForDelete,
+                           bool forceJumbo);
 
     /**
      * Appends the runtime state of the balancer instance to the specified builder.
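
The two error codes the balancer now treats the same way can be read as a single predicate. The following is just a condensed restatement of the _moveChunks hunk above, not additional behavior; the helper name is made up.

    #include "mongo/base/status.h"

    namespace mongo {
    // Both codes mean the chunk is too large to migrate as-is, so the balancer
    // schedules a split of the chunk rather than retrying the migration.
    bool shouldSplitInsteadOfRetrying(const Status& migrationStatus) {
        return migrationStatus == ErrorCodes::ChunkTooBig ||
               migrationStatus == ErrorCodes::ExceededMemoryLimit;
    }
    }  // namespace mongo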
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index a8f7cde67f4..4e24a95ea91 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -39,6 +39,7 @@
 #include "mongo/base/status_with.h"
 #include "mongo/bson/bsonobj_comparator_interface.h"
+#include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/catalog/type_tags.h"
@@ -468,7 +469,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi
         }
     }
 
-    return BalancerPolicy::balance(shardStats, distribution, usedShards);
+    return BalancerPolicy::balance(
+        shardStats,
+        distribution,
+        usedShards,
+        Grid::get(opCtx)->getBalancerConfiguration()->attemptToBalanceJumboChunks());
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 6b3fb9d78ba..894c40dea3d 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -347,12 +347,15 @@ MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
 
     const auto& chunks = distribution.getChunks(sourceShardId);
 
-    return {destShardId, chunks[getRandomIndex(chunks.size())]};
+    return {destShardId,
+            chunks[getRandomIndex(chunks.size())],
+            MoveChunkRequest::ForceJumbo::kDoNotForce};
 }
 
 vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
                                             const DistributionStatus& distribution,
-                                            std::set<ShardId>* usedShards) {
+                                            std::set<ShardId>* usedShards,
+                                            bool forceJumbo) {
     vector<MigrateInfo> migrations;
 
     if (MONGO_unlikely(balancerShouldReturnRandomMigrations.shouldFail()) &&
@@ -405,7 +408,7 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
                 }
 
                 invariant(to != stat.shardId);
-                migrations.emplace_back(to, chunk);
+                migrations.emplace_back(to, chunk, MoveChunkRequest::ForceJumbo::kForceBalancer);
                 invariant(usedShards->insert(stat.shardId).second);
                 invariant(usedShards->insert(to).second);
                 break;
@@ -452,7 +455,10 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
                 }
 
                 invariant(to != stat.shardId);
-                migrations.emplace_back(to, chunk);
+                migrations.emplace_back(to,
+                                        chunk,
+                                        forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
+                                                   : MoveChunkRequest::ForceJumbo::kDoNotForce);
                 invariant(usedShards->insert(stat.shardId).second);
                 invariant(usedShards->insert(to).second);
                 break;
@@ -498,7 +504,9 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
                    tag,
                    idealNumberOfChunksPerShardForTag,
                    &migrations,
-                   usedShards))
+                   usedShards,
+                   forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
+                              : MoveChunkRequest::ForceJumbo::kDoNotForce))
             ;
     }
 
@@ -517,7 +525,7 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
         return boost::optional<MigrateInfo>();
     }
 
-    return MigrateInfo(newShardId, chunk);
+    return MigrateInfo(newShardId, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce);
 }
 
 bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
@@ -525,7 +533,8 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
                                         const string& tag,
                                         size_t idealNumberOfChunksPerShardForTag,
                                         vector<MigrateInfo>* migrations,
-                                        set<ShardId>* usedShards) {
+                                        set<ShardId>* usedShards,
+                                        MoveChunkRequest::ForceJumbo forceJumbo) {
     const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards);
     if (!from.isValid())
         return false;
@@ -576,7 +585,7 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
             continue;
         }
 
-        migrations->emplace_back(to, chunk);
+        migrations->emplace_back(to, chunk, forceJumbo);
         invariant(usedShards->insert(chunk.getShard()).second);
         invariant(usedShards->insert(to).second);
         return true;
@@ -598,7 +607,9 @@ string ZoneRange::toString() const {
     return str::stream() << min << " -->> " << max << " on " << zone;
 }
 
-MigrateInfo::MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk) {
+MigrateInfo::MigrateInfo(const ShardId& a_to,
+                         const ChunkType& a_chunk,
+                         const MoveChunkRequest::ForceJumbo a_forceJumbo) {
     invariant(a_chunk.validate());
     invariant(a_to.isValid());
 
@@ -609,6 +620,7 @@ MigrateInfo::MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk) {
     minKey = a_chunk.getMin();
     maxKey = a_chunk.getMax();
     version = a_chunk.getVersion();
+    forceJumbo = a_forceJumbo;
 }
 
 std::string MigrateInfo::getName() const {
diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h
index 326fd7985a2..c75c5ceda2d 100644
--- a/src/mongo/db/s/balancer/balancer_policy.h
+++ b/src/mongo/db/s/balancer/balancer_policy.h
@@ -37,6 +37,7 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/s/balancer/cluster_statistics.h"
 #include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/request_types/move_chunk_request.h"
 #include "mongo/s/shard_id.h"
 
 namespace mongo {
@@ -52,7 +53,9 @@ struct ZoneRange {
 };
 
 struct MigrateInfo {
-    MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk);
+    MigrateInfo(const ShardId& a_to,
+                const ChunkType& a_chunk,
+                MoveChunkRequest::ForceJumbo a_forceJumbo);
 
     std::string getName() const;
 
@@ -66,6 +69,7 @@ struct MigrateInfo {
     BSONObj minKey;
     BSONObj maxKey;
     ChunkVersion version;
+    MoveChunkRequest::ForceJumbo forceJumbo;
 };
 
 typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector;
@@ -190,7 +194,8 @@ public:
      */
    static std::vector<MigrateInfo> balance(const ShardStatisticsVector& shardStats,
                                             const DistributionStatus& distribution,
-                                            std::set<ShardId>* usedShards);
+                                            std::set<ShardId>* usedShards,
+                                            bool forceJumbo);
 
     /**
      * Using the specified distribution information, returns a suggested better location for the
@@ -236,7 +241,8 @@ private:
                                   const std::string& tag,
                                   size_t idealNumberOfChunksPerShardForTag,
                                   std::vector<MigrateInfo>* migrations,
-                                  std::set<ShardId>* usedShards);
+                                  std::set<ShardId>* usedShards,
+                                  MoveChunkRequest::ForceJumbo forceJumbo);
 };
 
 }  // namespace mongo
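
Every MigrateInfo now carries a ForceJumbo disposition. Below is a minimal sketch of how a caller maps the manual 'forceJumbo' flag onto the new third constructor argument, mirroring Balancer::moveSingleChunk above; the helper name is made up.

    #include "mongo/db/s/balancer/balancer_policy.h"

    mongo::MigrateInfo makeManualMigration(const mongo::ShardId& recipientShard,
                                           const mongo::ChunkType& chunk,
                                           bool forceJumbo) {
        // The manual path maps true to kForceManual; the balancer path passes
        // kForceBalancer instead (see BalancerPolicy::balance above).
        return mongo::MigrateInfo(recipientShard,
                                  chunk,
                                  forceJumbo ? mongo::MoveChunkRequest::ForceJumbo::kForceManual
                                             : mongo::MoveChunkRequest::ForceJumbo::kDoNotForce);
    }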
diff --git a/src/mongo/db/s/balancer/balancer_policy_test.cpp b/src/mongo/db/s/balancer/balancer_policy_test.cpp
index 0c5e9fe5822..6ef9f2543a8 100644
--- a/src/mongo/db/s/balancer/balancer_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy_test.cpp
@@ -113,9 +113,10 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
 
 std::vector<MigrateInfo> balanceChunks(const ShardStatisticsVector& shardStats,
                                        const DistributionStatus& distribution,
-                                       bool shouldAggressivelyBalance) {
+                                       bool shouldAggressivelyBalance,
+                                       bool forceJumbo) {
     std::set<ShardId> usedShards;
-    return BalancerPolicy::balance(shardStats, distribution, &usedShards);
+    return BalancerPolicy::balance(shardStats, distribution, &usedShards, forceJumbo);
 }
 
 TEST(BalancerPolicy, Basic) {
@@ -125,7 +126,7 @@ TEST(BalancerPolicy, Basic) {
         {ShardStatistics(kShardId2, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
@@ -140,7 +141,7 @@ TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) {
         {ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId1, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
@@ -153,10 +154,11 @@ TEST(BalancerPolicy, SingleChunkShouldNotMove) {
        {{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
         {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
 
-    ASSERT(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true).empty());
-    ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)
+    ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true, false)
               .empty());
+    ASSERT(
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)
+            .empty());
 }
 
 TEST(BalancerPolicy, BalanceThresholdObeyed) {
@@ -166,10 +168,11 @@ TEST(BalancerPolicy, BalanceThresholdObeyed) {
         {ShardStatistics(kShardId2, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
         {ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}});
 
-    ASSERT(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true).empty());
-    ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)
+    ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true, false)
               .empty());
+    ASSERT(
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)
+            .empty());
 }
 
 TEST(BalancerPolicy, ParallelBalancing) {
@@ -180,7 +183,7 @@ TEST(BalancerPolicy, ParallelBalancing) {
         {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
 
     ASSERT_EQ(2U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
@@ -204,7 +207,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
         {ShardStatistics(kShardId5, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
 
     ASSERT_EQ(2U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
@@ -226,7 +229,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) {
         {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
 
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
@@ -245,7 +248,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe
     // Here kShardId0 would have been selected as a donor
     std::set<ShardId> usedShards{kShardId0};
     const auto migrations(BalancerPolicy::balance(
-        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards));
+        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false));
 
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId1, migrations[0].from);
@@ -264,7 +267,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo
     // Here kShardId0 would have been selected as a donor
     std::set<ShardId> usedShards{kShardId0};
     const auto migrations(BalancerPolicy::balance(
-        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards));
+        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false));
 
     ASSERT_EQ(0U, migrations.size());
 }
@@ -278,7 +281,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
     // Here kShardId2 would have been selected as a recipient
     std::set<ShardId> usedShards{kShardId2};
     const auto migrations(BalancerPolicy::balance(
-        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards));
+        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false));
 
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
@@ -298,7 +301,7 @@ TEST(BalancerPolicy, JumboChunksNotMoved) {
     cluster.second[kShardId0][3].setJumbo(true);
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
@@ -324,7 +327,7 @@ TEST(BalancerPolicy, JumboChunksNotMovedParallel) {
     cluster.second[kShardId2][3].setJumbo(true);
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
 
     ASSERT_EQ(2U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
@@ -345,7 +348,7 @@ TEST(BalancerPolicy, DrainingSingleChunk) {
         {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
@@ -362,7 +365,7 @@ TEST(BalancerPolicy, DrainingSingleChunkPerShard) {
         {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
 
     ASSERT_EQ(2U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
@@ -383,7 +386,7 @@ TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
         {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
@@ -400,7 +403,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) {
         {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 16}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
@@ -416,7 +419,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsWontAcceptChunks) {
         {ShardStatistics(kShardId2, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
 
     ASSERT(migrations.empty());
 }
@@ -430,7 +433,7 @@ TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(
         cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "LAX")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
@@ -448,7 +451,7 @@ TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) {
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(
         cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "SEA")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
     ASSERT(migrations.empty());
 }
 
@@ -460,7 +463,7 @@ TEST(BalancerPolicy, NoBalancingDueToAllNodesEitherDrainingOrMaxedOut) {
         {ShardStatistics(kShardId2, kNoMaxSize, 1, true, emptyTagSet, emptyShardVersion), 1}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT(migrations.empty());
 }
 
@@ -474,7 +477,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) {
         {ShardStatistics(kShardId2, kNoMaxSize, 10, false, emptyTagSet, emptyShardVersion), 10}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
@@ -492,7 +495,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) {
         {ShardStatistics(kShardId2, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}});
 
     const auto migrations(
-        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+        balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
     ASSERT(migrations.empty());
 }
 
@@ -507,7 +510,7 @@ TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) {
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "a")));
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "b")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId1, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
@@ -526,7 +529,7 @@ TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) {
     DistributionStatus distribution(kNamespace, cluster.second);
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 100), "a")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
@@ -546,7 +549,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsWithCrossShardViolationOfTags) {
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 1), "b")));
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "a")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
@@ -564,7 +567,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) {
     DistributionStatus distribution(kNamespace, cluster.second);
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 10), "a")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
     ASSERT_EQ(1U, migrations.size());
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
@@ -583,7 +586,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParalle
     DistributionStatus distribution(kNamespace, cluster.second);
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 20), "a")));
 
-    const auto migrations(balanceChunks(cluster.first, distribution, false));
+    const auto migrations(balanceChunks(cluster.first, distribution, false, false));
 
     ASSERT_EQ(2U, migrations.size());
     ASSERT_EQ(kShardId2, migrations[0].from);
@@ -606,7 +609,7 @@ TEST(BalancerPolicy, BalancerHandlesNoShardsWithTag) {
     ASSERT_OK(
         distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "NonExistentZone")));
 
-    ASSERT(balanceChunks(cluster.first, distribution, false).empty());
+    ASSERT(balanceChunks(cluster.first, distribution, false, false).empty());
 }
 
 TEST(DistributionStatus, AddTagRangeOverlap) {
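
As the test changes show, every BalancerPolicy::balance call site now passes the flag explicitly. A small wrapper equivalent to the updated balanceChunks() helper, for reference; the function name is made up and shardStats/distribution come from the caller:

    #include <set>
    #include <vector>
    #include "mongo/db/s/balancer/balancer_policy.h"

    std::vector<mongo::MigrateInfo> balanceOnce(const mongo::ShardStatisticsVector& shardStats,
                                                const mongo::DistributionStatus& distribution,
                                                bool forceJumbo) {
        std::set<mongo::ShardId> usedShards;  // no donors or recipients excluded up front
        return mongo::BalancerPolicy::balance(shardStats, distribution, &usedShards, forceJumbo);
    }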
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 4af124368e4..ae0997fa9de 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -170,7 +170,6 @@ Status MigrationManager::executeManualMigration(
     const MigrationSecondaryThrottleOptions& secondaryThrottle,
     bool waitForDelete) {
     _waitForRecovery();
-
     // Write a document to the config.migrations collection, in case this migration must be
     // recovered by the Balancer. Fail if the chunk is already moving.
     auto statusWithScopedMigrationRequest =
@@ -455,7 +454,8 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
         ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
         maxChunkSizeBytes,
         secondaryThrottle,
-        waitForDelete);
+        waitForDelete,
+        migrateInfo.forceJumbo);
 
     stdx::lock_guard<Latch> lock(_mutex);
 
diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp
index 23f3b42d223..8af4d60993f 100644
--- a/src/mongo/db/s/balancer/migration_manager_test.cpp
+++ b/src/mongo/db/s/balancer/migration_manager_test.cpp
@@ -224,6 +224,7 @@ void MigrationManagerTest::setUpMigration(const ChunkType& chunk, const ShardId&
     builder.append(MigrationType::toShard(), toShard.toString());
     builder.append(MigrationType::fromShard(), chunk.getShard().toString());
     chunk.getVersion().appendWithField(&builder, "chunkVersion");
+    builder.append(MigrationType::forceJumbo(), "doNotForceJumbo");
 
     MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj()));
     ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
@@ -311,7 +312,9 @@ TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
     setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
 
     // Going to request that these two chunks get migrated.
-    const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}};
+    const std::vector<MigrateInfo> migrationRequests{
+        {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+        {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
 
     auto future = launchAsync([this, migrationRequests] {
         ThreadClient tc("Test", getGlobalServiceContext());
@@ -370,10 +373,11 @@ TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
     setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2);
 
     // Going to request that these four chunks get migrated.
-    const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1coll1},
-                                                     {kShardId3, chunk2coll1},
-                                                     {kShardId1, chunk1coll2},
-                                                     {kShardId3, chunk2coll2}};
+    const std::vector<MigrateInfo> migrationRequests{
+        {kShardId1, chunk1coll1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+        {kShardId3, chunk2coll1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+        {kShardId1, chunk1coll2, MoveChunkRequest::ForceJumbo::kDoNotForce},
+        {kShardId3, chunk2coll2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
 
     auto future = launchAsync([this, migrationRequests] {
         ThreadClient tc("Test", getGlobalServiceContext());
@@ -427,7 +431,9 @@ TEST_F(MigrationManagerTest, SourceShardNotFound) {
     setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
 
     // Going to request that these two chunks get migrated.
-    const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}};
+    const std::vector<MigrateInfo> migrationRequests{
+        {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+        {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
 
     auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
         ThreadClient tc("Test", getGlobalServiceContext());
@@ -474,7 +480,8 @@ TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
     setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
 
     // Going to request that this chunk gets migrated.
-    const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}};
+    const std::vector<MigrateInfo> migrationRequests{
+        {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}};
 
     auto future = launchAsync([this, chunk1, migrationRequests] {
         ThreadClient tc("Test", getGlobalServiceContext());
@@ -525,7 +532,11 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
 
         ASSERT_EQ(ErrorCodes::BalancerInterrupted,
                   _migrationManager->executeManualMigration(
-                      opCtx.get(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
+                      opCtx.get(),
+                      {kShardId1, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce},
+                      0,
+                      kDefaultSecondaryThrottle,
+                      false));
     });
 
     // Wait till the move chunk request gets sent and pretend that it is stuck by never responding
@@ -549,7 +560,11 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
     // Ensure that no new migrations can be scheduled
     ASSERT_EQ(ErrorCodes::BalancerInterrupted,
               _migrationManager->executeManualMigration(
-                  operationContext(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
+                  operationContext(),
+                  {kShardId1, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce},
+                  0,
+                  kDefaultSecondaryThrottle,
+                  false));
 
     // Ensure that the migration manager is no longer handling any migrations.
     _migrationManager->drainActiveMigrations();
@@ -613,7 +628,11 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) {
         shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
 
         ASSERT_OK(_migrationManager->executeManualMigration(
-            opCtx.get(), {kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false));
+            opCtx.get(),
+            {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+            0,
+            kDefaultSecondaryThrottle,
+            false));
     });
 
     // Expect only one moveChunk command to be called.
@@ -760,7 +779,9 @@ TEST_F(MigrationManagerTest, RemoteCallErrorConversionToOperationFailed) {
     setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
 
     // Going to request that these two chunks get migrated.
-    const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}};
+    const std::vector<MigrateInfo> migrationRequests{
+        {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+        {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
 
     auto future = launchAsync([&] {
         ThreadClient tc("Test", getGlobalServiceContext());
diff --git a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
index fd380f9c613..e30c73a1a1e 100644
--- a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
+++ b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
@@ -113,7 +113,7 @@ MigrateInfo makeMigrateInfo() {
     ChunkType chunkType = assertGet(ChunkType::parseFromConfigBSONCommand(chunkBuilder.obj()));
     ASSERT_OK(chunkType.validate());
 
-    return MigrateInfo(kToShard, chunkType);
+    return MigrateInfo(kToShard, chunkType, MoveChunkRequest::ForceJumbo::kDoNotForce);
 }
 
 TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) {
diff --git a/src/mongo/db/s/balancer/type_migration.cpp b/src/mongo/db/s/balancer/type_migration.cpp
index a3d47608604..4bedb26300c 100644
--- a/src/mongo/db/s/balancer/type_migration.cpp
+++ b/src/mongo/db/s/balancer/type_migration.cpp
@@ -50,6 +50,7 @@ const BSONField<BSONObj> MigrationType::max("max");
 const BSONField<std::string> MigrationType::fromShard("fromShard");
 const BSONField<std::string> MigrationType::toShard("toShard");
 const BSONField<bool> MigrationType::waitForDelete("waitForDelete");
+const BSONField<std::string> MigrationType::forceJumbo("forceJumbo");
 
 MigrationType::MigrationType() = default;
 
@@ -60,7 +61,8 @@ MigrationType::MigrationType(MigrateInfo info, bool waitForDelete)
       _fromShard(info.from),
       _toShard(info.to),
       _chunkVersion(info.version),
-      _waitForDelete(waitForDelete) {}
+      _waitForDelete(waitForDelete),
+      _forceJumbo(MoveChunkRequest::forceJumboToString(info.forceJumbo)) {}
 
 StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) {
     MigrationType migrationType;
@@ -115,6 +117,21 @@ StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) {
         migrationType._waitForDelete = waitForDeleteVal;
     }
 
+    {
+        std::string forceJumboVal;
+        Status status = bsonExtractStringField(source, forceJumbo.name(), &forceJumboVal);
+        if (!status.isOK())
+            return status;
+
+        auto forceJumbo = MoveChunkRequest::parseForceJumbo(forceJumboVal);
+        if (forceJumbo != MoveChunkRequest::ForceJumbo::kDoNotForce &&
+            forceJumbo != MoveChunkRequest::ForceJumbo::kForceManual &&
+            forceJumbo != MoveChunkRequest::ForceJumbo::kForceBalancer) {
+            return Status{ErrorCodes::BadValue, "Unknown value for forceJumbo"};
+        }
+        migrationType._forceJumbo = std::move(forceJumboVal);
+    }
+
     return migrationType;
 }
 
@@ -132,6 +149,7 @@ BSONObj MigrationType::toBSON() const {
     _chunkVersion.appendWithField(&builder, kChunkVersion);
 
     builder.append(waitForDelete.name(), _waitForDelete);
+    builder.append(forceJumbo.name(), _forceJumbo);
 
     return builder.obj();
 }
@@ -143,7 +161,7 @@ MigrateInfo MigrationType::toMigrateInfo() const {
     chunk.setMax(_max);
     chunk.setVersion(_chunkVersion);
 
-    return MigrateInfo(_toShard, chunk);
+    return MigrateInfo(_toShard, chunk, MoveChunkRequest::parseForceJumbo(_forceJumbo));
 }
 
 }  // namespace mongo
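
The type_migration.cpp hunks above persist the enum in config.migrations as a string so that an interrupted migration can be recovered with the same jumbo disposition. A sketch of that round trip, using the two MoveChunkRequest helpers the diff relies on; the wrapper names are made up:

    #include <string>
    #include "mongo/s/request_types/move_chunk_request.h"

    // Serialize for storage; e.g. kDoNotForce maps to the string the
    // migration_manager_test.cpp hunk writes literally: "doNotForceJumbo".
    std::string persistForceJumbo(mongo::MoveChunkRequest::ForceJumbo forceJumbo) {
        return mongo::MoveChunkRequest::forceJumboToString(forceJumbo);
    }

    // Parse on recovery; MigrationType::fromBSON additionally rejects strings
    // that do not map to one of the three known enum values.
    mongo::MoveChunkRequest::ForceJumbo recoverForceJumbo(const std::string& stored) {
        return mongo::MoveChunkRequest::parseForceJumbo(stored);
    }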
diff --git a/src/mongo/db/s/balancer/type_migration.h b/src/mongo/db/s/balancer/type_migration.h
index 23fc1cf85a8..9f383244f8d 100644
--- a/src/mongo/db/s/balancer/type_migration.h
+++ b/src/mongo/db/s/balancer/type_migration.h
@@ -54,6 +54,7 @@ public:
     static const BSONField<std::string> fromShard;
     static const BSONField<std::string> toShard;
     static const BSONField<bool> waitForDelete;
+    static const BSONField<std::string> forceJumbo;
 
     /**
      * The Balancer encapsulates migration information in MigrateInfo objects, so this facilitates
@@ -85,6 +86,10 @@ public:
         return _waitForDelete;
     }
 
+    MoveChunkRequest::ForceJumbo getForceJumbo() const {
+        return MoveChunkRequest::parseForceJumbo(_forceJumbo);
+    }
+
 private:
     MigrationType();
 
@@ -96,6 +101,7 @@ private:
     ShardId _toShard;
     ChunkVersion _chunkVersion;
     bool _waitForDelete{false};
+    std::string _forceJumbo{0};
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/balancer/type_migration_test.cpp b/src/mongo/db/s/balancer/type_migration_test.cpp
index 3059480cfba..a1d2e9193b9 100644
--- a/src/mongo/db/s/balancer/type_migration_test.cpp
+++ b/src/mongo/db/s/balancer/type_migration_test.cpp
@@ -61,7 +61,7 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
     ChunkType chunkType = assertGet(ChunkType::fromConfigBSON(chunkBuilder.obj()));
     ASSERT_OK(chunkType.validate());
 
-    MigrateInfo migrateInfo(kToShard, chunkType);
+    MigrateInfo migrateInfo(kToShard, chunkType, MoveChunkRequest::ForceJumbo::kDoNotForce);
     MigrationType migrationType(migrateInfo, kWaitForDelete);
 
     BSONObjBuilder builder;
@@ -72,6 +72,8 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
     builder.append(MigrationType::toShard(), kToShard.toString());
     version.appendWithField(&builder, "chunkVersion");
     builder.append(MigrationType::waitForDelete(), kWaitForDelete);
+    builder.append(MigrationType::forceJumbo(),
+                   MoveChunkRequest::forceJumboToString(MoveChunkRequest::ForceJumbo::kDoNotForce));
 
     BSONObj obj = builder.obj();
 
@@ -89,6 +91,8 @@ TEST(MigrationTypeTest, FromAndToBSON) {
     builder.append(MigrationType::toShard(), kToShard.toString());
     version.appendWithField(&builder, "chunkVersion");
     builder.append(MigrationType::waitForDelete(), kWaitForDelete);
+    builder.append(MigrationType::forceJumbo(),
+                   MoveChunkRequest::forceJumboToString(MoveChunkRequest::ForceJumbo::kDoNotForce));
 
     BSONObj obj = builder.obj();