| field | value | date |
|---|---|---|
| author | Janna Golden <janna.golden@mongodb.com> | 2019-11-05 15:52:39 +0000 |
| committer | evergreen <evergreen@mongodb.com> | 2019-11-05 15:52:39 +0000 |
| commit | c150b588cb4400e0324becd916de2a699988af99 | |
| tree | b007619ce25c62cad0e275d01a8f57be0fa36d57 /src/mongo/db/s | |
| parent | da5b6eff05c710029994ae1e06b47d1974974f6c | |
| download | mongo-c150b588cb4400e0324becd916de2a699988af99.tar.gz | |
SERVER-42273 Introduce 'force' option to 'moveChunk' to allow migrating jumbo chunks
Diffstat (limited to 'src/mongo/db/s')
17 files changed, 449 insertions, 179 deletions
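This change threads a new MoveChunkRequest::ForceJumbo value from the config server and the balancer down to the chunk cloner on the donor shard. The request type itself lives in src/mongo/s/request_types/move_chunk_request.h and is outside this diffstat, so the sketch below only reconstructs the interface implied by the call sites in this diff; apart from "doNotForceJumbo", which appears verbatim in migration_manager_test.cpp, the string constants are assumptions.

```cpp
// Minimal sketch, inferred from the call sites in this diff, of the enum and
// string helpers that the files under src/mongo/db/s start using. Only the
// "doNotForceJumbo" spelling is confirmed by this diff; the other two strings
// are hypothetical placeholders.
#include <string>

class MoveChunkRequest {
public:
    enum class ForceJumbo {
        kDoNotForce,     // default: a jumbo chunk fails the migration with ChunkTooBig
        kForceManual,    // a user-issued moveChunk asked to force the migration
        kForceBalancer,  // the balancer setting 'attemptToBalanceJumboChunks' is on
    };

    static std::string forceJumboToString(ForceJumbo value) {
        switch (value) {
            case ForceJumbo::kForceManual:
                return "forceJumboManual";    // assumed spelling
            case ForceJumbo::kForceBalancer:
                return "forceJumboBalancer";  // assumed spelling
            case ForceJumbo::kDoNotForce:
            default:
                return "doNotForceJumbo";
        }
    }

    static ForceJumbo parseForceJumbo(const std::string& str) {
        if (str == "forceJumboManual")
            return ForceJumbo::kForceManual;
        if (str == "forceJumboBalancer")
            return ForceJumbo::kForceBalancer;
        return ForceJumbo::kDoNotForce;
    }
};
```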
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp index 0af9cf3aeba..31c3262724d 100644 --- a/src/mongo/db/s/active_migrations_registry_test.cpp +++ b/src/mongo/db/s/active_migrations_registry_test.cpp @@ -61,7 +61,8 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) { ChunkRange(BSON("Key" << -100), BSON("Key" << 100)), 1024, MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff), - true); + true, + MoveChunkRequest::ForceJumbo::kDoNotForce); return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj())); } diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 0f1f30a89dc..67bcb23220f 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -273,14 +273,22 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx, const ShardId& newShardId, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete) { + bool waitForDelete, + bool forceJumbo) { auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId); if (!moveAllowedStatus.isOK()) { return moveAllowedStatus; } return _migrationManager.executeManualMigration( - opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete); + opCtx, + MigrateInfo(newShardId, + chunk, + forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual + : MoveChunkRequest::ForceJumbo::kDoNotForce), + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete); } void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) { @@ -603,7 +611,13 @@ int Balancer::_moveChunks(OperationContext* opCtx, }); invariant(requestIt != candidateChunks.end()); - if (status == ErrorCodes::ChunkTooBig) { + // ChunkTooBig is returned by the source shard during the cloning phase if the migration + // manager finds that the chunk is larger than some calculated size, the source shard is + // *not* in draining mode, and the 'forceJumbo' balancer setting is 'kDoNotForce'. + // ExceededMemoryLimit is returned when the transfer mods queue surpasses 500MB regardless + // of whether the source shard is in draining mode or the value of the 'forceJumbo' balancer + // setting. + if (status == ErrorCodes::ChunkTooBig || status == ErrorCodes::ExceededMemoryLimit) { numChunksProcessed++; log() << "Performing a split because migration " << redact(requestIt->toString()) diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index 1df09000920..b4d8c9e02a0 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -136,7 +136,8 @@ public: const ShardId& newShardId, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete); + bool waitForDelete, + bool forceJumbo); /** * Appends the runtime state of the balancer instance to the specified builder.
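Condensed from the balancer.cpp hunks above: Balancer::moveSingleChunk() maps its new boolean argument to kForceManual, while the retry loop in _moveChunks() now splits a chunk on ExceededMemoryLimit as well as on ChunkTooBig. A standalone sketch of those two decisions, with simplified stand-ins for the real MongoDB types:

```cpp
// Two balancer-side decisions introduced above, reduced to free functions so
// the logic is visible on its own; the real code lives in
// Balancer::moveSingleChunk() and Balancer::_moveChunks().
enum class ForceJumbo { kDoNotForce, kForceManual, kForceBalancer };
enum class MigrationStatus { OK, ChunkTooBig, ExceededMemoryLimit, Other };

// A manual moveChunk that sets 'forceJumbo' is tagged kForceManual so the donor
// shard knows the user explicitly accepted migrating a jumbo chunk.
ForceJumbo forceJumboForManualMove(bool forceJumbo) {
    return forceJumbo ? ForceJumbo::kForceManual : ForceJumbo::kDoNotForce;
}

// The balancer falls back to splitting the chunk when the donor refused it as
// too large (ChunkTooBig) or when the transfer-mods queue blew past its 500MB
// cap during cloning (ExceededMemoryLimit).
bool shouldSplitInsteadOfRetrying(MigrationStatus status) {
    return status == MigrationStatus::ChunkTooBig ||
        status == MigrationStatus::ExceededMemoryLimit;
}
```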
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index a8f7cde67f4..4e24a95ea91 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -39,6 +39,7 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj_comparator_interface.h" +#include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_tags.h" @@ -468,7 +469,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi } } - return BalancerPolicy::balance(shardStats, distribution, usedShards); + return BalancerPolicy::balance( + shardStats, + distribution, + usedShards, + Grid::get(opCtx)->getBalancerConfiguration()->attemptToBalanceJumboChunks()); } } // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp index 6b3fb9d78ba..894c40dea3d 100644 --- a/src/mongo/db/s/balancer/balancer_policy.cpp +++ b/src/mongo/db/s/balancer/balancer_policy.cpp @@ -347,12 +347,15 @@ MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats, const auto& chunks = distribution.getChunks(sourceShardId); - return {destShardId, chunks[getRandomIndex(chunks.size())]}; + return {destShardId, + chunks[getRandomIndex(chunks.size())], + MoveChunkRequest::ForceJumbo::kDoNotForce}; } vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats, const DistributionStatus& distribution, - std::set<ShardId>* usedShards) { + std::set<ShardId>* usedShards, + bool forceJumbo) { vector<MigrateInfo> migrations; if (MONGO_unlikely(balancerShouldReturnRandomMigrations.shouldFail()) && @@ -405,7 +408,7 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt } invariant(to != stat.shardId); - migrations.emplace_back(to, chunk); + migrations.emplace_back(to, chunk, MoveChunkRequest::ForceJumbo::kForceBalancer); invariant(usedShards->insert(stat.shardId).second); invariant(usedShards->insert(to).second); break; @@ -452,7 +455,10 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt } invariant(to != stat.shardId); - migrations.emplace_back(to, chunk); + migrations.emplace_back(to, + chunk, + forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer + : MoveChunkRequest::ForceJumbo::kDoNotForce); invariant(usedShards->insert(stat.shardId).second); invariant(usedShards->insert(to).second); break; @@ -498,7 +504,9 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt tag, idealNumberOfChunksPerShardForTag, &migrations, - usedShards)) + usedShards, + forceJumbo ? 
MoveChunkRequest::ForceJumbo::kForceBalancer + : MoveChunkRequest::ForceJumbo::kDoNotForce)) ; } @@ -517,7 +525,7 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk( return boost::optional<MigrateInfo>(); } - return MigrateInfo(newShardId, chunk); + return MigrateInfo(newShardId, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce); } bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats, @@ -525,7 +533,8 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats, const string& tag, size_t idealNumberOfChunksPerShardForTag, vector<MigrateInfo>* migrations, - set<ShardId>* usedShards) { + set<ShardId>* usedShards, + MoveChunkRequest::ForceJumbo forceJumbo) { const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards); if (!from.isValid()) return false; @@ -576,7 +585,7 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats, continue; } - migrations->emplace_back(to, chunk); + migrations->emplace_back(to, chunk, forceJumbo); invariant(usedShards->insert(chunk.getShard()).second); invariant(usedShards->insert(to).second); return true; @@ -598,7 +607,9 @@ string ZoneRange::toString() const { return str::stream() << min << " -->> " << max << " on " << zone; } -MigrateInfo::MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk) { +MigrateInfo::MigrateInfo(const ShardId& a_to, + const ChunkType& a_chunk, + const MoveChunkRequest::ForceJumbo a_forceJumbo) { invariant(a_chunk.validate()); invariant(a_to.isValid()); @@ -609,6 +620,7 @@ MigrateInfo::MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk) { minKey = a_chunk.getMin(); maxKey = a_chunk.getMax(); version = a_chunk.getVersion(); + forceJumbo = a_forceJumbo; } std::string MigrateInfo::getName() const { diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h index 326fd7985a2..c75c5ceda2d 100644 --- a/src/mongo/db/s/balancer/balancer_policy.h +++ b/src/mongo/db/s/balancer/balancer_policy.h @@ -37,6 +37,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/s/balancer/cluster_statistics.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/request_types/move_chunk_request.h" #include "mongo/s/shard_id.h" namespace mongo { @@ -52,7 +53,9 @@ struct ZoneRange { }; struct MigrateInfo { - MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk); + MigrateInfo(const ShardId& a_to, + const ChunkType& a_chunk, + MoveChunkRequest::ForceJumbo a_forceJumbo); std::string getName() const; @@ -66,6 +69,7 @@ struct MigrateInfo { BSONObj minKey; BSONObj maxKey; ChunkVersion version; + MoveChunkRequest::ForceJumbo forceJumbo; }; typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector; @@ -190,7 +194,8 @@ public: */ static std::vector<MigrateInfo> balance(const ShardStatisticsVector& shardStats, const DistributionStatus& distribution, - std::set<ShardId>* usedShards); + std::set<ShardId>* usedShards, + bool forceJumbo); /** * Using the specified distribution information, returns a suggested better location for the @@ -236,7 +241,8 @@ private: const std::string& tag, size_t idealNumberOfChunksPerShardForTag, std::vector<MigrateInfo>* migrations, - std::set<ShardId>* usedShards); + std::set<ShardId>* usedShards, + MoveChunkRequest::ForceJumbo forceJumbo); }; } // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_policy_test.cpp b/src/mongo/db/s/balancer/balancer_policy_test.cpp index 0c5e9fe5822..6ef9f2543a8 100644 --- 
a/src/mongo/db/s/balancer/balancer_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_policy_test.cpp @@ -113,9 +113,10 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster( std::vector<MigrateInfo> balanceChunks(const ShardStatisticsVector& shardStats, const DistributionStatus& distribution, - bool shouldAggressivelyBalance) { + bool shouldAggressivelyBalance, + bool forceJumbo) { std::set<ShardId> usedShards; - return BalancerPolicy::balance(shardStats, distribution, &usedShards); + return BalancerPolicy::balance(shardStats, distribution, &usedShards, forceJumbo); } TEST(BalancerPolicy, Basic) { @@ -125,7 +126,7 @@ TEST(BalancerPolicy, Basic) { {ShardStatistics(kShardId2, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); ASSERT_EQ(kShardId1, migrations[0].to); @@ -140,7 +141,7 @@ TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) { {ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId1, migrations[0].from); ASSERT_EQ(kShardId2, migrations[0].to); @@ -153,10 +154,11 @@ TEST(BalancerPolicy, SingleChunkShouldNotMove) { {{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}, {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); - ASSERT( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true).empty()); - ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false) + ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true, false) .empty()); + ASSERT( + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false) + .empty()); } TEST(BalancerPolicy, BalanceThresholdObeyed) { @@ -166,10 +168,11 @@ TEST(BalancerPolicy, BalanceThresholdObeyed) { {ShardStatistics(kShardId2, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}, {ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}}); - ASSERT( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true).empty()); - ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false) + ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true, false) .empty()); + ASSERT( + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false) + .empty()); } TEST(BalancerPolicy, ParallelBalancing) { @@ -180,7 +183,7 @@ TEST(BalancerPolicy, ParallelBalancing) { {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(2U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); @@ -204,7 +207,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) { 
{ShardStatistics(kShardId5, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(2U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); @@ -226,7 +229,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) { {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); @@ -245,7 +248,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe // Here kShardId0 would have been selected as a donor std::set<ShardId> usedShards{kShardId0}; const auto migrations(BalancerPolicy::balance( - cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards)); + cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId1, migrations[0].from); @@ -264,7 +267,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo // Here kShardId0 would have been selected as a donor std::set<ShardId> usedShards{kShardId0}; const auto migrations(BalancerPolicy::balance( - cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards)); + cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false)); ASSERT_EQ(0U, migrations.size()); } @@ -278,7 +281,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) { // Here kShardId2 would have been selected as a recipient std::set<ShardId> usedShards{kShardId2}; const auto migrations(BalancerPolicy::balance( - cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards)); + cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); @@ -298,7 +301,7 @@ TEST(BalancerPolicy, JumboChunksNotMoved) { cluster.second[kShardId0][3].setJumbo(true); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); ASSERT_EQ(kShardId1, migrations[0].to); @@ -324,7 +327,7 @@ TEST(BalancerPolicy, JumboChunksNotMovedParallel) { cluster.second[kShardId2][3].setJumbo(true); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(2U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); @@ -345,7 +348,7 @@ TEST(BalancerPolicy, DrainingSingleChunk) { {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); ASSERT_EQ(kShardId1, 
migrations[0].to); @@ -362,7 +365,7 @@ TEST(BalancerPolicy, DrainingSingleChunkPerShard) { {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(2U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); @@ -383,7 +386,7 @@ TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) { {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); ASSERT_EQ(kShardId1, migrations[0].to); @@ -400,7 +403,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) { {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 16}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); ASSERT_EQ(kShardId2, migrations[0].to); @@ -416,7 +419,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsWontAcceptChunks) { {ShardStatistics(kShardId2, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT(migrations.empty()); } @@ -430,7 +433,7 @@ TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) { ASSERT_OK(distribution.addRangeToZone(ZoneRange( cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "LAX"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId2, migrations[0].from); ASSERT_EQ(kShardId1, migrations[0].to); @@ -448,7 +451,7 @@ TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) { ASSERT_OK(distribution.addRangeToZone(ZoneRange( cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "SEA"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT(migrations.empty()); } @@ -460,7 +463,7 @@ TEST(BalancerPolicy, NoBalancingDueToAllNodesEitherDrainingOrMaxedOut) { {ShardStatistics(kShardId2, kNoMaxSize, 1, true, emptyTagSet, emptyShardVersion), 1}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT(migrations.empty()); } @@ -474,7 +477,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) { {ShardStatistics(kShardId2, kNoMaxSize, 10, false, emptyTagSet, emptyShardVersion), 10}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, 
cluster.second), false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId2, migrations[0].from); ASSERT_EQ(kShardId1, migrations[0].to); @@ -492,7 +495,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) { {ShardStatistics(kShardId2, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}}); const auto migrations( - balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)); ASSERT(migrations.empty()); } @@ -507,7 +510,7 @@ TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) { ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "a"))); ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "b"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId1, migrations[0].from); ASSERT_EQ(kShardId0, migrations[0].to); @@ -526,7 +529,7 @@ TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) { DistributionStatus distribution(kNamespace, cluster.second); ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 100), "a"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId2, migrations[0].from); ASSERT_EQ(kShardId0, migrations[0].to); @@ -546,7 +549,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsWithCrossShardViolationOfTags) { ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 1), "b"))); ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "a"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); ASSERT_EQ(kShardId2, migrations[0].to); @@ -564,7 +567,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) { DistributionStatus distribution(kNamespace, cluster.second); ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 10), "a"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId2, migrations[0].from); ASSERT_EQ(kShardId0, migrations[0].to); @@ -583,7 +586,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParalle DistributionStatus distribution(kNamespace, cluster.second); ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 20), "a"))); - const auto migrations(balanceChunks(cluster.first, distribution, false)); + const auto migrations(balanceChunks(cluster.first, distribution, false, false)); ASSERT_EQ(2U, migrations.size()); ASSERT_EQ(kShardId2, migrations[0].from); @@ -606,7 +609,7 @@ TEST(BalancerPolicy, BalancerHandlesNoShardsWithTag) { ASSERT_OK( distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "NonExistentZone"))); - ASSERT(balanceChunks(cluster.first, distribution, false).empty()); + ASSERT(balanceChunks(cluster.first, distribution, false, false).empty()); } TEST(DistributionStatus, AddTagRangeOverlap) { diff --git 
a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 4af124368e4..ae0997fa9de 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -170,7 +170,6 @@ Status MigrationManager::executeManualMigration( const MigrationSecondaryThrottleOptions& secondaryThrottle, bool waitForDelete) { _waitForRecovery(); - // Write a document to the config.migrations collection, in case this migration must be // recovered by the Balancer. Fail if the chunk is already moving. auto statusWithScopedMigrationRequest = @@ -455,7 +454,8 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( ChunkRange(migrateInfo.minKey, migrateInfo.maxKey), maxChunkSizeBytes, secondaryThrottle, - waitForDelete); + waitForDelete, + migrateInfo.forceJumbo); stdx::lock_guard<Latch> lock(_mutex); diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp index 23f3b42d223..8af4d60993f 100644 --- a/src/mongo/db/s/balancer/migration_manager_test.cpp +++ b/src/mongo/db/s/balancer/migration_manager_test.cpp @@ -224,6 +224,7 @@ void MigrationManagerTest::setUpMigration(const ChunkType& chunk, const ShardId& builder.append(MigrationType::toShard(), toShard.toString()); builder.append(MigrationType::fromShard(), chunk.getShard().toString()); chunk.getVersion().appendWithField(&builder, "chunkVersion"); + builder.append(MigrationType::forceJumbo(), "doNotForceJumbo"); MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj())); ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), @@ -311,7 +312,9 @@ TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) { setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); // Going to request that these two chunks get migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}}; + const std::vector<MigrateInfo> migrationRequests{ + {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}, + {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}}; auto future = launchAsync([this, migrationRequests] { ThreadClient tc("Test", getGlobalServiceContext()); @@ -370,10 +373,11 @@ TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) { setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2); // Going to request that these four chunks get migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1coll1}, - {kShardId3, chunk2coll1}, - {kShardId1, chunk1coll2}, - {kShardId3, chunk2coll2}}; + const std::vector<MigrateInfo> migrationRequests{ + {kShardId1, chunk1coll1, MoveChunkRequest::ForceJumbo::kDoNotForce}, + {kShardId3, chunk2coll1, MoveChunkRequest::ForceJumbo::kDoNotForce}, + {kShardId1, chunk1coll2, MoveChunkRequest::ForceJumbo::kDoNotForce}, + {kShardId3, chunk2coll2, MoveChunkRequest::ForceJumbo::kDoNotForce}}; auto future = launchAsync([this, migrationRequests] { ThreadClient tc("Test", getGlobalServiceContext()); @@ -427,7 +431,9 @@ TEST_F(MigrationManagerTest, SourceShardNotFound) { setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); // Going to request that these two chunks get migrated. 
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}}; + const std::vector<MigrateInfo> migrationRequests{ + {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}, + {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}}; auto future = launchAsync([this, chunk1, chunk2, migrationRequests] { ThreadClient tc("Test", getGlobalServiceContext()); @@ -474,7 +480,8 @@ TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) { setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); // Going to request that this chunk gets migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}}; + const std::vector<MigrateInfo> migrationRequests{ + {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}}; auto future = launchAsync([this, chunk1, migrationRequests] { ThreadClient tc("Test", getGlobalServiceContext()); @@ -525,7 +532,11 @@ TEST_F(MigrationManagerTest, InterruptMigration) { ASSERT_EQ(ErrorCodes::BalancerInterrupted, _migrationManager->executeManualMigration( - opCtx.get(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); + opCtx.get(), + {kShardId1, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce}, + 0, + kDefaultSecondaryThrottle, + false)); }); // Wait till the move chunk request gets sent and pretend that it is stuck by never responding @@ -549,7 +560,11 @@ TEST_F(MigrationManagerTest, InterruptMigration) { // Ensure that no new migrations can be scheduled ASSERT_EQ(ErrorCodes::BalancerInterrupted, _migrationManager->executeManualMigration( - operationContext(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); + operationContext(), + {kShardId1, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce}, + 0, + kDefaultSecondaryThrottle, + false)); // Ensure that the migration manager is no longer handling any migrations. _migrationManager->drainActiveMigrations(); @@ -613,7 +628,11 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) { shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); ASSERT_OK(_migrationManager->executeManualMigration( - opCtx.get(), {kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false)); + opCtx.get(), + {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}, + 0, + kDefaultSecondaryThrottle, + false)); }); // Expect only one moveChunk command to be called. @@ -760,7 +779,9 @@ TEST_F(MigrationManagerTest, RemoteCallErrorConversionToOperationFailed) { setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); // Going to request that these two chunks get migrated. 
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}}; + const std::vector<MigrateInfo> migrationRequests{ + {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}, + {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}}; auto future = launchAsync([&] { ThreadClient tc("Test", getGlobalServiceContext()); diff --git a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp index fd380f9c613..e30c73a1a1e 100644 --- a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp +++ b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp @@ -113,7 +113,7 @@ MigrateInfo makeMigrateInfo() { ChunkType chunkType = assertGet(ChunkType::parseFromConfigBSONCommand(chunkBuilder.obj())); ASSERT_OK(chunkType.validate()); - return MigrateInfo(kToShard, chunkType); + return MigrateInfo(kToShard, chunkType, MoveChunkRequest::ForceJumbo::kDoNotForce); } TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) { diff --git a/src/mongo/db/s/balancer/type_migration.cpp b/src/mongo/db/s/balancer/type_migration.cpp index a3d47608604..4bedb26300c 100644 --- a/src/mongo/db/s/balancer/type_migration.cpp +++ b/src/mongo/db/s/balancer/type_migration.cpp @@ -50,6 +50,7 @@ const BSONField<BSONObj> MigrationType::max("max"); const BSONField<std::string> MigrationType::fromShard("fromShard"); const BSONField<std::string> MigrationType::toShard("toShard"); const BSONField<bool> MigrationType::waitForDelete("waitForDelete"); +const BSONField<std::string> MigrationType::forceJumbo("forceJumbo"); MigrationType::MigrationType() = default; @@ -60,7 +61,8 @@ MigrationType::MigrationType(MigrateInfo info, bool waitForDelete) _fromShard(info.from), _toShard(info.to), _chunkVersion(info.version), - _waitForDelete(waitForDelete) {} + _waitForDelete(waitForDelete), + _forceJumbo(MoveChunkRequest::forceJumboToString(info.forceJumbo)) {} StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) { MigrationType migrationType; @@ -115,6 +117,21 @@ StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) { migrationType._waitForDelete = waitForDeleteVal; } + { + std::string forceJumboVal; + Status status = bsonExtractStringField(source, forceJumbo.name(), &forceJumboVal); + if (!status.isOK()) + return status; + + auto forceJumbo = MoveChunkRequest::parseForceJumbo(forceJumboVal); + if (forceJumbo != MoveChunkRequest::ForceJumbo::kDoNotForce && + forceJumbo != MoveChunkRequest::ForceJumbo::kForceManual && + forceJumbo != MoveChunkRequest::ForceJumbo::kForceBalancer) { + return Status{ErrorCodes::BadValue, "Unknown value for forceJumbo"}; + } + migrationType._forceJumbo = std::move(forceJumboVal); + } + return migrationType; } @@ -132,6 +149,7 @@ BSONObj MigrationType::toBSON() const { _chunkVersion.appendWithField(&builder, kChunkVersion); builder.append(waitForDelete.name(), _waitForDelete); + builder.append(forceJumbo.name(), _forceJumbo); return builder.obj(); } @@ -143,7 +161,7 @@ MigrateInfo MigrationType::toMigrateInfo() const { chunk.setMax(_max); chunk.setVersion(_chunkVersion); - return MigrateInfo(_toShard, chunk); + return MigrateInfo(_toShard, chunk, MoveChunkRequest::parseForceJumbo(_forceJumbo)); } } // namespace mongo diff --git a/src/mongo/db/s/balancer/type_migration.h b/src/mongo/db/s/balancer/type_migration.h index 23fc1cf85a8..9f383244f8d 100644 --- a/src/mongo/db/s/balancer/type_migration.h +++ b/src/mongo/db/s/balancer/type_migration.h @@ -54,6 +54,7 @@ 
public: static const BSONField<std::string> fromShard; static const BSONField<std::string> toShard; static const BSONField<bool> waitForDelete; + static const BSONField<std::string> forceJumbo; /** * The Balancer encapsulates migration information in MigrateInfo objects, so this facilitates @@ -85,6 +86,10 @@ public: return _waitForDelete; } + MoveChunkRequest::ForceJumbo getForceJumbo() const { + return MoveChunkRequest::parseForceJumbo(_forceJumbo); + } + private: MigrationType(); @@ -96,6 +101,7 @@ private: ShardId _toShard; ChunkVersion _chunkVersion; bool _waitForDelete{false}; + std::string _forceJumbo{0}; }; } // namespace mongo diff --git a/src/mongo/db/s/balancer/type_migration_test.cpp b/src/mongo/db/s/balancer/type_migration_test.cpp index 3059480cfba..a1d2e9193b9 100644 --- a/src/mongo/db/s/balancer/type_migration_test.cpp +++ b/src/mongo/db/s/balancer/type_migration_test.cpp @@ -61,7 +61,7 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) { ChunkType chunkType = assertGet(ChunkType::fromConfigBSON(chunkBuilder.obj())); ASSERT_OK(chunkType.validate()); - MigrateInfo migrateInfo(kToShard, chunkType); + MigrateInfo migrateInfo(kToShard, chunkType, MoveChunkRequest::ForceJumbo::kDoNotForce); MigrationType migrationType(migrateInfo, kWaitForDelete); BSONObjBuilder builder; @@ -72,6 +72,8 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) { builder.append(MigrationType::toShard(), kToShard.toString()); version.appendWithField(&builder, "chunkVersion"); builder.append(MigrationType::waitForDelete(), kWaitForDelete); + builder.append(MigrationType::forceJumbo(), + MoveChunkRequest::forceJumboToString(MoveChunkRequest::ForceJumbo::kDoNotForce)); BSONObj obj = builder.obj(); @@ -89,6 +91,8 @@ TEST(MigrationTypeTest, FromAndToBSON) { builder.append(MigrationType::toShard(), kToShard.toString()); version.appendWithField(&builder, "chunkVersion"); builder.append(MigrationType::waitForDelete(), kWaitForDelete); + builder.append(MigrationType::forceJumbo(), + MoveChunkRequest::forceJumboToString(MoveChunkRequest::ForceJumbo::kDoNotForce)); BSONObj obj = builder.obj(); diff --git a/src/mongo/db/s/config/configsvr_move_chunk_command.cpp b/src/mongo/db/s/config/configsvr_move_chunk_command.cpp index 9ed7f4f292d..ab8c8e752ec 100644 --- a/src/mongo/db/s/config/configsvr_move_chunk_command.cpp +++ b/src/mongo/db/s/config/configsvr_move_chunk_command.cpp @@ -100,7 +100,8 @@ public: request.getToShardId(), request.getMaxChunkSizeBytes(), request.getSecondaryThrottle(), - request.getWaitForDelete())); + request.getWaitForDelete(), + request.getForceJumbo())); } else { uassertStatusOK(Balancer::get(opCtx)->rebalanceSingleChunk(opCtx, request.getChunk())); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 6a5ce99c1c7..6162fc725ca 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -40,7 +40,6 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" -#include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" @@ -69,6 +68,9 @@ const char kRecvChunkCommit[] = "_recvChunkCommit"; const char kRecvChunkAbort[] = "_recvChunkAbort"; const int kMaxObjectPerChunk{250000}; +const Hours kMaxWaitToCommitCloneForJumboChunk(6); + 
+MONGO_FAIL_POINT_DEFINE(failTooMuchMemoryUsed); bool isInRange(const BSONObj& obj, const BSONObj& min, @@ -227,7 +229,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)) {} + _recipientHost(std::move(recipientHost)), + _forceJumbo(_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kDoNotForce) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -263,7 +266,10 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { PrepareConflictBehavior::kIgnoreConflicts); auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx); - if (!storeCurrentLocsStatus.isOK()) { + if (storeCurrentLocsStatus == ErrorCodes::ChunkTooBig && _forceJumbo) { + stdx::lock_guard<Latch> sl(_mutex); + _jumboChunkCloneState.emplace(); + } else if (!storeCurrentLocsStatus.isOK()) { return storeCurrentLocsStatus; } } @@ -302,89 +308,30 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( OperationContext* opCtx, Milliseconds maxTimeToWait) { invariant(_state == kCloning); invariant(!opCtx->lockState()->isLocked()); - - const auto startTime = Date_t::now(); - - int iteration = 0; - while ((Date_t::now() - startTime) < maxTimeToWait) { - auto responseStatus = _callRecipient( - createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true)); - if (!responseStatus.isOK()) { - return responseStatus.getStatus().withContext( - "Failed to contact recipient shard to monitor data transfer"); - } - - const BSONObj& res = responseStatus.getValue(); - - if (!res["waited"].boolean()) { - sleepmillis(1LL << std::min(iteration, 10)); - } - iteration++; - - stdx::lock_guard<Latch> sl(_mutex); - - const std::size_t cloneLocsRemaining = _cloneLocs.size(); - - log() << "moveChunk data transfer progress: " << redact(res) << " mem used: " << _memoryUsed - << " documents remaining to clone: " << cloneLocsRemaining; - - if (res["state"].String() == "steady") { - if (cloneLocsRemaining != 0) { - return {ErrorCodes::OperationIncomplete, - str::stream() << "Unable to enter critical section because the recipient " - "shard thinks all data is cloned while there are still " - << cloneLocsRemaining << " documents remaining"}; - } - - return Status::OK(); - } - - if (res["state"].String() == "fail") { - return {ErrorCodes::OperationFailed, - str::stream() << "Data transfer error: " << res["errmsg"].str()}; - } - - auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res); - if (!migrationSessionIdStatus.isOK()) { - return {ErrorCodes::OperationIncomplete, - str::stream() << "Unable to retrieve the id of the migration session due to " - << migrationSessionIdStatus.getStatus().toString()}; - } - - if (res["ns"].str() != _args.getNss().ns() || - (res.hasField("fromShardId") - ? (res["fromShardId"].str() != _args.getFromShardId().toString()) - : (res["from"].str() != _donorConnStr.toString())) || - !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 || - !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 || - !_sessionId.matches(migrationSessionIdStatus.getValue())) { - // This can happen when the destination aborted the migration and received another - // recvChunk before this thread sees the transition to the abort state. 
This is - // currently possible only if multiple migrations are happening at once. This is an - // unfortunate consequence of the shards not being able to keep track of multiple - // incoming and outgoing migrations. - return {ErrorCodes::OperationIncomplete, - "Destination shard aborted migration because a new one is running"}; - } - - if (_memoryUsed > 500 * 1024 * 1024) { - // This is too much memory for us to use so we're going to abort the migration - return {ErrorCodes::ExceededMemoryLimit, - "Aborting migration because of high memory usage"}; - } - - Status interruptStatus = opCtx->checkForInterruptNoAssert(); - if (!interruptStatus.isOK()) { - return interruptStatus; - } + // If this migration is manual migration that specified "force", enter the critical section + // immediately. This means the entire cloning phase will be done under the critical section. + if (_jumboChunkCloneState && + _args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) { + return Status::OK(); } - return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"}; + return _checkRecipientCloningStatus(opCtx, maxTimeToWait); } StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { invariant(_state == kCloning); invariant(!opCtx->lockState()->isLocked()); + if (_jumboChunkCloneState && _forceJumbo) { + if (_args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) { + auto status = _checkRecipientCloningStatus(opCtx, kMaxWaitToCommitCloneForJumboChunk); + if (!status.isOK()) { + return status; + } + } else { + invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState); + invariant(_cloneLocs.empty()); + } + } if (_sessionCatalogSource) { _sessionCatalogSource->onCommitCloneStarted(); @@ -605,18 +552,78 @@ void MigrationChunkClonerSourceLegacy::_decrementOutstandingOperationTrackReques } } -uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { - stdx::lock_guard<Latch> sl(_mutex); +void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationContext* opCtx, + Collection* collection, + BSONArrayBuilder* arrBuilder) { + ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(), + internalQueryExecYieldIterations.load(), + Milliseconds(internalQueryExecYieldPeriodMS.load())); - return std::min(static_cast<uint64_t>(BSONObjMaxUserSize), - _averageObjectSizeForCloneLocs * _cloneLocs.size()); -} + if (!_jumboChunkCloneState->clonerExec) { + auto exec = uassertStatusOK(_getIndexScanExecutor(opCtx, collection)); + _jumboChunkCloneState->clonerExec = std::move(exec); + } else { + _jumboChunkCloneState->clonerExec->reattachToOperationContext(opCtx); + _jumboChunkCloneState->clonerExec->restoreState(); + } -Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, - Collection* collection, - BSONArrayBuilder* arrBuilder) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS)); + BSONObj obj; + RecordId recordId; + PlanExecutor::ExecState execState; + + while (PlanExecutor::ADVANCED == + (execState = _jumboChunkCloneState->clonerExec->getNext( + &obj, _jumboChunkCloneState->stashedRecordId ? nullptr : &recordId))) { + + stdx::unique_lock<Latch> lk(_mutex); + _jumboChunkCloneState->clonerState = execState; + lk.unlock(); + + opCtx->checkForInterrupt(); + + // Use the builder size instead of accumulating the document sizes directly so + // that we take into consideration the overhead of BSONArray indices. 
+ if (arrBuilder->arrSize() && + (arrBuilder->len() + obj.objsize() + 1024) > BSONObjMaxUserSize) { + _jumboChunkCloneState->clonerExec->enqueue(obj); + + // Stash the recordId we just read to add to the next batch. + if (!recordId.isNull()) { + invariant(!_jumboChunkCloneState->stashedRecordId); + _jumboChunkCloneState->stashedRecordId = std::move(recordId); + } + + break; + } + + Snapshotted<BSONObj> doc; + invariant(collection->findDoc( + opCtx, _jumboChunkCloneState->stashedRecordId.value_or(recordId), &doc)); + arrBuilder->append(doc.value()); + _jumboChunkCloneState->stashedRecordId = boost::none; + + lk.lock(); + _jumboChunkCloneState->docsCloned++; + lk.unlock(); + + ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); + } + + stdx::unique_lock<Latch> lk(_mutex); + _jumboChunkCloneState->clonerState = execState; + lk.unlock(); + + _jumboChunkCloneState->clonerExec->saveState(); + _jumboChunkCloneState->clonerExec->detachFromOperationContext(); + + if (PlanExecutor::FAILURE == execState) + uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(obj).withContext( + "Executor error while scanning for documents belonging to chunk")); +} +void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx, + Collection* collection, + BSONArrayBuilder* arrBuilder) { ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(), internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); @@ -625,8 +632,8 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, auto iter = _cloneLocs.begin(); for (; iter != _cloneLocs.end(); ++iter) { - // We must always make progress in this method by at least one document because empty return - // indicates there is no more initial clone data. + // We must always make progress in this method by at least one document because empty + // return indicates there is no more initial clone data. if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) { break; } @@ -641,6 +648,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, // that we take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) { + break; } @@ -652,7 +660,34 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, } _cloneLocs.erase(_cloneLocs.begin(), iter); +} + +uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { + stdx::lock_guard<Latch> sl(_mutex); + if (_jumboChunkCloneState && _forceJumbo) + return static_cast<uint64_t>(BSONObjMaxUserSize); + + return std::min(static_cast<uint64_t>(BSONObjMaxUserSize), + _averageObjectSizeForCloneLocs * _cloneLocs.size()); +} + +Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, + Collection* collection, + BSONArrayBuilder* arrBuilder) { + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS)); + // If this chunk is too large to store records in _cloneLocs and the command args specify to + // attempt to move it, scan the collection directly. 
+ if (_jumboChunkCloneState && _forceJumbo) { + try { + _nextCloneBatchFromIndexScan(opCtx, collection, arrBuilder); + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } + } + + _nextCloneBatchFromCloneLocs(opCtx, collection, arrBuilder); return Status::OK(); } @@ -732,15 +767,9 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO return responseStatus.data.getOwned(); } -Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) { - AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); - - Collection* const collection = autoColl.getCollection(); - if (!collection) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "Collection " << _args.getNss().ns() << " does not exist."}; - } - +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> +MigrationChunkClonerSourceLegacy::_getIndexScanExecutor(OperationContext* opCtx, + Collection* const collection) { // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any // multi-key index prefixed by shard key cannot be multikey over the shard key fields. const IndexDescriptor* idx = @@ -761,13 +790,29 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC // We can afford to yield here because any change to the base data that we might miss is already // being queued and will migrate in the 'transferMods' stage. - auto exec = InternalPlanner::indexScan(opCtx, - collection, - idx, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_AUTO); + return InternalPlanner::indexScan(opCtx, + collection, + idx, + min, + max, + BoundInclusion::kIncludeStartKeyOnly, + PlanExecutor::YIELD_AUTO); +} + +Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) { + AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); + + Collection* const collection = autoColl.getCollection(); + if (!collection) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "Collection " << _args.getNss().ns() << " does not exist."}; + } + + auto swExec = _getIndexScanExecutor(opCtx, collection); + if (!swExec.isOK()) { + return swExec.getStatus(); + } + auto exec = std::move(swExec.getValue()); // Use the average object size to estimate how many objects a full chunk would carry do that // while traversing the chunk's range using the sharding index, below there's a fair amount of @@ -812,8 +857,11 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC if (++recCount > maxRecsWhenFull) { isLargeChunk = true; - // Continue on despite knowing that it will fail, just to get the correct value for - // recCount + + if (_forceJumbo) { + _cloneLocs.clear(); + break; + } } } @@ -898,6 +946,96 @@ long long MigrationChunkClonerSourceLegacy::_xferUpdates(OperationContext* opCtx return totalSize; } +Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationContext* opCtx, + Milliseconds maxTimeToWait) { + const auto startTime = Date_t::now(); + int iteration = 0; + while ((Date_t::now() - startTime) < maxTimeToWait) { + auto responseStatus = _callRecipient( + createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true)); + if (!responseStatus.isOK()) { + return responseStatus.getStatus().withContext( + "Failed to contact recipient shard to monitor data transfer"); + } + + const BSONObj& res = responseStatus.getValue(); + if (!res["waited"].boolean()) { + sleepmillis(1LL << std::min(iteration, 10)); 
+ } + iteration++; + + stdx::lock_guard<Latch> sl(_mutex); + + const std::size_t cloneLocsRemaining = _cloneLocs.size(); + + if (_forceJumbo && _jumboChunkCloneState) { + log() << "moveChunk data transfer progress: " << redact(res) + << " mem used: " << _memoryUsed + << " documents cloned so far: " << _jumboChunkCloneState->docsCloned; + } else { + log() << "moveChunk data transfer progress: " << redact(res) + << " mem used: " << _memoryUsed + << " documents remaining to clone: " << cloneLocsRemaining; + } + + if (res["state"].String() == "steady") { + if (cloneLocsRemaining != 0 || + (_jumboChunkCloneState && _forceJumbo && + PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) { + return {ErrorCodes::OperationIncomplete, + str::stream() << "Unable to enter critical section because the recipient " + "shard thinks all data is cloned while there are still " + "documents remaining"}; + } + + return Status::OK(); + } + + if (res["state"].String() == "fail") { + return {ErrorCodes::OperationFailed, + str::stream() << "Data transfer error: " << res["errmsg"].str()}; + } + + auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res); + if (!migrationSessionIdStatus.isOK()) { + return {ErrorCodes::OperationIncomplete, + str::stream() << "Unable to retrieve the id of the migration session due to " + << migrationSessionIdStatus.getStatus().toString()}; + } + + if (res["ns"].str() != _args.getNss().ns() || + (res.hasField("fromShardId") + ? (res["fromShardId"].str() != _args.getFromShardId().toString()) + : (res["from"].str() != _donorConnStr.toString())) || + !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 || + !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 || + !_sessionId.matches(migrationSessionIdStatus.getValue())) { + // This can happen when the destination aborted the migration and received another + // recvChunk before this thread sees the transition to the abort state. This is + // currently possible only if multiple migrations are happening at once. This is an + // unfortunate consequence of the shards not being able to keep track of multiple + // incoming and outgoing migrations. 
+ return {ErrorCodes::OperationIncomplete, + "Destination shard aborted migration because a new one is running"}; + } + + if (_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kForceManual && + (_memoryUsed > 500 * 1024 * 1024 || + (_jumboChunkCloneState && MONGO_unlikely(failTooMuchMemoryUsed.shouldFail())))) { + // This is too much memory for us to use so we're going to abort the migration + return {ErrorCodes::ExceededMemoryLimit, + "Aborting migration because of high memory usage"}; + } + + Status interruptStatus = opCtx->checkForInterruptNoAssert(); + if (!interruptStatus.isOK()) { + return interruptStatus; + } + } + + return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"}; +} + boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( OperationContext* opCtx, BSONArrayBuilder* arrBuilder) { if (!_sessionCatalogSource) { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 8e5809093af..2653f401ef1 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -36,6 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/client/connection_string.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/optime.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" @@ -220,6 +221,17 @@ private: */ StatusWith<BSONObj> _callRecipient(const BSONObj& cmdObj); + StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getIndexScanExecutor( + OperationContext* opCtx, Collection* const collection); + + void _nextCloneBatchFromIndexScan(OperationContext* opCtx, + Collection* collection, + BSONArrayBuilder* arrBuilder); + + void _nextCloneBatchFromCloneLocs(OperationContext* opCtx, + Collection* collection, + BSONArrayBuilder* arrBuilder); + /** * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs (to avoid * seeking disk later). @@ -307,6 +319,12 @@ private: std::list<BSONObj>* updateList, long long initialSize); + /** + * Sends _recvChunkStatus to the recipient shard until it receives 'steady' from the recipient, + * an error has occurred, or a timeout is hit. + */ + Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait); + // The original move chunk request const MoveChunkRequest _args; @@ -356,6 +374,27 @@ private: // Total bytes in _reload + _deleted (xfer mods) uint64_t _memoryUsed{0}; + + // False if the move chunk request specified ForceJumbo::kDoNotForce, true otherwise. + const bool _forceJumbo; + + struct JumboChunkCloneState { + // Plan executor for collection scan used to clone docs. + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> clonerExec; + + // The current state of 'clonerExec'. + PlanExecutor::ExecState clonerState; + + // RecordId of the last doc read in by 'clonerExec' if collection scan yields during + // cloning. 
+ boost::optional<RecordId> stashedRecordId; + + // Number docs in jumbo chunk cloned so far + int docsCloned = 0; + }; + + // Set only once its discovered a chunk is jumbo + boost::optional<JumboChunkCloneState> _jumboChunkCloneState; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 678557c5a1e..c117aa5b26a 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -154,7 +154,8 @@ protected: chunkRange, 1024 * 1024, MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault), - false); + false, + MoveChunkRequest::ForceJumbo::kDoNotForce); return assertGet(MoveChunkRequest::createFromCommand(kNss, cmdBuilder.obj())); } |
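Taken together, the migration_chunk_cloner_source_legacy.cpp hunks add a second cloning path: when _storeCurrentLocs() finds the chunk too large and the request carries a force option, the cloner abandons the record-id list and streams batches directly from an index scan, and a manually forced migration enters the critical section immediately. The outline below compresses that branching into a self-contained class; the member names mirror the diff, but the types are simplified stand-ins rather than the real interface.

```cpp
// Self-contained outline of the jumbo-chunk branching added in
// MigrationChunkClonerSourceLegacy. PlanExecutor, OperationContext and the
// BSON batch builder are deliberately omitted; comments note where the real
// calls happen.
#include <optional>

struct JumboChunkCloneState {
    int docsCloned = 0;  // progress counter reported while polling the recipient
};

class ClonerSketch {
public:
    explicit ClonerSketch(bool forceJumbo) : _forceJumbo(forceJumbo) {}

    // Mirrors startClone(): a ChunkTooBig result from _storeCurrentLocs() is
    // fatal only when the request did not ask to force the migration.
    bool startClone(bool chunkTooBig) {
        if (chunkTooBig && _forceJumbo) {
            _jumboChunkCloneState.emplace();  // switch to the index-scan path
            return true;
        }
        return !chunkTooBig;
    }

    // Mirrors awaitUntilCriticalSectionIsAppropriate(): a forced manual move of
    // a jumbo chunk enters the critical section at once and finishes cloning there.
    bool canEnterCriticalSectionImmediately(bool forcedManually) const {
        return _jumboChunkCloneState.has_value() && forcedManually;
    }

    // Mirrors nextCloneBatch(): jumbo chunks stream off an index scan, all other
    // chunks keep using the pre-recorded RecordIds in _cloneLocs.
    void nextCloneBatch() {
        if (_jumboChunkCloneState && _forceJumbo) {
            // _nextCloneBatchFromIndexScan(opCtx, collection, arrBuilder);
        } else {
            // _nextCloneBatchFromCloneLocs(opCtx, collection, arrBuilder);
        }
    }

private:
    const bool _forceJumbo;  // anything other than ForceJumbo::kDoNotForce
    std::optional<JumboChunkCloneState> _jumboChunkCloneState;
};
```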
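The force setting also has to survive a config server failover, so it is persisted in the config.migrations recovery document through the new MigrationType::forceJumbo string field. Assembled from the fields the tests above append (ns, min, max, fromShard, toShard, chunkVersion, waitForDelete and now forceJumbo), a recovery document looks roughly as follows; the concrete namespace, bounds, shard ids and version encoding are illustrative and not taken from the diff.

```cpp
// Approximate shape of a config.migrations document after this change, printed
// as JSON for readability. Everything except the field names and the
// "doNotForceJumbo" value is an illustrative placeholder.
#include <iostream>

int main() {
    std::cout << R"({
        "ns":            "TestDB.TestColl",
        "min":           { "x": 0 },
        "max":           { "x": 100 },
        "fromShard":     "shard0000",
        "toShard":       "shard0001",
        "chunkVersion":  "<chunk version as appended by appendWithField()>",
        "waitForDelete": false,
        "forceJumbo":    "doNotForceJumbo"
    })" << std::endl;
    return 0;
}
```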