diff options
author | Allison Easton <allison.easton@mongodb.com> | 2021-12-07 15:16:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-07 15:40:12 +0000 |
commit | f2b86c04cffb7124bafa50f363ccb260ebd0b854 (patch) | |
tree | 4f85292b10ef3b6ab291ef07919c06d3414c689b | |
parent | 5117fe4c17287501311df53d66335026e04a032e (diff) | |
download | mongo-f2b86c04cffb7124bafa50f363ccb260ebd0b854.tar.gz |
SERVER-59664 Implement merge chunks routine Phase I
8 files changed, 302 insertions, 52 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index 3ccdc5ab0af..8fa06d5ae10 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -235,6 +235,9 @@ selector: # SERVER-51805 splitChunk op is not idempotent - jstests/sharding/mongos_get_shard_version.js + # SERVER-61940 + - jstests/sharding/balancer_defragmentation_merge_chunks.js + exclude_with_any_tags: - does_not_support_stepdowns diff --git a/jstests/sharding/balancer_defragmentation_merge_chunks.js b/jstests/sharding/balancer_defragmentation_merge_chunks.js index 23a7c49273f..2aeeb64e7fb 100644 --- a/jstests/sharding/balancer_defragmentation_merge_chunks.js +++ b/jstests/sharding/balancer_defragmentation_merge_chunks.js @@ -12,9 +12,18 @@ load("jstests/libs/fail_point_util.js"); load('jstests/sharding/autosplit_include.js'); +load("jstests/sharding/libs/find_chunks_util.js"); -var st = new ShardingTest( - {mongos: 1, shards: 3, config: 1, other: {enableBalancer: true, enableAutoSplit: true}}); +var st = new ShardingTest({ + mongos: 1, + shards: 3, + config: 1, + other: { + enableBalancer: true, + enableAutoSplit: true, + configOptions: {setParameter: {logComponentVerbosity: tojson({sharding: {verbosity: 2}})}}, + } +}); // setup the database for the test assert.commandWorked(st.s.adminCommand({enableSharding: 'db'})); @@ -23,27 +32,32 @@ var coll = db['test']; var fullNs = coll.getFullName(); var configPrimary = st.configRS.getPrimary(); -const defaultChunkSize = 2 * 1024 * 1024; +const defaultChunkSize = 2; const bigString = "X".repeat(32 * 1024); // 32 KB assert.commandWorked(st.s.adminCommand({shardCollection: fullNs, key: {key: 1}})); +// TODO (SERVER-61848) remove this once the chunk size setting works +let configDB = st.s.getDB('config'); +assert.commandWorked(configDB["settings"].insertOne({_id: "chunksize", value: 1})); + var bulk = coll.initializeUnorderedBulkOp(); -for (let i = 0; i < 32 * 128; i++) { +for (let i = 0; i < 12 * 128; i++) { bulk.insert({key: i, str: bigString}); } assert.commandWorked(bulk.execute()); waitForOngoingChunkSplits(st); +const numChunksPrev = findChunksUtil.countChunksForNs(st.config, fullNs); +jsTest.log("Number of chunks before merging " + numChunksPrev); jsTest.log("Balance cluster before beginning defragmentation"); function waitForBalanced() { assert.soon(function() { st.awaitBalancerRound(); - balancerStatus = + var balancerStatus = assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs})); return balancerStatus.balancerCompliant; }); - jsTest.log("Balancer status of " + fullNs + " : \n" + tojson(balancerStatus)); } waitForBalanced(); @@ -127,14 +141,27 @@ jsTest.log("Begin defragmentation with balancer off, end with it on"); jsTest.log("Balancer on, begin defragmentation and let it complete"); { + // Reset collection before starting st.startBalancer(); + waitForBalanced(); + const numChunksPrev = findChunksUtil.countChunksForNs(st.config, fullNs); + jsTest.log("Number of chunks before merging " + numChunksPrev); assert.commandWorked(st.s.adminCommand({ configureCollectionAutoSplitter: fullNs, enableAutoSplitter: false, balancerShouldMergeChunks: true, defaultChunkSize: defaultChunkSize, })); - waitForBalanced(); + assert.soon(function() { + st.awaitBalancerRound(); + var balancerStatus = + assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs})); + return balancerStatus.firstComplianceViolation != 'chunksMerging'; + }); + st.stopBalancer(); + const numChunksPost = findChunksUtil.countChunksForNs(st.config, fullNs); + jsTest.log("Number of chunks after merging " + numChunksPost); + assert.lt(numChunksPost, numChunksPrev); } st.stop(); diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 57dc28380f2..ba6bfc7739d 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -385,18 +385,23 @@ void Balancer::_consumeActionStreamLoop() { stdx::visit( visit_helper::Overloaded{ [&](MergeInfo mergeAction) { - auto result = _commandScheduler - ->requestMergeChunks(opCtx.get(), - mergeAction.nss, - mergeAction.shardId, - mergeAction.chunkRange, - mergeAction.collectionVersion) - .thenRunOn(*executor) - .onCompletion([this, mergeAction](const Status& status) { - auto opCtx = cc().makeOperationContext(); - _defragmentationPolicy->acknowledgeMergeResult( - opCtx.get(), mergeAction, status); - }); + auto result = + _commandScheduler + ->requestMergeChunks(opCtx.get(), + mergeAction.nss, + mergeAction.shardId, + mergeAction.chunkRange, + mergeAction.collectionVersion) + .thenRunOn(*executor) + .onCompletion([this, mergeAction](const Status& status) { + // TODO (SERVER-61880) Remove this ThreadClient + ThreadClient tc( + "BalancerDefragmentationPolicy::acknowledgeMergeResult", + getGlobalServiceContext()); + auto opCtx = tc->makeOperationContext(); + _defragmentationPolicy->acknowledgeMergeResult( + opCtx.get(), mergeAction, status); + }); }, [&](DataSizeInfo dataSizeAction) { auto result = @@ -411,7 +416,11 @@ void Balancer::_consumeActionStreamLoop() { .thenRunOn(*executor) .onCompletion([this, dataSizeAction]( const StatusWith<DataSizeResponse>& swDataSize) { - auto opCtx = cc().makeOperationContext(); + // TODO (SERVER-61880) Remove this ThreadClient + ThreadClient tc( + "BalancerDefragmentationPolicy::acknowledgeDataSizeResult", + getGlobalServiceContext()); + auto opCtx = tc->makeOperationContext(); _defragmentationPolicy->acknowledgeDataSizeResult( opCtx.get(), dataSizeAction, swDataSize); }); diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index 7bffc116fb3..30cf39e0692 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -347,15 +347,16 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo for (const auto& coll : collections) { const NamespaceString& nss(coll.getNss()); - if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations()) { - LOGV2_DEBUG(21851, + if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations() || + coll.getBalancerShouldMergeChunks()) { + LOGV2_DEBUG(5966401, 1, - "Not balancing collection {namespace}; explicitly disabled.", "Not balancing explicitly disabled collection", "namespace"_attr = nss, "allowBalance"_attr = coll.getAllowBalance(), "allowMigrations"_attr = coll.getAllowMigrations(), - "timeseriesFields"_attr = coll.getTimeseriesFields()); + "permitMigrations"_attr = coll.getPermitMigrations(), + "balancerShouldMergeChunks"_attr = coll.getBalancerShouldMergeChunks()); continue; } diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index 00b825de023..84b1688c90d 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -99,8 +99,8 @@ bool BalancerDefragmentationPolicyImpl::_queueNextAction( // get next action within the current phase switch (collectionData.phase) { case DefragmentationPhaseEnum::kMergeChunks: - if (auto mergeAction = _getCollectionMergeAction(collectionData)) { - collectionData.queuedActions.push(*mergeAction); + if (auto phase1Action = _getCollectionPhase1Action(opCtx, uuid, collectionData)) { + collectionData.queuedActions.push(*phase1Action); return true; } break; @@ -121,6 +121,88 @@ bool BalancerDefragmentationPolicyImpl::_queueNextAction( return false; } +ChunkVersion _getShardVersion(OperationContext* opCtx, const ShardId& shardId, const UUID& uuid) { + auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, uuid); + + auto chunkVector = + Grid::get(opCtx) + ->catalogClient() + ->getChunks(opCtx, + BSON(ChunkType::collectionUUID() + << coll.getUuid() << ChunkType::shard(shardId.toString())) /*query*/, + BSON(ChunkType::lastmod << -1) /*sort*/, + 1 /*limit*/, + nullptr /*opTime*/, + coll.getEpoch(), + coll.getTimestamp(), + repl::ReadConcernLevel::kLocalReadConcern, + boost::none) + .getValue(); + return chunkVector.front().getVersion(); +} + +boost::optional<DefragmentationAction> +BalancerDefragmentationPolicyImpl::_getCollectionPhase1Action( + OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionInfo) { + auto isConsecutive = [&](const ChunkType& firstChunk, const ChunkType& secondChunk) -> bool { + return SimpleBSONObjComparator::kInstance.evaluate(firstChunk.getMax() == + secondChunk.getMin()) && + collectionInfo.zones.getZoneForChunk(firstChunk.getRange()) == + collectionInfo.zones.getZoneForChunk(secondChunk.getRange()); + }; + + auto getActionFromRange = + [&](std::vector<ChunkType>& chunks) -> boost::optional<DefragmentationAction> { + ChunkVersion shardVersion = _getShardVersion(opCtx, chunks.front().getShard(), uuid); + if (chunks.size() == 1) { + auto currentChunk = chunks.front(); + if (currentChunk.getEstimatedSizeBytes()) { + return boost::none; + } else { + return boost::optional<DefragmentationAction>( + DataSizeInfo(currentChunk.getShard(), + collectionInfo.nss, + uuid, + currentChunk.getRange(), + shardVersion, + collectionInfo.collectionShardKey, + false)); + } + } else { + return boost::optional<DefragmentationAction>( + MergeInfo(chunks.front().getShard(), + collectionInfo.nss, + uuid, + shardVersion, + ChunkRange(chunks.front().getMin(), chunks.back().getMax()))); + } + }; + + while (collectionInfo.chunkList.size() > 0) { + auto& currentChunk = collectionInfo.chunkList.back(); + auto& currentMergeList = collectionInfo.shardToChunkMap[currentChunk.getShard()]; + boost::optional<DefragmentationAction> nextAction = boost::none; + if (!currentMergeList.empty() && !isConsecutive(currentMergeList.back(), currentChunk)) { + nextAction = getActionFromRange(currentMergeList); + currentMergeList.clear(); + } + currentMergeList.push_back(std::move(currentChunk)); + collectionInfo.chunkList.pop_back(); + if (nextAction) { + return nextAction; + } + } + auto it = collectionInfo.shardToChunkMap.begin(); + if (it != collectionInfo.shardToChunkMap.end()) { + boost::optional<DefragmentationAction> nextAction = getActionFromRange(it->second); + collectionInfo.shardToChunkMap.erase(it); + if (nextAction) { + return nextAction; + } + } + return boost::none; +} + void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* opCtx, MergeInfo action, const Status& result) { @@ -137,10 +219,15 @@ void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* action.nss, action.uuid, action.chunkRange, - action.collectionVersion, + _getShardVersion(opCtx, action.shardId, action.uuid), _defragmentationStates.at(action.uuid).collectionShardKey, false)) - : boost::optional<DefragmentationAction>(action); + : boost::optional<DefragmentationAction>( + MergeInfo(action.shardId, + action.nss, + action.uuid, + _getShardVersion(opCtx, action.shardId, action.uuid), + action.chunkRange)); _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace); } @@ -160,8 +247,16 @@ void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult( result.getValue().sizeBytes, ShardingCatalogClient::kMajorityWriteConcern); } - boost::optional<DefragmentationAction> nextActionOnNamespace = - result.isOK() ? boost::none : boost::optional<DefragmentationAction>(action); + boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK() + ? boost::none + : boost::optional<DefragmentationAction>( + DataSizeInfo(action.shardId, + action.nss, + action.uuid, + action.chunkRange, + _getShardVersion(opCtx, action.shardId, action.uuid), + action.keyPattern, + false)); _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace); } @@ -223,11 +318,11 @@ void BalancerDefragmentationPolicyImpl::_processEndOfAction( // If the end of the current action implies a next step, store it if (nextActionOnNamespace) { _defragmentationStates.at(uuid).queuedActions.push(*nextActionOnNamespace); + } else { + // Load next action, this will trigger phase change if needed + _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]); } - // Load next action, this will trigger phase change if needed - _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]); - // Fulfill promise if needed if (_nextStreamingActionPromise) { auto nextStreamingAction = _nextStreamingAction(opCtx); @@ -279,6 +374,21 @@ void BalancerDefragmentationPolicyImpl::_initializeCollectionState(WithLock, newState.phase = coll.getDefragmentationPhase() ? coll.getDefragmentationPhase().get() : DefragmentationPhaseEnum::kMergeChunks; newState.collectionShardKey = coll.getKeyPattern().toBSON(); + newState.chunkList = + Grid::get(opCtx) + ->catalogClient() + ->getChunks(opCtx, + BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/, + BSON(ChunkType::max() << -1) /*sort*/, + boost::none /*limit*/, + nullptr /*opTime*/, + coll.getEpoch(), + coll.getTimestamp(), + repl::ReadConcernLevel::kLocalReadConcern, + boost::none) + .getValue(); + uassertStatusOK(ZoneInfo::addTagsFromCatalog( + opCtx, coll.getNss(), coll.getKeyPattern(), newState.zones)); _persistPhaseUpdate(opCtx, newState.phase, coll.getUuid()); auto [_, inserted] = _defragmentationStates.insert_or_assign(coll.getUuid(), std::move(newState)); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h index 6adc0d8734f..bb7a148a3b4 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h @@ -89,6 +89,7 @@ private: BSONObj collectionShardKey; std::queue<DefragmentationAction> queuedActions; unsigned outstandingActions{0}; + ShardToChunksMap shardToChunkMap; std::vector<ChunkType> chunkList; ZoneInfo zones; }; @@ -112,13 +113,11 @@ private: CollectionDefragmentationState& collectionData); /** - * Returns next phase 1 merge action for the collection if there is one and boost::none - * otherwise. + * Returns next phase 1 merge or datasize action for the collection if there is one and + * boost::none otherwise. */ - boost::optional<MergeInfo> _getCollectionMergeAction( - CollectionDefragmentationState& collectionInfo) { - return boost::none; - } + boost::optional<DefragmentationAction> _getCollectionPhase1Action( + OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionInfo); /** * Returns next phase 3 split action for the collection if there is one and boost::none diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp index 196736e38be..3df05d8cfe6 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp @@ -39,12 +39,19 @@ class BalancerDefragmentationPolicyTest : public ConfigServerTestFixture { protected: const NamespaceString kNss{"testDb.testColl"}; const UUID kUuid = UUID::gen(); - const ShardId kShardId = ShardId("testShard"); + const ShardId kShardId0 = ShardId("shard0"); + const ShardId kShardId1 = ShardId("shard1"); const ChunkVersion kCollectionVersion = ChunkVersion(1, 1, OID::gen(), Timestamp(10)); const KeyPattern kShardKeyPattern = KeyPattern(BSON("x" << 1)); const BSONObj kMinKey = BSON("x" << 0); const BSONObj kMaxKey = BSON("x" << 10); const long long kMaxChunkSizeBytes{2048}; + const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345); + const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346); + + const std::vector<ShardType> kShardList{ + ShardType(kShardId0.toString(), kShardHost0.toString()), + ShardType(kShardId1.toString(), kShardHost1.toString())}; BalancerDefragmentationPolicyTest() : _random(std::random_device{}()), @@ -60,8 +67,22 @@ protected: return shardedCollection; } + CollectionType setupCollectionForPhase1(std::vector<ChunkType>& chunkList) { + setupShards(kShardList); + setupCollection(kNss, kShardKeyPattern, chunkList); + ASSERT_OK(updateToConfigCollection( + operationContext(), + CollectionType::ConfigNS, + BSON(CollectionType::kUuidFieldName << kUuid), + BSON("$set" << BSON(CollectionType::kBalancerShouldMergeChunksFieldName << true)), + false)); + return Grid::get(operationContext()) + ->catalogClient() + ->getCollection(operationContext(), kUuid); + } + void makeConfigChunkEntry() { - ChunkType chunk(kUuid, ChunkRange(kMinKey, kMaxKey), kCollectionVersion, kShardId); + ChunkType chunk(kUuid, ChunkRange(kMinKey, kMaxKey), kCollectionVersion, kShardId0); ASSERT_OK(insertToConfigCollection( operationContext(), ChunkType::ConfigNS, chunk.toConfigBSON())); } @@ -124,9 +145,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedMergeResult) { auto coll = makeConfigCollectionEntry(); FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + makeConfigChunkEntry(); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto mergeInfo = - MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey)); + MergeInfo(kShardId0, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey)); _defragmentationPolicy.acknowledgeMergeResult( operationContext(), mergeInfo, @@ -141,9 +163,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitVectorRespon auto coll = makeConfigCollectionEntry(); FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + makeConfigChunkEntry(); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto splitVectorInfo = AutoSplitVectorInfo( - kShardId, kNss, kUuid, kCollectionVersion, BSONObj(), kMinKey, kMaxKey, 120); + kShardId0, kNss, kUuid, kCollectionVersion, BSONObj(), kMinKey, kMaxKey, 120); _defragmentationPolicy.acknowledgeAutoSplitVectorResult( operationContext(), splitVectorInfo, @@ -157,9 +180,16 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitAction) { auto coll = makeConfigCollectionEntry(); FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + makeConfigChunkEntry(); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); - auto splitInfo = SplitInfoWithKeyPattern( - kShardId, kNss, kCollectionVersion, kMinKey, kMaxKey, {}, kUuid, kShardKeyPattern.toBSON()); + auto splitInfo = SplitInfoWithKeyPattern(kShardId0, + kNss, + kCollectionVersion, + kMinKey, + kMaxKey, + {}, + kUuid, + kShardKeyPattern.toBSON()); _defragmentationPolicy.acknowledgeSplitResult( operationContext(), splitInfo, @@ -173,8 +203,9 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedDataSizeAction) { auto coll = makeConfigCollectionEntry(); FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + makeConfigChunkEntry(); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); - auto dataSizeInfo = DataSizeInfo(kShardId, + auto dataSizeInfo = DataSizeInfo(kShardId0, kNss, kUuid, ChunkRange(kMinKey, kMaxKey), @@ -194,9 +225,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulMergeAction) auto coll = makeConfigCollectionEntry(); FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + makeConfigChunkEntry(); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto mergeInfo = - MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey)); + MergeInfo(kShardId0, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey)); _defragmentationPolicy.acknowledgeMergeResult(operationContext(), mergeInfo, Status::OK()); ASSERT_TRUE(future.isReady()); DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(future.get()); @@ -211,7 +243,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVect FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); - auto splitVectorInfo = AutoSplitVectorInfo(kShardId, + auto splitVectorInfo = AutoSplitVectorInfo(kShardId0, kNss, kUuid, kCollectionVersion, @@ -235,7 +267,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulSplitAction) FailPointEnableBlock failpoint("skipPhaseTransition"); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); - auto splitInfo = SplitInfoWithKeyPattern(kShardId, + auto splitInfo = SplitInfoWithKeyPattern(kShardId0, kNss, kCollectionVersion, kMinKey, @@ -253,7 +285,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulDataSizeActio _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); makeConfigChunkEntry(); auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); - auto dataSizeInfo = DataSizeInfo(kShardId, + auto dataSizeInfo = DataSizeInfo(kShardId0, kNss, kUuid, ChunkRange(kMinKey, kMaxKey), @@ -269,5 +301,74 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulDataSizeActio ASSERT_EQ(configDoc.getIntField(ChunkType::estimatedSizeBytes.name()), 2000); } +TEST_F(BalancerDefragmentationPolicyTest, TestPhase1AllConsecutive) { + // Set up collection with all mergeable chunks + std::vector<ChunkType> chunkList; + for (int i = 0; i < 5; i++) { + ChunkType chunk( + kUuid, + ChunkRange(BSON("x" << i), BSON("x" << i + 1)), + ChunkVersion(1, i, kCollectionVersion.epoch(), kCollectionVersion.getTimestamp()), + kShardId0); + chunkList.push_back(chunk); + } + for (int i = 5; i < 10; i++) { + ChunkType chunk( + kUuid, + ChunkRange(BSON("x" << i), BSON("x" << i + 1)), + ChunkVersion(1, i, kCollectionVersion.epoch(), kCollectionVersion.getTimestamp()), + kShardId1); + chunkList.push_back(chunk); + } + auto coll = setupCollectionForPhase1(chunkList); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + // Test + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(future.isReady()); + MergeInfo mergeAction = stdx::get<MergeInfo>(future.get()); + ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMin(), BSON("x" << 0)); + ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMax(), BSON("x" << 5)); + auto future2 = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(future2.isReady()); + MergeInfo mergeAction2 = stdx::get<MergeInfo>(future2.get()); + ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMin(), BSON("x" << 5)); + ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMax(), BSON("x" << 10)); + auto future3 = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_FALSE(future3.isReady()); +} + +TEST_F(BalancerDefragmentationPolicyTest, Phase1NotConsecutive) { + std::vector<ChunkType> chunkList; + for (int i = 0; i < 10; i++) { + ShardId chosenShard = (i == 5) ? kShardId1 : kShardId0; + ChunkType chunk( + kUuid, + ChunkRange(BSON("x" << i), BSON("x" << i + 1)), + ChunkVersion(1, i, kCollectionVersion.epoch(), kCollectionVersion.getTimestamp()), + chosenShard); + chunkList.push_back(chunk); + } + auto coll = setupCollectionForPhase1(chunkList); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + // Test + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(future.isReady()); + MergeInfo mergeAction = stdx::get<MergeInfo>(future.get()); + ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMin(), BSON("x" << 0)); + ASSERT_BSONOBJ_EQ(mergeAction.chunkRange.getMax(), BSON("x" << 5)); + auto future2 = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(future2.isReady()); + MergeInfo mergeAction2 = stdx::get<MergeInfo>(future2.get()); + ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMin(), BSON("x" << 6)); + ASSERT_BSONOBJ_EQ(mergeAction2.chunkRange.getMax(), BSON("x" << 10)); + auto future3 = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(future3.isReady()); + DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(future3.get()); + ASSERT_BSONOBJ_EQ(dataSizeAction.chunkRange.getMin(), BSON("x" << 5)); + ASSERT_BSONOBJ_EQ(dataSizeAction.chunkRange.getMax(), BSON("x" << 6)); + auto future4 = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_FALSE(future4.isReady()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 1d038352e3f..5ece6c49dec 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -600,10 +600,10 @@ void ShardingCatalogManager::configureCollectionAutoSplit( std::set<ShardId> shardsIds; cm.getAllShardIds(&shardsIds); + const auto update = updateCmd.obj(); + withTransaction( opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) { - const auto update = updateCmd.obj(); - const auto query = BSON(CollectionType::kNssFieldName << nss.ns() << CollectionType::kUuidFieldName << uuid); const auto res = writeToConfigDocumentInTxn( |