author     Janna Golden <janna.golden@mongodb.com>    2019-11-05 15:52:39 +0000
committer  evergreen <evergreen@mongodb.com>          2019-11-05 15:52:39 +0000
commit     c150b588cb4400e0324becd916de2a699988af99 (patch)
tree       b007619ce25c62cad0e275d01a8f57be0fa36d57 /src/mongo/db/s
parent     da5b6eff05c710029994ae1e06b47d1974974f6c (diff)
SERVER-42273 Introduce 'force' option to 'moveChunk' to allow migrating jumbo chunks
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/active_migrations_registry_test.cpp                3
-rw-r--r--  src/mongo/db/s/balancer/balancer.cpp                              20
-rw-r--r--  src/mongo/db/s/balancer/balancer.h                                3
-rw-r--r--  src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp  7
-rw-r--r--  src/mongo/db/s/balancer/balancer_policy.cpp                       30
-rw-r--r--  src/mongo/db/s/balancer/balancer_policy.h                         12
-rw-r--r--  src/mongo/db/s/balancer/balancer_policy_test.cpp                  71
-rw-r--r--  src/mongo/db/s/balancer/migration_manager.cpp                     4
-rw-r--r--  src/mongo/db/s/balancer/migration_manager_test.cpp                43
-rw-r--r--  src/mongo/db/s/balancer/scoped_migration_request_test.cpp         2
-rw-r--r--  src/mongo/db/s/balancer/type_migration.cpp                        22
-rw-r--r--  src/mongo/db/s/balancer/type_migration.h                          6
-rw-r--r--  src/mongo/db/s/balancer/type_migration_test.cpp                   6
-rw-r--r--  src/mongo/db/s/config/configsvr_move_chunk_command.cpp            3
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp           354
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h             39
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp      3
17 files changed, 449 insertions, 179 deletions
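
For orientation before the per-file diffs: every migration path touched below now carries one of three MoveChunkRequest::ForceJumbo modes. A minimal sketch of that enum and its string round-trip follows; only the three mode names and the "doNotForceJumbo" spelling appear in this diff (the definition itself lives in src/mongo/s/request_types/move_chunk_request.h, outside 'src/mongo/db/s'), so the other string values are assumptions for illustration, not the actual header.

// Hedged sketch only -- not the real move_chunk_request.h.
#include <string>

namespace mongo {
class MoveChunkRequest {
public:
    enum class ForceJumbo {
        kDoNotForce,     // default: an over-sized chunk aborts the migration with ChunkTooBig
        kForceManual,    // user-issued moveChunk with the new 'force' option
        kForceBalancer   // balancer-issued migration when balancing jumbo chunks is enabled
    };

    // "doNotForceJumbo" is taken from migration_manager_test.cpp below;
    // the other two spellings are assumed for this sketch.
    static std::string forceJumboToString(ForceJumbo mode) {
        switch (mode) {
            case ForceJumbo::kForceManual:
                return "forceJumboManualMoveChunk";
            case ForceJumbo::kForceBalancer:
                return "forceJumboBalancerMigration";
            case ForceJumbo::kDoNotForce:
            default:
                return "doNotForceJumbo";
        }
    }

    static ForceJumbo parseForceJumbo(const std::string& str) {
        if (str == "forceJumboManualMoveChunk")
            return ForceJumbo::kForceManual;
        if (str == "forceJumboBalancerMigration")
            return ForceJumbo::kForceBalancer;
        return ForceJumbo::kDoNotForce;
    }
};
}  // namespace mongo
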
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index 0af9cf3aeba..31c3262724d 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -61,7 +61,8 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {
ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
1024,
MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff),
- true);
+ true,
+ MoveChunkRequest::ForceJumbo::kDoNotForce);
return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj()));
}
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 0f1f30a89dc..67bcb23220f 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -273,14 +273,22 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx,
const ShardId& newShardId,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete) {
+ bool waitForDelete,
+ bool forceJumbo) {
auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId);
if (!moveAllowedStatus.isOK()) {
return moveAllowedStatus;
}
return _migrationManager.executeManualMigration(
- opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
+ opCtx,
+ MigrateInfo(newShardId,
+ chunk,
+ forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual
+ : MoveChunkRequest::ForceJumbo::kDoNotForce),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
}
void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
@@ -603,7 +611,13 @@ int Balancer::_moveChunks(OperationContext* opCtx,
});
invariant(requestIt != candidateChunks.end());
- if (status == ErrorCodes::ChunkTooBig) {
+ // ChunkTooBig is returned by the source shard during the cloning phase if the migration
+ // manager finds that the chunk is larger than some calculated size, the source shard is
+ // *not* in draining mode, and the 'forceJumbo' balancer setting is 'kDoNotForce'.
+ // ExceededMemoryLimit is returned when the transfer mods queue surpasses 500MB regardless
+ // of whether the source shard is in draining mode or the value of the 'forceJumbo' balancer
+ // setting.
+ if (status == ErrorCodes::ChunkTooBig || status == ErrorCodes::ExceededMemoryLimit) {
numChunksProcessed++;
log() << "Performing a split because migration " << redact(requestIt->toString())
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 1df09000920..b4d8c9e02a0 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -136,7 +136,8 @@ public:
const ShardId& newShardId,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete);
+ bool waitForDelete,
+ bool forceJumbo);
/**
* Appends the runtime state of the balancer instance to the specified builder.
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index a8f7cde67f4..4e24a95ea91 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -39,6 +39,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj_comparator_interface.h"
+#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_tags.h"
@@ -468,7 +469,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi
}
}
- return BalancerPolicy::balance(shardStats, distribution, usedShards);
+ return BalancerPolicy::balance(
+ shardStats,
+ distribution,
+ usedShards,
+ Grid::get(opCtx)->getBalancerConfiguration()->attemptToBalanceJumboChunks());
}
} // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 6b3fb9d78ba..894c40dea3d 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -347,12 +347,15 @@ MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
const auto& chunks = distribution.getChunks(sourceShardId);
- return {destShardId, chunks[getRandomIndex(chunks.size())]};
+ return {destShardId,
+ chunks[getRandomIndex(chunks.size())],
+ MoveChunkRequest::ForceJumbo::kDoNotForce};
}
vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
- std::set<ShardId>* usedShards) {
+ std::set<ShardId>* usedShards,
+ bool forceJumbo) {
vector<MigrateInfo> migrations;
if (MONGO_unlikely(balancerShouldReturnRandomMigrations.shouldFail()) &&
@@ -405,7 +408,7 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
}
invariant(to != stat.shardId);
- migrations.emplace_back(to, chunk);
+ migrations.emplace_back(to, chunk, MoveChunkRequest::ForceJumbo::kForceBalancer);
invariant(usedShards->insert(stat.shardId).second);
invariant(usedShards->insert(to).second);
break;
@@ -452,7 +455,10 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
}
invariant(to != stat.shardId);
- migrations.emplace_back(to, chunk);
+ migrations.emplace_back(to,
+ chunk,
+ forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
+ : MoveChunkRequest::ForceJumbo::kDoNotForce);
invariant(usedShards->insert(stat.shardId).second);
invariant(usedShards->insert(to).second);
break;
@@ -498,7 +504,9 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
tag,
idealNumberOfChunksPerShardForTag,
&migrations,
- usedShards))
+ usedShards,
+ forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
+ : MoveChunkRequest::ForceJumbo::kDoNotForce))
;
}
@@ -517,7 +525,7 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
return boost::optional<MigrateInfo>();
}
- return MigrateInfo(newShardId, chunk);
+ return MigrateInfo(newShardId, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce);
}
bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
@@ -525,7 +533,8 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
const string& tag,
size_t idealNumberOfChunksPerShardForTag,
vector<MigrateInfo>* migrations,
- set<ShardId>* usedShards) {
+ set<ShardId>* usedShards,
+ MoveChunkRequest::ForceJumbo forceJumbo) {
const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards);
if (!from.isValid())
return false;
@@ -576,7 +585,7 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
continue;
}
- migrations->emplace_back(to, chunk);
+ migrations->emplace_back(to, chunk, forceJumbo);
invariant(usedShards->insert(chunk.getShard()).second);
invariant(usedShards->insert(to).second);
return true;
@@ -598,7 +607,9 @@ string ZoneRange::toString() const {
return str::stream() << min << " -->> " << max << " on " << zone;
}
-MigrateInfo::MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk) {
+MigrateInfo::MigrateInfo(const ShardId& a_to,
+ const ChunkType& a_chunk,
+ const MoveChunkRequest::ForceJumbo a_forceJumbo) {
invariant(a_chunk.validate());
invariant(a_to.isValid());
@@ -609,6 +620,7 @@ MigrateInfo::MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk) {
minKey = a_chunk.getMin();
maxKey = a_chunk.getMax();
version = a_chunk.getVersion();
+ forceJumbo = a_forceJumbo;
}
std::string MigrateInfo::getName() const {
diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h
index 326fd7985a2..c75c5ceda2d 100644
--- a/src/mongo/db/s/balancer/balancer_policy.h
+++ b/src/mongo/db/s/balancer/balancer_policy.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/balancer/cluster_statistics.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/request_types/move_chunk_request.h"
#include "mongo/s/shard_id.h"
namespace mongo {
@@ -52,7 +53,9 @@ struct ZoneRange {
};
struct MigrateInfo {
- MigrateInfo(const ShardId& a_to, const ChunkType& a_chunk);
+ MigrateInfo(const ShardId& a_to,
+ const ChunkType& a_chunk,
+ MoveChunkRequest::ForceJumbo a_forceJumbo);
std::string getName() const;
@@ -66,6 +69,7 @@ struct MigrateInfo {
BSONObj minKey;
BSONObj maxKey;
ChunkVersion version;
+ MoveChunkRequest::ForceJumbo forceJumbo;
};
typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector;
@@ -190,7 +194,8 @@ public:
*/
static std::vector<MigrateInfo> balance(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
- std::set<ShardId>* usedShards);
+ std::set<ShardId>* usedShards,
+ bool forceJumbo);
/**
* Using the specified distribution information, returns a suggested better location for the
@@ -236,7 +241,8 @@ private:
const std::string& tag,
size_t idealNumberOfChunksPerShardForTag,
std::vector<MigrateInfo>* migrations,
- std::set<ShardId>* usedShards);
+ std::set<ShardId>* usedShards,
+ MoveChunkRequest::ForceJumbo forceJumbo);
};
} // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_policy_test.cpp b/src/mongo/db/s/balancer/balancer_policy_test.cpp
index 0c5e9fe5822..6ef9f2543a8 100644
--- a/src/mongo/db/s/balancer/balancer_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy_test.cpp
@@ -113,9 +113,10 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
std::vector<MigrateInfo> balanceChunks(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
- bool shouldAggressivelyBalance) {
+ bool shouldAggressivelyBalance,
+ bool forceJumbo) {
std::set<ShardId> usedShards;
- return BalancerPolicy::balance(shardStats, distribution, &usedShards);
+ return BalancerPolicy::balance(shardStats, distribution, &usedShards, forceJumbo);
}
TEST(BalancerPolicy, Basic) {
@@ -125,7 +126,7 @@ TEST(BalancerPolicy, Basic) {
{ShardStatistics(kShardId2, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@@ -140,7 +141,7 @@ TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) {
{ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId1, migrations[0].from);
ASSERT_EQ(kShardId2, migrations[0].to);
@@ -153,10 +154,11 @@ TEST(BalancerPolicy, SingleChunkShouldNotMove) {
{{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
- ASSERT(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true).empty());
- ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)
+ ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true, false)
.empty());
+ ASSERT(
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)
+ .empty());
}
TEST(BalancerPolicy, BalanceThresholdObeyed) {
@@ -166,10 +168,11 @@ TEST(BalancerPolicy, BalanceThresholdObeyed) {
{ShardStatistics(kShardId2, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}});
- ASSERT(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true).empty());
- ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false)
+ ASSERT(balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), true, false)
.empty());
+ ASSERT(
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false)
+ .empty());
}
TEST(BalancerPolicy, ParallelBalancing) {
@@ -180,7 +183,7 @@ TEST(BalancerPolicy, ParallelBalancing) {
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@@ -204,7 +207,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
{ShardStatistics(kShardId5, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@@ -226,7 +229,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) {
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@@ -245,7 +248,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe
// Here kShardId0 would have been selected as a donor
std::set<ShardId> usedShards{kShardId0};
const auto migrations(BalancerPolicy::balance(
- cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards));
+ cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId1, migrations[0].from);
@@ -264,7 +267,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo
// Here kShardId0 would have been selected as a donor
std::set<ShardId> usedShards{kShardId0};
const auto migrations(BalancerPolicy::balance(
- cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards));
+ cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false));
ASSERT_EQ(0U, migrations.size());
}
@@ -278,7 +281,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
// Here kShardId2 would have been selected as a recipient
std::set<ShardId> usedShards{kShardId2};
const auto migrations(BalancerPolicy::balance(
- cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards));
+ cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@@ -298,7 +301,7 @@ TEST(BalancerPolicy, JumboChunksNotMoved) {
cluster.second[kShardId0][3].setJumbo(true);
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@@ -324,7 +327,7 @@ TEST(BalancerPolicy, JumboChunksNotMovedParallel) {
cluster.second[kShardId2][3].setJumbo(true);
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@@ -345,7 +348,7 @@ TEST(BalancerPolicy, DrainingSingleChunk) {
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@@ -362,7 +365,7 @@ TEST(BalancerPolicy, DrainingSingleChunkPerShard) {
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@@ -383,7 +386,7 @@ TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@@ -400,7 +403,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) {
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 16}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId2, migrations[0].to);
@@ -416,7 +419,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsWontAcceptChunks) {
{ShardStatistics(kShardId2, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT(migrations.empty());
}
@@ -430,7 +433,7 @@ TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
ASSERT_OK(distribution.addRangeToZone(ZoneRange(
cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "LAX")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId2, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@@ -448,7 +451,7 @@ TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) {
ASSERT_OK(distribution.addRangeToZone(ZoneRange(
cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "SEA")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT(migrations.empty());
}
@@ -460,7 +463,7 @@ TEST(BalancerPolicy, NoBalancingDueToAllNodesEitherDrainingOrMaxedOut) {
{ShardStatistics(kShardId2, kNoMaxSize, 1, true, emptyTagSet, emptyShardVersion), 1}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT(migrations.empty());
}
@@ -474,7 +477,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) {
{ShardStatistics(kShardId2, kNoMaxSize, 10, false, emptyTagSet, emptyShardVersion), 10}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId2, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@@ -492,7 +495,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) {
{ShardStatistics(kShardId2, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}});
const auto migrations(
- balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ balanceChunks(cluster.first, DistributionStatus(kNamespace, cluster.second), false, false));
ASSERT(migrations.empty());
}
@@ -507,7 +510,7 @@ TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) {
ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "a")));
ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "b")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId1, migrations[0].from);
ASSERT_EQ(kShardId0, migrations[0].to);
@@ -526,7 +529,7 @@ TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) {
DistributionStatus distribution(kNamespace, cluster.second);
ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 100), "a")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId2, migrations[0].from);
ASSERT_EQ(kShardId0, migrations[0].to);
@@ -546,7 +549,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsWithCrossShardViolationOfTags) {
ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 1), "b")));
ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "a")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId2, migrations[0].to);
@@ -564,7 +567,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) {
DistributionStatus distribution(kNamespace, cluster.second);
ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 10), "a")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId2, migrations[0].from);
ASSERT_EQ(kShardId0, migrations[0].to);
@@ -583,7 +586,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParalle
DistributionStatus distribution(kNamespace, cluster.second);
ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 20), "a")));
- const auto migrations(balanceChunks(cluster.first, distribution, false));
+ const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId2, migrations[0].from);
@@ -606,7 +609,7 @@ TEST(BalancerPolicy, BalancerHandlesNoShardsWithTag) {
ASSERT_OK(
distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "NonExistentZone")));
- ASSERT(balanceChunks(cluster.first, distribution, false).empty());
+ ASSERT(balanceChunks(cluster.first, distribution, false, false).empty());
}
TEST(DistributionStatus, AddTagRangeOverlap) {
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 4af124368e4..ae0997fa9de 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -170,7 +170,6 @@ Status MigrationManager::executeManualMigration(
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
_waitForRecovery();
-
// Write a document to the config.migrations collection, in case this migration must be
// recovered by the Balancer. Fail if the chunk is already moving.
auto statusWithScopedMigrationRequest =
@@ -455,7 +454,8 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
maxChunkSizeBytes,
secondaryThrottle,
- waitForDelete);
+ waitForDelete,
+ migrateInfo.forceJumbo);
stdx::lock_guard<Latch> lock(_mutex);
diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp
index 23f3b42d223..8af4d60993f 100644
--- a/src/mongo/db/s/balancer/migration_manager_test.cpp
+++ b/src/mongo/db/s/balancer/migration_manager_test.cpp
@@ -224,6 +224,7 @@ void MigrationManagerTest::setUpMigration(const ChunkType& chunk, const ShardId&
builder.append(MigrationType::toShard(), toShard.toString());
builder.append(MigrationType::fromShard(), chunk.getShard().toString());
chunk.getVersion().appendWithField(&builder, "chunkVersion");
+ builder.append(MigrationType::forceJumbo(), "doNotForceJumbo");
MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj()));
ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
@@ -311,7 +312,9 @@ TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}};
+ const std::vector<MigrateInfo> migrationRequests{
+ {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
auto future = launchAsync([this, migrationRequests] {
ThreadClient tc("Test", getGlobalServiceContext());
@@ -370,10 +373,11 @@ TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2);
// Going to request that these four chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1coll1},
- {kShardId3, chunk2coll1},
- {kShardId1, chunk1coll2},
- {kShardId3, chunk2coll2}};
+ const std::vector<MigrateInfo> migrationRequests{
+ {kShardId1, chunk1coll1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ {kShardId3, chunk2coll1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ {kShardId1, chunk1coll2, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ {kShardId3, chunk2coll2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
auto future = launchAsync([this, migrationRequests] {
ThreadClient tc("Test", getGlobalServiceContext());
@@ -427,7 +431,9 @@ TEST_F(MigrationManagerTest, SourceShardNotFound) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}};
+ const std::vector<MigrateInfo> migrationRequests{
+ {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
ThreadClient tc("Test", getGlobalServiceContext());
@@ -474,7 +480,8 @@ TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
// Going to request that this chunk gets migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}};
+ const std::vector<MigrateInfo> migrationRequests{
+ {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce}};
auto future = launchAsync([this, chunk1, migrationRequests] {
ThreadClient tc("Test", getGlobalServiceContext());
@@ -525,7 +532,11 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
ASSERT_EQ(ErrorCodes::BalancerInterrupted,
_migrationManager->executeManualMigration(
- opCtx.get(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
+ opCtx.get(),
+ {kShardId1, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ 0,
+ kDefaultSecondaryThrottle,
+ false));
});
// Wait till the move chunk request gets sent and pretend that it is stuck by never responding
@@ -549,7 +560,11 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
// Ensure that no new migrations can be scheduled
ASSERT_EQ(ErrorCodes::BalancerInterrupted,
_migrationManager->executeManualMigration(
- operationContext(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
+ operationContext(),
+ {kShardId1, chunk, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ 0,
+ kDefaultSecondaryThrottle,
+ false));
// Ensure that the migration manager is no longer handling any migrations.
_migrationManager->drainActiveMigrations();
@@ -613,7 +628,11 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) {
shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
ASSERT_OK(_migrationManager->executeManualMigration(
- opCtx.get(), {kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false));
+ opCtx.get(),
+ {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ 0,
+ kDefaultSecondaryThrottle,
+ false));
});
// Expect only one moveChunk command to be called.
@@ -760,7 +779,9 @@ TEST_F(MigrationManagerTest, RemoteCallErrorConversionToOperationFailed) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1, chunk1}, {kShardId3, chunk2}};
+ const std::vector<MigrateInfo> migrationRequests{
+ {kShardId1, chunk1, MoveChunkRequest::ForceJumbo::kDoNotForce},
+ {kShardId3, chunk2, MoveChunkRequest::ForceJumbo::kDoNotForce}};
auto future = launchAsync([&] {
ThreadClient tc("Test", getGlobalServiceContext());
diff --git a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
index fd380f9c613..e30c73a1a1e 100644
--- a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
+++ b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
@@ -113,7 +113,7 @@ MigrateInfo makeMigrateInfo() {
ChunkType chunkType = assertGet(ChunkType::parseFromConfigBSONCommand(chunkBuilder.obj()));
ASSERT_OK(chunkType.validate());
- return MigrateInfo(kToShard, chunkType);
+ return MigrateInfo(kToShard, chunkType, MoveChunkRequest::ForceJumbo::kDoNotForce);
}
TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) {
diff --git a/src/mongo/db/s/balancer/type_migration.cpp b/src/mongo/db/s/balancer/type_migration.cpp
index a3d47608604..4bedb26300c 100644
--- a/src/mongo/db/s/balancer/type_migration.cpp
+++ b/src/mongo/db/s/balancer/type_migration.cpp
@@ -50,6 +50,7 @@ const BSONField<BSONObj> MigrationType::max("max");
const BSONField<std::string> MigrationType::fromShard("fromShard");
const BSONField<std::string> MigrationType::toShard("toShard");
const BSONField<bool> MigrationType::waitForDelete("waitForDelete");
+const BSONField<std::string> MigrationType::forceJumbo("forceJumbo");
MigrationType::MigrationType() = default;
@@ -60,7 +61,8 @@ MigrationType::MigrationType(MigrateInfo info, bool waitForDelete)
_fromShard(info.from),
_toShard(info.to),
_chunkVersion(info.version),
- _waitForDelete(waitForDelete) {}
+ _waitForDelete(waitForDelete),
+ _forceJumbo(MoveChunkRequest::forceJumboToString(info.forceJumbo)) {}
StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) {
MigrationType migrationType;
@@ -115,6 +117,21 @@ StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) {
migrationType._waitForDelete = waitForDeleteVal;
}
+ {
+ std::string forceJumboVal;
+ Status status = bsonExtractStringField(source, forceJumbo.name(), &forceJumboVal);
+ if (!status.isOK())
+ return status;
+
+ auto forceJumbo = MoveChunkRequest::parseForceJumbo(forceJumboVal);
+ if (forceJumbo != MoveChunkRequest::ForceJumbo::kDoNotForce &&
+ forceJumbo != MoveChunkRequest::ForceJumbo::kForceManual &&
+ forceJumbo != MoveChunkRequest::ForceJumbo::kForceBalancer) {
+ return Status{ErrorCodes::BadValue, "Unknown value for forceJumbo"};
+ }
+ migrationType._forceJumbo = std::move(forceJumboVal);
+ }
+
return migrationType;
}
@@ -132,6 +149,7 @@ BSONObj MigrationType::toBSON() const {
_chunkVersion.appendWithField(&builder, kChunkVersion);
builder.append(waitForDelete.name(), _waitForDelete);
+ builder.append(forceJumbo.name(), _forceJumbo);
return builder.obj();
}
@@ -143,7 +161,7 @@ MigrateInfo MigrationType::toMigrateInfo() const {
chunk.setMax(_max);
chunk.setVersion(_chunkVersion);
- return MigrateInfo(_toShard, chunk);
+ return MigrateInfo(_toShard, chunk, MoveChunkRequest::parseForceJumbo(_forceJumbo));
}
} // namespace mongo
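
With the new field, each queued migration persisted to config.migrations now records how 'forceJumbo' was requested. A hypothetical example of the document shape MigrationType::toBSON() produces (field values invented for illustration; field names follow the BSONField constants above, and chunkVersion is appended via ChunkVersion::appendWithField in the real code):

#include "mongo/bson/bsonobjbuilder.h"

mongo::BSONObj exampleMigrationDoc() {
    mongo::BSONObjBuilder builder;
    builder.append("ns", "test.coll");
    builder.append("min", BSON("x" << 0));
    builder.append("max", BSON("x" << 100));
    builder.append("fromShard", "shard0000");
    builder.append("toShard", "shard0001");
    // "chunkVersion" elided here; appended with ChunkVersion::appendWithField().
    builder.append("waitForDelete", false);
    builder.append("forceJumbo", "doNotForceJumbo");  // new with this commit
    return builder.obj();
}
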
diff --git a/src/mongo/db/s/balancer/type_migration.h b/src/mongo/db/s/balancer/type_migration.h
index 23fc1cf85a8..9f383244f8d 100644
--- a/src/mongo/db/s/balancer/type_migration.h
+++ b/src/mongo/db/s/balancer/type_migration.h
@@ -54,6 +54,7 @@ public:
static const BSONField<std::string> fromShard;
static const BSONField<std::string> toShard;
static const BSONField<bool> waitForDelete;
+ static const BSONField<std::string> forceJumbo;
/**
* The Balancer encapsulates migration information in MigrateInfo objects, so this facilitates
@@ -85,6 +86,10 @@ public:
return _waitForDelete;
}
+ MoveChunkRequest::ForceJumbo getForceJumbo() const {
+ return MoveChunkRequest::parseForceJumbo(_forceJumbo);
+ }
+
private:
MigrationType();
@@ -96,6 +101,7 @@ private:
ShardId _toShard;
ChunkVersion _chunkVersion;
bool _waitForDelete{false};
+ std::string _forceJumbo;
};
} // namespace mongo
diff --git a/src/mongo/db/s/balancer/type_migration_test.cpp b/src/mongo/db/s/balancer/type_migration_test.cpp
index 3059480cfba..a1d2e9193b9 100644
--- a/src/mongo/db/s/balancer/type_migration_test.cpp
+++ b/src/mongo/db/s/balancer/type_migration_test.cpp
@@ -61,7 +61,7 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
ChunkType chunkType = assertGet(ChunkType::fromConfigBSON(chunkBuilder.obj()));
ASSERT_OK(chunkType.validate());
- MigrateInfo migrateInfo(kToShard, chunkType);
+ MigrateInfo migrateInfo(kToShard, chunkType, MoveChunkRequest::ForceJumbo::kDoNotForce);
MigrationType migrationType(migrateInfo, kWaitForDelete);
BSONObjBuilder builder;
@@ -72,6 +72,8 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
builder.append(MigrationType::toShard(), kToShard.toString());
version.appendWithField(&builder, "chunkVersion");
builder.append(MigrationType::waitForDelete(), kWaitForDelete);
+ builder.append(MigrationType::forceJumbo(),
+ MoveChunkRequest::forceJumboToString(MoveChunkRequest::ForceJumbo::kDoNotForce));
BSONObj obj = builder.obj();
@@ -89,6 +91,8 @@ TEST(MigrationTypeTest, FromAndToBSON) {
builder.append(MigrationType::toShard(), kToShard.toString());
version.appendWithField(&builder, "chunkVersion");
builder.append(MigrationType::waitForDelete(), kWaitForDelete);
+ builder.append(MigrationType::forceJumbo(),
+ MoveChunkRequest::forceJumboToString(MoveChunkRequest::ForceJumbo::kDoNotForce));
BSONObj obj = builder.obj();
diff --git a/src/mongo/db/s/config/configsvr_move_chunk_command.cpp b/src/mongo/db/s/config/configsvr_move_chunk_command.cpp
index 9ed7f4f292d..ab8c8e752ec 100644
--- a/src/mongo/db/s/config/configsvr_move_chunk_command.cpp
+++ b/src/mongo/db/s/config/configsvr_move_chunk_command.cpp
@@ -100,7 +100,8 @@ public:
request.getToShardId(),
request.getMaxChunkSizeBytes(),
request.getSecondaryThrottle(),
- request.getWaitForDelete()));
+ request.getWaitForDelete(),
+ request.getForceJumbo()));
} else {
uassertStatusOK(Balancer::get(opCtx)->rebalanceSingleChunk(opCtx, request.getChunk()));
}
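
This is where the new option crosses from the user-facing moveChunk command into the balancer. The exact command-level spelling is not visible in this diff (it is parsed into BalanceChunkRequest elsewhere), so the following is only an assumed sketch of how a forced migration of a jumbo chunk might be requested through mongos:

// Assumed sketch: the "forceJumbo" field name mirrors the internal ForceJumbo option but is
// not confirmed by this diff; see SERVER-42273 for the final user-facing syntax.
BSONObj cmd = BSON("moveChunk" << "test.coll"
                               << "find" << BSON("x" << 0)
                               << "to" << "shard0001"
                               << "forceJumbo" << true);
// Running this as an admin command against mongos eventually reaches the handler above,
// which forwards request.getForceJumbo() to Balancer::moveSingleChunk().
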
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 6a5ce99c1c7..6162fc725ca 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -40,7 +40,6 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
@@ -69,6 +68,9 @@ const char kRecvChunkCommit[] = "_recvChunkCommit";
const char kRecvChunkAbort[] = "_recvChunkAbort";
const int kMaxObjectPerChunk{250000};
+const Hours kMaxWaitToCommitCloneForJumboChunk(6);
+
+MONGO_FAIL_POINT_DEFINE(failTooMuchMemoryUsed);
bool isInRange(const BSONObj& obj,
const BSONObj& min,
@@ -227,7 +229,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)) {}
+ _recipientHost(std::move(recipientHost)),
+ _forceJumbo(_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kDoNotForce) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -263,7 +266,10 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
PrepareConflictBehavior::kIgnoreConflicts);
auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx);
- if (!storeCurrentLocsStatus.isOK()) {
+ if (storeCurrentLocsStatus == ErrorCodes::ChunkTooBig && _forceJumbo) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _jumboChunkCloneState.emplace();
+ } else if (!storeCurrentLocsStatus.isOK()) {
return storeCurrentLocsStatus;
}
}
@@ -302,89 +308,30 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
OperationContext* opCtx, Milliseconds maxTimeToWait) {
invariant(_state == kCloning);
invariant(!opCtx->lockState()->isLocked());
-
- const auto startTime = Date_t::now();
-
- int iteration = 0;
- while ((Date_t::now() - startTime) < maxTimeToWait) {
- auto responseStatus = _callRecipient(
- createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
- if (!responseStatus.isOK()) {
- return responseStatus.getStatus().withContext(
- "Failed to contact recipient shard to monitor data transfer");
- }
-
- const BSONObj& res = responseStatus.getValue();
-
- if (!res["waited"].boolean()) {
- sleepmillis(1LL << std::min(iteration, 10));
- }
- iteration++;
-
- stdx::lock_guard<Latch> sl(_mutex);
-
- const std::size_t cloneLocsRemaining = _cloneLocs.size();
-
- log() << "moveChunk data transfer progress: " << redact(res) << " mem used: " << _memoryUsed
- << " documents remaining to clone: " << cloneLocsRemaining;
-
- if (res["state"].String() == "steady") {
- if (cloneLocsRemaining != 0) {
- return {ErrorCodes::OperationIncomplete,
- str::stream() << "Unable to enter critical section because the recipient "
- "shard thinks all data is cloned while there are still "
- << cloneLocsRemaining << " documents remaining"};
- }
-
- return Status::OK();
- }
-
- if (res["state"].String() == "fail") {
- return {ErrorCodes::OperationFailed,
- str::stream() << "Data transfer error: " << res["errmsg"].str()};
- }
-
- auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res);
- if (!migrationSessionIdStatus.isOK()) {
- return {ErrorCodes::OperationIncomplete,
- str::stream() << "Unable to retrieve the id of the migration session due to "
- << migrationSessionIdStatus.getStatus().toString()};
- }
-
- if (res["ns"].str() != _args.getNss().ns() ||
- (res.hasField("fromShardId")
- ? (res["fromShardId"].str() != _args.getFromShardId().toString())
- : (res["from"].str() != _donorConnStr.toString())) ||
- !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 ||
- !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 ||
- !_sessionId.matches(migrationSessionIdStatus.getValue())) {
- // This can happen when the destination aborted the migration and received another
- // recvChunk before this thread sees the transition to the abort state. This is
- // currently possible only if multiple migrations are happening at once. This is an
- // unfortunate consequence of the shards not being able to keep track of multiple
- // incoming and outgoing migrations.
- return {ErrorCodes::OperationIncomplete,
- "Destination shard aborted migration because a new one is running"};
- }
-
- if (_memoryUsed > 500 * 1024 * 1024) {
- // This is too much memory for us to use so we're going to abort the migration
- return {ErrorCodes::ExceededMemoryLimit,
- "Aborting migration because of high memory usage"};
- }
-
- Status interruptStatus = opCtx->checkForInterruptNoAssert();
- if (!interruptStatus.isOK()) {
- return interruptStatus;
- }
+ // If this migration is a manual migration that specified "force", enter the critical section
+ // immediately. This means the entire cloning phase will be done under the critical section.
+ if (_jumboChunkCloneState &&
+ _args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) {
+ return Status::OK();
}
- return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"};
+ return _checkRecipientCloningStatus(opCtx, maxTimeToWait);
}
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
invariant(_state == kCloning);
invariant(!opCtx->lockState()->isLocked());
+ if (_jumboChunkCloneState && _forceJumbo) {
+ if (_args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) {
+ auto status = _checkRecipientCloningStatus(opCtx, kMaxWaitToCommitCloneForJumboChunk);
+ if (!status.isOK()) {
+ return status;
+ }
+ } else {
+ invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState);
+ invariant(_cloneLocs.empty());
+ }
+ }
if (_sessionCatalogSource) {
_sessionCatalogSource->onCommitCloneStarted();
@@ -605,18 +552,78 @@ void MigrationChunkClonerSourceLegacy::_decrementOutstandingOperationTrackReques
}
}
-uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
- stdx::lock_guard<Latch> sl(_mutex);
+void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationContext* opCtx,
+ Collection* collection,
+ BSONArrayBuilder* arrBuilder) {
+ ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(),
+ internalQueryExecYieldIterations.load(),
+ Milliseconds(internalQueryExecYieldPeriodMS.load()));
- return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
- _averageObjectSizeForCloneLocs * _cloneLocs.size());
-}
+ if (!_jumboChunkCloneState->clonerExec) {
+ auto exec = uassertStatusOK(_getIndexScanExecutor(opCtx, collection));
+ _jumboChunkCloneState->clonerExec = std::move(exec);
+ } else {
+ _jumboChunkCloneState->clonerExec->reattachToOperationContext(opCtx);
+ _jumboChunkCloneState->clonerExec->restoreState();
+ }
-Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
- Collection* collection,
- BSONArrayBuilder* arrBuilder) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS));
+ BSONObj obj;
+ RecordId recordId;
+ PlanExecutor::ExecState execState;
+
+ while (PlanExecutor::ADVANCED ==
+ (execState = _jumboChunkCloneState->clonerExec->getNext(
+ &obj, _jumboChunkCloneState->stashedRecordId ? nullptr : &recordId))) {
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ _jumboChunkCloneState->clonerState = execState;
+ lk.unlock();
+
+ opCtx->checkForInterrupt();
+
+ // Use the builder size instead of accumulating the document sizes directly so
+ // that we take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + obj.objsize() + 1024) > BSONObjMaxUserSize) {
+ _jumboChunkCloneState->clonerExec->enqueue(obj);
+
+ // Stash the recordId we just read to add to the next batch.
+ if (!recordId.isNull()) {
+ invariant(!_jumboChunkCloneState->stashedRecordId);
+ _jumboChunkCloneState->stashedRecordId = std::move(recordId);
+ }
+
+ break;
+ }
+
+ Snapshotted<BSONObj> doc;
+ invariant(collection->findDoc(
+ opCtx, _jumboChunkCloneState->stashedRecordId.value_or(recordId), &doc));
+ arrBuilder->append(doc.value());
+ _jumboChunkCloneState->stashedRecordId = boost::none;
+
+ lk.lock();
+ _jumboChunkCloneState->docsCloned++;
+ lk.unlock();
+
+ ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
+ }
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ _jumboChunkCloneState->clonerState = execState;
+ lk.unlock();
+
+ _jumboChunkCloneState->clonerExec->saveState();
+ _jumboChunkCloneState->clonerExec->detachFromOperationContext();
+
+ if (PlanExecutor::FAILURE == execState)
+ uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(obj).withContext(
+ "Executor error while scanning for documents belonging to chunk"));
+}
+void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
+ Collection* collection,
+ BSONArrayBuilder* arrBuilder) {
ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(),
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
@@ -625,8 +632,8 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
auto iter = _cloneLocs.begin();
for (; iter != _cloneLocs.end(); ++iter) {
- // We must always make progress in this method by at least one document because empty return
- // indicates there is no more initial clone data.
+ // We must always make progress in this method by at least one document because empty
+ // return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
break;
}
@@ -641,6 +648,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
// that we take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
(arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) {
+
break;
}
@@ -652,7 +660,34 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
}
_cloneLocs.erase(_cloneLocs.begin(), iter);
+}
+
+uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
+ stdx::lock_guard<Latch> sl(_mutex);
+ if (_jumboChunkCloneState && _forceJumbo)
+ return static_cast<uint64_t>(BSONObjMaxUserSize);
+
+ return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
+ _averageObjectSizeForCloneLocs * _cloneLocs.size());
+}
+
+Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
+ Collection* collection,
+ BSONArrayBuilder* arrBuilder) {
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS));
+ // If this chunk is too large to store records in _cloneLocs and the command args specify to
+ // attempt to move it, scan the collection directly.
+ if (_jumboChunkCloneState && _forceJumbo) {
+ try {
+ _nextCloneBatchFromIndexScan(opCtx, collection, arrBuilder);
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ }
+
+ _nextCloneBatchFromCloneLocs(opCtx, collection, arrBuilder);
return Status::OK();
}
@@ -732,15 +767,9 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
return responseStatus.data.getOwned();
}
-Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) {
- AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
-
- Collection* const collection = autoColl.getCollection();
- if (!collection) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "Collection " << _args.getNss().ns() << " does not exist."};
- }
-
+StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
+MigrationChunkClonerSourceLegacy::_getIndexScanExecutor(OperationContext* opCtx,
+ Collection* const collection) {
// Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
// multi-key index prefixed by shard key cannot be multikey over the shard key fields.
const IndexDescriptor* idx =
@@ -761,13 +790,29 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
// We can afford to yield here because any change to the base data that we might miss is already
// being queued and will migrate in the 'transferMods' stage.
- auto exec = InternalPlanner::indexScan(opCtx,
- collection,
- idx,
- min,
- max,
- BoundInclusion::kIncludeStartKeyOnly,
- PlanExecutor::YIELD_AUTO);
+ return InternalPlanner::indexScan(opCtx,
+ collection,
+ idx,
+ min,
+ max,
+ BoundInclusion::kIncludeStartKeyOnly,
+ PlanExecutor::YIELD_AUTO);
+}
+
+Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) {
+ AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
+
+ Collection* const collection = autoColl.getCollection();
+ if (!collection) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "Collection " << _args.getNss().ns() << " does not exist."};
+ }
+
+ auto swExec = _getIndexScanExecutor(opCtx, collection);
+ if (!swExec.isOK()) {
+ return swExec.getStatus();
+ }
+ auto exec = std::move(swExec.getValue());
// Use the average object size to estimate how many objects a full chunk would carry do that
// while traversing the chunk's range using the sharding index, below there's a fair amount of
@@ -812,8 +857,11 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
if (++recCount > maxRecsWhenFull) {
isLargeChunk = true;
- // Continue on despite knowing that it will fail, just to get the correct value for
- // recCount
+
+ if (_forceJumbo) {
+ _cloneLocs.clear();
+ break;
+ }
}
}
@@ -898,6 +946,96 @@ long long MigrationChunkClonerSourceLegacy::_xferUpdates(OperationContext* opCtx
return totalSize;
}
+Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationContext* opCtx,
+ Milliseconds maxTimeToWait) {
+ const auto startTime = Date_t::now();
+ int iteration = 0;
+ while ((Date_t::now() - startTime) < maxTimeToWait) {
+ auto responseStatus = _callRecipient(
+ createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
+ if (!responseStatus.isOK()) {
+ return responseStatus.getStatus().withContext(
+ "Failed to contact recipient shard to monitor data transfer");
+ }
+
+ const BSONObj& res = responseStatus.getValue();
+ if (!res["waited"].boolean()) {
+ sleepmillis(1LL << std::min(iteration, 10));
+ }
+ iteration++;
+
+ stdx::lock_guard<Latch> sl(_mutex);
+
+ const std::size_t cloneLocsRemaining = _cloneLocs.size();
+
+ if (_forceJumbo && _jumboChunkCloneState) {
+ log() << "moveChunk data transfer progress: " << redact(res)
+ << " mem used: " << _memoryUsed
+ << " documents cloned so far: " << _jumboChunkCloneState->docsCloned;
+ } else {
+ log() << "moveChunk data transfer progress: " << redact(res)
+ << " mem used: " << _memoryUsed
+ << " documents remaining to clone: " << cloneLocsRemaining;
+ }
+
+ if (res["state"].String() == "steady") {
+ if (cloneLocsRemaining != 0 ||
+ (_jumboChunkCloneState && _forceJumbo &&
+ PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
+ return {ErrorCodes::OperationIncomplete,
+ str::stream() << "Unable to enter critical section because the recipient "
+ "shard thinks all data is cloned while there are still "
+ "documents remaining"};
+ }
+
+ return Status::OK();
+ }
+
+ if (res["state"].String() == "fail") {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "Data transfer error: " << res["errmsg"].str()};
+ }
+
+ auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res);
+ if (!migrationSessionIdStatus.isOK()) {
+ return {ErrorCodes::OperationIncomplete,
+ str::stream() << "Unable to retrieve the id of the migration session due to "
+ << migrationSessionIdStatus.getStatus().toString()};
+ }
+
+ if (res["ns"].str() != _args.getNss().ns() ||
+ (res.hasField("fromShardId")
+ ? (res["fromShardId"].str() != _args.getFromShardId().toString())
+ : (res["from"].str() != _donorConnStr.toString())) ||
+ !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 ||
+ !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 ||
+ !_sessionId.matches(migrationSessionIdStatus.getValue())) {
+ // This can happen when the destination aborted the migration and received another
+ // recvChunk before this thread sees the transition to the abort state. This is
+ // currently possible only if multiple migrations are happening at once. This is an
+ // unfortunate consequence of the shards not being able to keep track of multiple
+ // incoming and outgoing migrations.
+ return {ErrorCodes::OperationIncomplete,
+ "Destination shard aborted migration because a new one is running"};
+ }
+
+ if (_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kForceManual &&
+ (_memoryUsed > 500 * 1024 * 1024 ||
+ (_jumboChunkCloneState && MONGO_unlikely(failTooMuchMemoryUsed.shouldFail())))) {
+ // This is too much memory for us to use so we're going to abort the migration
+ return {ErrorCodes::ExceededMemoryLimit,
+ "Aborting migration because of high memory usage"};
+ }
+
+ Status interruptStatus = opCtx->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ return interruptStatus;
+ }
+ }
+
+ return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"};
+}
+
boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(
OperationContext* opCtx, BSONArrayBuilder* arrBuilder) {
if (!_sessionCatalogSource) {
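
One structural point worth calling out in the cloner changes above: the jumbo path keeps a single index-scan PlanExecutor alive across many _migrateClone batches, parking it between batches rather than rebuilding it each time. A condensed view of _nextCloneBatchFromIndexScan() with descriptive comments added (the getNext() loop is elided; not a verbatim excerpt):

void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationContext* opCtx,
                                                                    Collection* collection,
                                                                    BSONArrayBuilder* arrBuilder) {
    if (!_jumboChunkCloneState->clonerExec) {
        // First batch: build the shard-key index scan over the chunk's [min, max) range.
        _jumboChunkCloneState->clonerExec =
            uassertStatusOK(_getIndexScanExecutor(opCtx, collection));
    } else {
        // Later batches: re-bind the parked executor to the current operation.
        _jumboChunkCloneState->clonerExec->reattachToOperationContext(opCtx);
        _jumboChunkCloneState->clonerExec->restoreState();
    }

    // ... getNext() loop appends documents until the batch nears BSONObjMaxUserSize,
    //     stashing the RecordId of a document that does not fit so the next batch can
    //     resume with it ...

    // Park the executor until the recipient requests the next _migrateClone batch.
    _jumboChunkCloneState->clonerExec->saveState();
    _jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
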
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 8e5809093af..2653f401ef1 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -36,6 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
@@ -220,6 +221,17 @@ private:
*/
StatusWith<BSONObj> _callRecipient(const BSONObj& cmdObj);
+ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getIndexScanExecutor(
+ OperationContext* opCtx, Collection* const collection);
+
+ void _nextCloneBatchFromIndexScan(OperationContext* opCtx,
+ Collection* collection,
+ BSONArrayBuilder* arrBuilder);
+
+ void _nextCloneBatchFromCloneLocs(OperationContext* opCtx,
+ Collection* collection,
+ BSONArrayBuilder* arrBuilder);
+
/**
* Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs (to avoid
* seeking disk later).
@@ -307,6 +319,12 @@ private:
std::list<BSONObj>* updateList,
long long initialSize);
+ /**
+ * Sends _recvChunkStatus to the recipient shard until it receives 'steady' from the recipient,
+ * an error has occurred, or a timeout is hit.
+ */
+ Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait);
+
// The original move chunk request
const MoveChunkRequest _args;
@@ -356,6 +374,27 @@ private:
// Total bytes in _reload + _deleted (xfer mods)
uint64_t _memoryUsed{0};
+
+ // False if the move chunk request specified ForceJumbo::kDoNotForce, true otherwise.
+ const bool _forceJumbo;
+
+ struct JumboChunkCloneState {
+ // Plan executor for the index scan used to clone docs.
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> clonerExec;
+
+ // The current state of 'clonerExec'.
+ PlanExecutor::ExecState clonerState;
+
+ // RecordId of the last doc read in by 'clonerExec' if collection scan yields during
+ // cloning.
+ boost::optional<RecordId> stashedRecordId;
+
+ // Number of docs in the jumbo chunk cloned so far
+ int docsCloned = 0;
+ };
+
+ // Set only once it is discovered that the chunk is jumbo
+ boost::optional<JumboChunkCloneState> _jumboChunkCloneState;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 678557c5a1e..c117aa5b26a 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -154,7 +154,8 @@ protected:
chunkRange,
1024 * 1024,
MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault),
- false);
+ false,
+ MoveChunkRequest::ForceJumbo::kDoNotForce);
return assertGet(MoveChunkRequest::createFromCommand(kNss, cmdBuilder.obj()));
}