author    Allison Easton <allison.easton@mongodb.com>    2021-12-07 15:16:20 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-12-07 15:40:12 +0000
commit    f2b86c04cffb7124bafa50f363ccb260ebd0b854 (patch)
tree      4f85292b10ef3b6ab291ef07919c06d3414c689b
parent    5117fe4c17287501311df53d66335026e04a032e (diff)
download  mongo-f2b86c04cffb7124bafa50f363ccb260ebd0b854.tar.gz
SERVER-59664 Implement merge chunks routine Phase I
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml |   3
-rw-r--r--  jstests/sharding/balancer_defragmentation_merge_chunks.js                 |  41
-rw-r--r--  src/mongo/db/s/balancer/balancer.cpp                                       |  35
-rw-r--r--  src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp           |   9
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp           | 128
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h             |  11
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp           | 123
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp  |   4
8 files changed, 302 insertions, 52 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index 3ccdc5ab0af..8fa06d5ae10 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -235,6 +235,9 @@ selector:
# SERVER-51805 splitChunk op is not idempotent
- jstests/sharding/mongos_get_shard_version.js
+ # SERVER-61940
+ - jstests/sharding/balancer_defragmentation_merge_chunks.js
+
exclude_with_any_tags:
- does_not_support_stepdowns
diff --git a/jstests/sharding/balancer_defragmentation_merge_chunks.js b/jstests/sharding/balancer_defragmentation_merge_chunks.js
index 23a7c49273f..2aeeb64e7fb 100644
--- a/jstests/sharding/balancer_defragmentation_merge_chunks.js
+++ b/jstests/sharding/balancer_defragmentation_merge_chunks.js
@@ -12,9 +12,18 @@
load("jstests/libs/fail_point_util.js");
load('jstests/sharding/autosplit_include.js');
+load("jstests/sharding/libs/find_chunks_util.js");
-var st = new ShardingTest(
- {mongos: 1, shards: 3, config: 1, other: {enableBalancer: true, enableAutoSplit: true}});
+var st = new ShardingTest({
+ mongos: 1,
+ shards: 3,
+ config: 1,
+ other: {
+ enableBalancer: true,
+ enableAutoSplit: true,
+ configOptions: {setParameter: {logComponentVerbosity: tojson({sharding: {verbosity: 2}})}},
+ }
+});
// setup the database for the test
assert.commandWorked(st.s.adminCommand({enableSharding: 'db'}));
@@ -23,27 +32,32 @@ var coll = db['test'];
var fullNs = coll.getFullName();
var configPrimary = st.configRS.getPrimary();
-const defaultChunkSize = 2 * 1024 * 1024;
+const defaultChunkSize = 2;
const bigString = "X".repeat(32 * 1024); // 32 KB
assert.commandWorked(st.s.adminCommand({shardCollection: fullNs, key: {key: 1}}));
+// TODO (SERVER-61848) remove this once the chunk size setting works
+let configDB = st.s.getDB('config');
+assert.commandWorked(configDB["settings"].insertOne({_id: "chunksize", value: 1}));
+
var bulk = coll.initializeUnorderedBulkOp();
-for (let i = 0; i < 32 * 128; i++) {
+for (let i = 0; i < 12 * 128; i++) {
bulk.insert({key: i, str: bigString});
}
assert.commandWorked(bulk.execute());
waitForOngoingChunkSplits(st);
+const numChunksPrev = findChunksUtil.countChunksForNs(st.config, fullNs);
+jsTest.log("Number of chunks before merging " + numChunksPrev);
jsTest.log("Balance cluster before beginning defragmentation");
function waitForBalanced() {
assert.soon(function() {
st.awaitBalancerRound();
- balancerStatus =
+ var balancerStatus =
assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs}));
return balancerStatus.balancerCompliant;
});
- jsTest.log("Balancer status of " + fullNs + " : \n" + tojson(balancerStatus));
}
waitForBalanced();
@@ -127,14 +141,27 @@ jsTest.log("Begin defragmentation with balancer off, end with it on");
jsTest.log("Balancer on, begin defragmentation and let it complete");
{
+ // Reset collection before starting
st.startBalancer();
+ waitForBalanced();
+ const numChunksPrev = findChunksUtil.countChunksForNs(st.config, fullNs);
+ jsTest.log("Number of chunks before merging " + numChunksPrev);
assert.commandWorked(st.s.adminCommand({
configureCollectionAutoSplitter: fullNs,
enableAutoSplitter: false,
balancerShouldMergeChunks: true,
defaultChunkSize: defaultChunkSize,
}));
- waitForBalanced();
+ assert.soon(function() {
+ st.awaitBalancerRound();
+ var balancerStatus =
+ assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs}));
+ return balancerStatus.firstComplianceViolation != 'chunksMerging';
+ });
+ st.stopBalancer();
+ const numChunksPost = findChunksUtil.countChunksForNs(st.config, fullNs);
+ jsTest.log("Number of chunks after merging " + numChunksPost);
+ assert.lt(numChunksPost, numChunksPrev);
}
st.stop();
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 57dc28380f2..ba6bfc7739d 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -385,18 +385,23 @@ void Balancer::_consumeActionStreamLoop() {
stdx::visit(
visit_helper::Overloaded{
[&](MergeInfo mergeAction) {
- auto result = _commandScheduler
- ->requestMergeChunks(opCtx.get(),
- mergeAction.nss,
- mergeAction.shardId,
- mergeAction.chunkRange,
- mergeAction.collectionVersion)
- .thenRunOn(*executor)
- .onCompletion([this, mergeAction](const Status& status) {
- auto opCtx = cc().makeOperationContext();
- _defragmentationPolicy->acknowledgeMergeResult(
- opCtx.get(), mergeAction, status);
- });
+ auto result =
+ _commandScheduler
+ ->requestMergeChunks(opCtx.get(),
+ mergeAction.nss,
+ mergeAction.shardId,
+ mergeAction.chunkRange,
+ mergeAction.collectionVersion)
+ .thenRunOn(*executor)
+ .onCompletion([this, mergeAction](const Status& status) {
+ // TODO (SERVER-61880) Remove this ThreadClient
+ ThreadClient tc(
+ "BalancerDefragmentationPolicy::acknowledgeMergeResult",
+ getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
+ _defragmentationPolicy->acknowledgeMergeResult(
+ opCtx.get(), mergeAction, status);
+ });
},
[&](DataSizeInfo dataSizeAction) {
auto result =
@@ -411,7 +416,11 @@ void Balancer::_consumeActionStreamLoop() {
.thenRunOn(*executor)
.onCompletion([this, dataSizeAction](
const StatusWith<DataSizeResponse>& swDataSize) {
- auto opCtx = cc().makeOperationContext();
+ // TODO (SERVER-61880) Remove this ThreadClient
+ ThreadClient tc(
+ "BalancerDefragmentationPolicy::acknowledgeDataSizeResult",
+ getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
_defragmentationPolicy->acknowledgeDataSizeResult(
opCtx.get(), dataSizeAction, swDataSize);
});
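
Note on the two hunks above: the completion continuations run on the balancer's task executor rather than on the balancer thread itself, presumably without a suitable Client attached, so `cc().makeOperationContext()` is replaced by a temporary `ThreadClient` (to be removed under SERVER-61880) that attaches a Client for the duration of the callback. The following is a minimal, standalone sketch of that pattern; `Client`, `ThreadClient`, and `OperationContext` here are simplified stand-ins, not the real MongoDB types.

// Simplified model of the ThreadClient pattern: an OperationContext can only
// be created on a thread that has a Client attached, and ThreadClient is the
// RAII helper that attaches one.
#include <cassert>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

struct Client {
    std::string desc;
};

// Each OS thread may have at most one Client attached.
thread_local Client* currentClient = nullptr;

struct OperationContext {
    Client* owner;
};

class ThreadClient {
public:
    explicit ThreadClient(std::string desc) : _client{std::move(desc)} {
        assert(currentClient == nullptr && "thread already has a client");
        currentClient = &_client;
    }
    ~ThreadClient() {
        currentClient = nullptr;
    }
    std::unique_ptr<OperationContext> makeOperationContext() {
        assert(currentClient == &_client && "client not attached to this thread");
        return std::unique_ptr<OperationContext>(new OperationContext{&_client});
    }

private:
    Client _client;
};

int main() {
    // A completion callback scheduled on an executor thread has no Client by
    // default, so it attaches one before building an OperationContext.
    std::thread executorThread([] {
        ThreadClient tc("BalancerDefragmentationPolicy::acknowledgeMergeResult");
        auto opCtx = tc.makeOperationContext();
        std::cout << "running with client: " << opCtx->owner->desc << '\n';
    });
    executorThread.join();
    return 0;
}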
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 7bffc116fb3..30cf39e0692 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -347,15 +347,16 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
for (const auto& coll : collections) {
const NamespaceString& nss(coll.getNss());
- if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations()) {
- LOGV2_DEBUG(21851,
+ if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations() ||
+ coll.getBalancerShouldMergeChunks()) {
+ LOGV2_DEBUG(5966401,
1,
- "Not balancing collection {namespace}; explicitly disabled.",
"Not balancing explicitly disabled collection",
"namespace"_attr = nss,
"allowBalance"_attr = coll.getAllowBalance(),
"allowMigrations"_attr = coll.getAllowMigrations(),
- "timeseriesFields"_attr = coll.getTimeseriesFields());
+ "permitMigrations"_attr = coll.getPermitMigrations(),
+ "balancerShouldMergeChunks"_attr = coll.getBalancerShouldMergeChunks());
continue;
}
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index 00b825de023..84b1688c90d 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -99,8 +99,8 @@ bool BalancerDefragmentationPolicyImpl::_queueNextAction(
// get next action within the current phase
switch (collectionData.phase) {
case DefragmentationPhaseEnum::kMergeChunks:
- if (auto mergeAction = _getCollectionMergeAction(collectionData)) {
- collectionData.queuedActions.push(*mergeAction);
+ if (auto phase1Action = _getCollectionPhase1Action(opCtx, uuid, collectionData)) {
+ collectionData.queuedActions.push(*phase1Action);
return true;
}
break;
@@ -121,6 +121,88 @@ bool BalancerDefragmentationPolicyImpl::_queueNextAction(
return false;
}
+ChunkVersion _getShardVersion(OperationContext* opCtx, const ShardId& shardId, const UUID& uuid) {
+ auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, uuid);
+
+ auto chunkVector =
+ Grid::get(opCtx)
+ ->catalogClient()
+ ->getChunks(opCtx,
+ BSON(ChunkType::collectionUUID()
+ << coll.getUuid() << ChunkType::shard(shardId.toString())) /*query*/,
+ BSON(ChunkType::lastmod << -1) /*sort*/,
+ 1 /*limit*/,
+ nullptr /*opTime*/,
+ coll.getEpoch(),
+ coll.getTimestamp(),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ boost::none)
+ .getValue();
+ return chunkVector.front().getVersion();
+}
+
+boost::optional<DefragmentationAction>
+BalancerDefragmentationPolicyImpl::_getCollectionPhase1Action(
+ OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionInfo) {
+ auto isConsecutive = [&](const ChunkType& firstChunk, const ChunkType& secondChunk) -> bool {
+ return SimpleBSONObjComparator::kInstance.evaluate(firstChunk.getMax() ==
+ secondChunk.getMin()) &&
+ collectionInfo.zones.getZoneForChunk(firstChunk.getRange()) ==
+ collectionInfo.zones.getZoneForChunk(secondChunk.getRange());
+ };
+
+ auto getActionFromRange =
+ [&](std::vector<ChunkType>& chunks) -> boost::optional<DefragmentationAction> {
+ ChunkVersion shardVersion = _getShardVersion(opCtx, chunks.front().getShard(), uuid);
+ if (chunks.size() == 1) {
+ auto currentChunk = chunks.front();
+ if (currentChunk.getEstimatedSizeBytes()) {
+ return boost::none;
+ } else {
+ return boost::optional<DefragmentationAction>(
+ DataSizeInfo(currentChunk.getShard(),
+ collectionInfo.nss,
+ uuid,
+ currentChunk.getRange(),
+ shardVersion,
+ collectionInfo.collectionShardKey,
+ false));
+ }
+ } else {
+ return boost::optional<DefragmentationAction>(
+ MergeInfo(chunks.front().getShard(),
+ collectionInfo.nss,
+ uuid,
+ shardVersion,
+ ChunkRange(chunks.front().getMin(), chunks.back().getMax())));
+ }
+ };
+
+ while (collectionInfo.chunkList.size() > 0) {
+ auto& currentChunk = collectionInfo.chunkList.back();
+ auto& currentMergeList = collectionInfo.shardToChunkMap[currentChunk.getShard()];
+ boost::optional<DefragmentationAction> nextAction = boost::none;
+ if (!currentMergeList.empty() && !isConsecutive(currentMergeList.back(), currentChunk)) {
+ nextAction = getActionFromRange(currentMergeList);
+ currentMergeList.clear();
+ }
+ currentMergeList.push_back(std::move(currentChunk));
+ collectionInfo.chunkList.pop_back();
+ if (nextAction) {
+ return nextAction;
+ }
+ }
+ auto it = collectionInfo.shardToChunkMap.begin();
+ if (it != collectionInfo.shardToChunkMap.end()) {
+ boost::optional<DefragmentationAction> nextAction = getActionFromRange(it->second);
+ collectionInfo.shardToChunkMap.erase(it);
+ if (nextAction) {
+ return nextAction;
+ }
+ }
+ return boost::none;
+}
+
void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* opCtx,
MergeInfo action,
const Status& result) {
@@ -137,10 +219,15 @@ void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext*
action.nss,
action.uuid,
action.chunkRange,
- action.collectionVersion,
+ _getShardVersion(opCtx, action.shardId, action.uuid),
_defragmentationStates.at(action.uuid).collectionShardKey,
false))
- : boost::optional<DefragmentationAction>(action);
+ : boost::optional<DefragmentationAction>(
+ MergeInfo(action.shardId,
+ action.nss,
+ action.uuid,
+ _getShardVersion(opCtx, action.shardId, action.uuid),
+ action.chunkRange));
_processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace);
}
@@ -160,8 +247,16 @@ void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult(
result.getValue().sizeBytes,
ShardingCatalogClient::kMajorityWriteConcern);
}
- boost::optional<DefragmentationAction> nextActionOnNamespace =
- result.isOK() ? boost::none : boost::optional<DefragmentationAction>(action);
+ boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK()
+ ? boost::none
+ : boost::optional<DefragmentationAction>(
+ DataSizeInfo(action.shardId,
+ action.nss,
+ action.uuid,
+ action.chunkRange,
+ _getShardVersion(opCtx, action.shardId, action.uuid),
+ action.keyPattern,
+ false));
_processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace);
}
@@ -223,11 +318,11 @@ void BalancerDefragmentationPolicyImpl::_processEndOfAction(
// If the end of the current action implies a next step, store it
if (nextActionOnNamespace) {
_defragmentationStates.at(uuid).queuedActions.push(*nextActionOnNamespace);
+ } else {
+ // Load next action, this will trigger phase change if needed
+ _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]);
}
- // Load next action, this will trigger phase change if needed
- _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]);
-
// Fulfill promise if needed
if (_nextStreamingActionPromise) {
auto nextStreamingAction = _nextStreamingAction(opCtx);
@@ -279,6 +374,21 @@ void BalancerDefragmentationPolicyImpl::_initializeCollectionState(WithLock,
newState.phase = coll.getDefragmentationPhase() ? coll.getDefragmentationPhase().get()
: DefragmentationPhaseEnum::kMergeChunks;
newState.collectionShardKey = coll.getKeyPattern().toBSON();
+ newState.chunkList =
+ Grid::get(opCtx)
+ ->catalogClient()
+ ->getChunks(opCtx,
+ BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/,
+ BSON(ChunkType::max() << -1) /*sort*/,
+ boost::none /*limit*/,
+ nullptr /*opTime*/,
+ coll.getEpoch(),
+ coll.getTimestamp(),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ boost::none)
+ .getValue();
+ uassertStatusOK(ZoneInfo::addTagsFromCatalog(
+ opCtx, coll.getNss(), coll.getKeyPattern(), newState.zones));
_persistPhaseUpdate(opCtx, newState.phase, coll.getUuid());
auto [_, inserted] =
_defragmentationStates.insert_or_assign(coll.getUuid(), std::move(newState));
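
Note on the diff above: the new `_getCollectionPhase1Action` walks the collection's chunk list and, per shard, folds runs of contiguous same-zone chunks into a single merge request spanning the run; a run containing only one chunk instead yields a data-size request, unless that chunk already carries an estimated size. Below is a simplified standalone sketch of that grouping logic; it ignores zones and the per-shard bookkeeping of the real routine (so action ordering can differ), and `Chunk`/`Action` are illustrative stand-ins for `ChunkType`, `MergeInfo`, and `DataSizeInfo`.

// Simplified sketch of phase-1 defragmentation grouping: consecutive chunks on
// the same shard are folded into one merge range, isolated chunks become
// data-size requests (or nothing, if their size is already known).
#include <iostream>
#include <string>
#include <vector>

struct Chunk {
    std::string shard;
    int min;
    int max;
    bool hasEstimatedSize;
};

struct Action {
    enum class Kind { kMerge, kDataSize, kNone } kind;
    std::string shard;
    int min;
    int max;
};

// Two chunks can be merged when they are adjacent in shard-key order on the
// same shard (the real code also requires matching zones).
bool isConsecutive(const Chunk& a, const Chunk& b) {
    return a.max == b.min && a.shard == b.shard;
}

Action actionForRun(const std::vector<Chunk>& run) {
    if (run.size() == 1) {
        const Chunk& c = run.front();
        if (c.hasEstimatedSize) {
            return {Action::Kind::kNone, c.shard, c.min, c.max};
        }
        return {Action::Kind::kDataSize, c.shard, c.min, c.max};
    }
    return {Action::Kind::kMerge, run.front().shard, run.front().min, run.back().max};
}

std::vector<Action> phase1Actions(const std::vector<Chunk>& chunks) {
    std::vector<Action> out;
    std::vector<Chunk> run;
    for (const Chunk& chunk : chunks) {
        if (!run.empty() && !isConsecutive(run.back(), chunk)) {
            out.push_back(actionForRun(run));
            run.clear();
        }
        run.push_back(chunk);
    }
    if (!run.empty()) {
        out.push_back(actionForRun(run));
    }
    return out;
}

int main() {
    // Mirrors the Phase1NotConsecutive test: the chunk [5, 6) lives on another
    // shard, so it interrupts the run and ends up as a data-size request.
    std::vector<Chunk> chunks;
    for (int i = 0; i < 10; ++i) {
        chunks.push_back({i == 5 ? "shard1" : "shard0", i, i + 1, false});
    }
    for (const Action& a : phase1Actions(chunks)) {
        if (a.kind == Action::Kind::kMerge)
            std::cout << "merge    [" << a.min << ", " << a.max << ") on " << a.shard << '\n';
        else if (a.kind == Action::Kind::kDataSize)
            std::cout << "dataSize [" << a.min << ", " << a.max << ") on " << a.shard << '\n';
    }
    return 0;
}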
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
index 6adc0d8734f..bb7a148a3b4 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
@@ -89,6 +89,7 @@ private:
BSONObj collectionShardKey;
std::queue<DefragmentationAction> queuedActions;
unsigned outstandingActions{0};
+ ShardToChunksMap shardToChunkMap;
std::vector<ChunkType> chunkList;
ZoneInfo zones;
};
@@ -112,13 +113,11 @@ private:
CollectionDefragmentationState& collectionData);
/**
- * Returns next phase 1 merge action for the collection if there is one and boost::none
- * otherwise.
+ * Returns next phase 1 merge or datasize action for the collection if there is one and
+ * boost::none otherwise.
*/
- boost::optional<MergeInfo> _getCollectionMergeAction(
- CollectionDefragmentationState& collectionInfo) {
- return boost::none;
- }
+ boost::optional<DefragmentationAction> _getCollectionPhase1Action(
+ OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionInfo);
/**
* Returns next phase 3 split action for the collection if there is one and boost::none
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
index 196736e38be..3df05d8cfe6 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
@@ -39,12 +39,19 @@ class BalancerDefragmentationPolicyTest : public ConfigServerTestFixture {
protected:
const NamespaceString kNss{"testDb.testColl"};
const UUID kUuid = UUID::gen();
- const ShardId kShardId = ShardId("testShard");
+ const ShardId kShardId0 = ShardId("shard0");
+ const ShardId kShardId1 = ShardId("shard1");
const ChunkVersion kCollectionVersion = ChunkVersion(1, 1, OID::gen(), Timestamp(10));
const KeyPattern kShardKeyPattern = KeyPattern(BSON("x" << 1));
const BSONObj kMinKey = BSON("x" << 0);
const BSONObj kMaxKey = BSON("x" << 10);
const long long kMaxChunkSizeBytes{2048};
+ const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345);
+ const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346);
+
+ const std::vector<ShardType> kShardList{
+ ShardType(kShardId0.toString(), kShardHost0.toString()),
+ ShardType(kShardId1.toString(), kShardHost1.toString())};
BalancerDefragmentationPolicyTest()
: _random(std::random_device{}()),
@@ -60,8 +67,22 @@ protected:
return shardedCollection;
}
+ CollectionType setupCollectionForPhase1(std::vector<ChunkType>& chunkList) {
+ setupShards(kShardList);
+ setupCollection(kNss, kShardKeyPattern, chunkList);
+ ASSERT_OK(updateToConfigCollection(
+ operationContext(),
+ CollectionType::ConfigNS,
+ BSON(CollectionType::kUuidFieldName << kUuid),
+ BSON("$set" << BSON(CollectionType::kBalancerShouldMergeChunksFieldName << true)),
+ false));
+ return Grid::get(operationContext())
+ ->catalogClient()
+ ->getCollection(operationContext(), kUuid);
+ }
+
void makeConfigChunkEntry() {
- ChunkType chunk(kUuid, ChunkRange(kMinKey, kMaxKey), kCollectionVersion, kShardId);
+ ChunkType chunk(kUuid, ChunkRange(kMinKey, kMaxKey), kCollectionVersion, kShardId0);
ASSERT_OK(insertToConfigCollection(
operationContext(), ChunkType::ConfigNS, chunk.toConfigBSON()));
}
@@ -124,9 +145,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedMergeResult) {
auto coll = makeConfigCollectionEntry();
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ makeConfigChunkEntry();
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto mergeInfo =
- MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey));
+ MergeInfo(kShardId0, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey));
_defragmentationPolicy.acknowledgeMergeResult(
operationContext(),
mergeInfo,
@@ -141,9 +163,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitVectorRespon
auto coll = makeConfigCollectionEntry();
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ makeConfigChunkEntry();
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto splitVectorInfo = AutoSplitVectorInfo(
- kShardId, kNss, kUuid, kCollectionVersion, BSONObj(), kMinKey, kMaxKey, 120);
+ kShardId0, kNss, kUuid, kCollectionVersion, BSONObj(), kMinKey, kMaxKey, 120);
_defragmentationPolicy.acknowledgeAutoSplitVectorResult(
operationContext(),
splitVectorInfo,
@@ -157,9 +180,16 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitAction) {
auto coll = makeConfigCollectionEntry();
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ makeConfigChunkEntry();
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
- auto splitInfo = SplitInfoWithKeyPattern(
- kShardId, kNss, kCollectionVersion, kMinKey, kMaxKey, {}, kUuid, kShardKeyPattern.toBSON());
+ auto splitInfo = SplitInfoWithKeyPattern(kShardId0,
+ kNss,
+ kCollectionVersion,
+ kMinKey,
+ kMaxKey,
+ {},
+ kUuid,
+ kShardKeyPattern.toBSON());
_defragmentationPolicy.acknowledgeSplitResult(
operationContext(),
splitInfo,
@@ -173,8 +203,9 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedDataSizeAction) {
auto coll = makeConfigCollectionEntry();
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ makeConfigChunkEntry();
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
- auto dataSizeInfo = DataSizeInfo(kShardId,
+ auto dataSizeInfo = DataSizeInfo(kShardId0,
kNss,
kUuid,
ChunkRange(kMinKey, kMaxKey),
@@ -194,9 +225,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulMergeAction)
auto coll = makeConfigCollectionEntry();
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ makeConfigChunkEntry();
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto mergeInfo =
- MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey));
+ MergeInfo(kShardId0, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey));
_defragmentationPolicy.acknowledgeMergeResult(operationContext(), mergeInfo, Status::OK());
ASSERT_TRUE(future.isReady());
DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(future.get());
@@ -211,7 +243,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVect
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
- auto splitVectorInfo = AutoSplitVectorInfo(kShardId,
+ auto splitVectorInfo = AutoSplitVectorInfo(kShardId0,
kNss,
kUuid,
kCollectionVersion,
@@ -235,7 +267,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulSplitAction)
FailPointEnableBlock failpoint("skipPhaseTransition");
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
- auto splitInfo = SplitInfoWithKeyPattern(kShardId,
+ auto splitInfo = SplitInfoWithKeyPattern(kShardId0,
kNss,
kCollectionVersion,
kMinKey,
@@ -253,7 +285,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulDataSizeActio
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
makeConfigChunkEntry();
auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
- auto dataSizeInfo = DataSizeInfo(kShardId,
+ auto dataSizeInfo = DataSizeInfo(kShardId0,
kNss,
kUuid,
ChunkRange(kMinKey, kMaxKey),
@@ -269,5 +301,74 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulDataSizeActio
ASSERT_EQ(configDoc.getIntField(ChunkType::estimatedSizeBytes.name()), 2000);
}
+TEST_F(BalancerDefragmentationPolicyTest, TestPhase1AllConsecutive) {
+ // Set up collection with all mergeable chunks
+ std::vector<ChunkType> chunkList;
+ for (int i = 0; i < 5; i++) {
+ ChunkType chunk(
+ kUuid,
+ ChunkRange(BSON("x" << i), BSON("x" << i + 1)),
+ ChunkVersion(1, i, kCollectionVersion.epoch(), kCollectionVersion.getTimestamp()),
+ kShardId0);
+ chunkList.push_back(chunk);
+ }
+ for (int i = 5; i < 10; i++) {
+ ChunkType chunk(
+ kUuid,
+ ChunkRange(BSON("x" << i), BSON("x" << i + 1)),
+ ChunkVersion(1, i, kCollectionVersion.epoch(), kCollectionVersion.getTimestamp()),
+ kShardId1);
+ chunkList.push_back(chunk);
+ }
+ auto coll = setupCollectionForPhase1(chunkList);
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ // Test
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(future.isReady());
+ MergeInfo mergeAction = stdx::get<MergeInfo>(future.get());
+ ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMin(), BSON("x" << 0));
+ ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMax(), BSON("x" << 5));
+ auto future2 = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(future2.isReady());
+ MergeInfo mergeAction2 = stdx::get<MergeInfo>(future2.get());
+ ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMin(), BSON("x" << 5));
+ ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMax(), BSON("x" << 10));
+ auto future3 = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_FALSE(future3.isReady());
+}
+
+TEST_F(BalancerDefragmentationPolicyTest, Phase1NotConsecutive) {
+ std::vector<ChunkType> chunkList;
+ for (int i = 0; i < 10; i++) {
+ ShardId chosenShard = (i == 5) ? kShardId1 : kShardId0;
+ ChunkType chunk(
+ kUuid,
+ ChunkRange(BSON("x" << i), BSON("x" << i + 1)),
+ ChunkVersion(1, i, kCollectionVersion.epoch(), kCollectionVersion.getTimestamp()),
+ chosenShard);
+ chunkList.push_back(chunk);
+ }
+ auto coll = setupCollectionForPhase1(chunkList);
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ // Test
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(future.isReady());
+ MergeInfo mergeAction = stdx::get<MergeInfo>(future.get());
+ ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMin(), BSON("x" << 0));
+ ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMax(), BSON("x" << 5));
+ auto future2 = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(future2.isReady());
+ MergeInfo mergeAction2 = stdx::get<MergeInfo>(future2.get());
+ ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMin(), BSON("x" << 6));
+ ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMax(), BSON("x" << 10));
+ auto future3 = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(future3.isReady());
+ DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(future3.get());
+ ASSERT_BSONOBJ_EQ(dataSizeAction.chunkRange.getMin(), BSON("x" << 5));
+ ASSERT_BSONOBJ_EQ(dataSizeAction.chunkRange.getMax(), BSON("x" << 6));
+ auto future4 = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_FALSE(future4.isReady());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 1d038352e3f..5ece6c49dec 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -600,10 +600,10 @@ void ShardingCatalogManager::configureCollectionAutoSplit(
std::set<ShardId> shardsIds;
cm.getAllShardIds(&shardsIds);
+ const auto update = updateCmd.obj();
+
withTransaction(
opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- const auto update = updateCmd.obj();
-
const auto query = BSON(CollectionType::kNssFieldName
<< nss.ns() << CollectionType::kUuidFieldName << uuid);
const auto res = writeToConfigDocumentInTxn(
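
Note on the hunk above: hoisting `updateCmd.obj()` out of the transaction callback matters because `withTransaction` may invoke the callback more than once (for example after a transient transaction error), while `BSONObjBuilder::obj()` releases the builder's buffer and must only be called once; building the update document up front keeps the callback safe to retry. The sketch below illustrates the hazard with a simplified one-shot builder and retrying helper, both illustrative stand-ins rather than the real APIs.

// Simplified illustration: a one-shot builder (like BSONObjBuilder::obj())
// must be consumed before a callback that may be retried.
#include <cassert>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

class OneShotBuilder {
public:
    void append(std::string field) {
        _doc += "{" + std::move(field) + "}";
    }
    // Releases the built document; the builder must not be reused afterwards.
    std::string obj() {
        assert(!_done && "obj() may only be called once");
        _done = true;
        return std::move(_doc);
    }

private:
    std::string _doc;
    bool _done = false;
};

// Models a retryable transaction wrapper: the callback may run more than once
// if the first attempt hits a transient error.
void retryingTransaction(const std::function<void(int attempt)>& callback) {
    for (int attempt = 1; attempt <= 2; ++attempt) {
        callback(attempt);  // pretend attempt 1 failed and was retried
    }
}

int main() {
    OneShotBuilder updateCmd;
    updateCmd.append("$set: {balancerShouldMergeChunks: true}");

    // Safe: build the document once, before the retryable callback.
    const auto update = updateCmd.obj();
    retryingTransaction([&](int attempt) {
        std::cout << "attempt " << attempt << " writes " << update << '\n';
    });

    // Calling obj() inside the callback instead would fire the assertion on
    // the retry, which is the hazard the commit avoids.
    return 0;
}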