diff options
author | Allison Easton <allison.easton@mongodb.com> | 2021-12-02 15:43:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-02 16:13:45 +0000 |
commit | 93a31c914c819aeabcc0b16299782f1fb87b1089 (patch) | |
tree | 9fa4c2edbf91ecae0572969620f8cbe27f85a8c5 | |
parent | c3402c98def4ce8b25609429ccb9e24fb4fe7cd0 (diff) | |
download | mongo-93a31c914c819aeabcc0b16299782f1fb87b1089.tar.gz |
SERVER-61555 Link the defragmentation policy to ConfigureCollectionAutoSplit
10 files changed, 478 insertions, 148 deletions
diff --git a/jstests/sharding/balancer_defragmentation_merge_chunks.js b/jstests/sharding/balancer_defragmentation_merge_chunks.js new file mode 100644 index 00000000000..23a7c49273f --- /dev/null +++ b/jstests/sharding/balancer_defragmentation_merge_chunks.js @@ -0,0 +1,141 @@ +/** + * Test the setCollectionAutosplitter command and balancerCollectionStatus command + * + * @tags: [ + * requires_fcv_51, + * featureFlagPerCollectionAutoSplitter, + * ] + */ + +(function() { +'use strict'; + +load("jstests/libs/fail_point_util.js"); +load('jstests/sharding/autosplit_include.js'); + +var st = new ShardingTest( + {mongos: 1, shards: 3, config: 1, other: {enableBalancer: true, enableAutoSplit: true}}); + +// setup the database for the test +assert.commandWorked(st.s.adminCommand({enableSharding: 'db'})); +var db = st.getDB('db'); +var coll = db['test']; +var fullNs = coll.getFullName(); +var configPrimary = st.configRS.getPrimary(); + +const defaultChunkSize = 2 * 1024 * 1024; +const bigString = "X".repeat(32 * 1024); // 32 KB +assert.commandWorked(st.s.adminCommand({shardCollection: fullNs, key: {key: 1}})); + +var bulk = coll.initializeUnorderedBulkOp(); +for (let i = 0; i < 32 * 128; i++) { + bulk.insert({key: i, str: bigString}); +} +assert.commandWorked(bulk.execute()); +waitForOngoingChunkSplits(st); + +jsTest.log("Balance cluster before beginning defragmentation"); + +function waitForBalanced() { + assert.soon(function() { + st.awaitBalancerRound(); + balancerStatus = + assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs})); + return balancerStatus.balancerCompliant; + }); + jsTest.log("Balancer status of " + fullNs + " : \n" + tojson(balancerStatus)); +} + +waitForBalanced(); + +jsTest.log("Begin and end defragmentation with balancer off."); +{ + st.stopBalancer(); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: true, + defaultChunkSize: defaultChunkSize, + })); + // Defragmentation should not start with the balancer stopped + var balancerStatus = + assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs})); + assert.eq(balancerStatus.balancerCompliant, true); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: false, + defaultChunkSize: defaultChunkSize, + })); + st.startBalancer(); + st.awaitBalancerRound(); + assert.eq(balancerStatus.balancerCompliant, true); + st.stopBalancer(); +} + +jsTest.log("Begin and end defragmentation with balancer on"); +{ + st.startBalancer(); + var phaseTransitionFailpoint = configureFailPoint(configPrimary, "skipPhaseTransition"); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: true, + defaultChunkSize: defaultChunkSize, + })); + st.awaitBalancerRound(); + var currStatus = assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs})); + assert.eq(currStatus.balancerCompliant, false); + assert.eq(currStatus.firstComplianceViolation, 'chunksMerging'); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: false, + defaultChunkSize: defaultChunkSize, + })); + st.awaitBalancerRound(); + assert.eq(balancerStatus.balancerCompliant, true); + st.stopBalancer(); + phaseTransitionFailpoint.off(); +} + +jsTest.log("Begin defragmentation with balancer off, end with it on"); +{ + var phaseTransitionFailpoint = configureFailPoint(configPrimary, "skipPhaseTransition"); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: true, + defaultChunkSize: defaultChunkSize, + })); + st.startBalancer(); + st.awaitBalancerRound(); + var currStatus = assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs})); + assert.eq(currStatus.balancerCompliant, false); + assert.eq(currStatus.firstComplianceViolation, 'chunksMerging'); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: false, + defaultChunkSize: defaultChunkSize, + })); + st.awaitBalancerRound(); + assert.eq(balancerStatus.balancerCompliant, true); + st.stopBalancer(); + phaseTransitionFailpoint.off(); +} + +jsTest.log("Balancer on, begin defragmentation and let it complete"); +{ + st.startBalancer(); + assert.commandWorked(st.s.adminCommand({ + configureCollectionAutoSplitter: fullNs, + enableAutoSplitter: false, + balancerShouldMergeChunks: true, + defaultChunkSize: defaultChunkSize, + })); + waitForBalanced(); +} + +st.stop(); +})(); diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 6b75e4a3bbc..2970498a606 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -379,22 +379,24 @@ void Balancer::_consumeActionStreamLoop() { Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor()); while (!_stopRequested()) { // Blocking call - DefragmentationAction action = _defragmentationPolicy->getNextStreamingAction().get(); + DefragmentationAction action = + _defragmentationPolicy->getNextStreamingAction(opCtx.get()).get(); // Non-blocking call, assumes the requests are returning a SemiFuture<> stdx::visit( visit_helper::Overloaded{ [&](MergeInfo mergeAction) { - auto result = - _commandScheduler - ->requestMergeChunks(opCtx.get(), - mergeAction.nss, - mergeAction.shardId, - mergeAction.chunkRange, - mergeAction.collectionVersion) - .thenRunOn(*executor) - .onCompletion([this, mergeAction](const Status& status) { - _defragmentationPolicy->acknowledgeMergeResult(mergeAction, status); - }); + auto result = _commandScheduler + ->requestMergeChunks(opCtx.get(), + mergeAction.nss, + mergeAction.shardId, + mergeAction.chunkRange, + mergeAction.collectionVersion) + .thenRunOn(*executor) + .onCompletion([this, mergeAction](const Status& status) { + auto opCtx = cc().makeOperationContext(); + _defragmentationPolicy->acknowledgeMergeResult( + opCtx.get(), mergeAction, status); + }); }, [&](DataSizeInfo dataSizeAction) { auto result = @@ -407,8 +409,9 @@ void Balancer::_consumeActionStreamLoop() { dataSizeAction.keyPattern, dataSizeAction.estimatedValue) .thenRunOn(*executor) - .onCompletion([this, dataSizeAction, &opCtx]( + .onCompletion([this, dataSizeAction]( const StatusWith<DataSizeResponse>& swDataSize) { + auto opCtx = cc().makeOperationContext(); _defragmentationPolicy->acknowledgeDataSizeResult( opCtx.get(), dataSizeAction, swDataSize); }); @@ -427,25 +430,27 @@ void Balancer::_consumeActionStreamLoop() { .onCompletion( [this, splitVectorAction]( const StatusWith<std::vector<BSONObj>>& swSplitPoints) { + auto opCtx = cc().makeOperationContext(); _defragmentationPolicy->acknowledgeAutoSplitVectorResult( - splitVectorAction, swSplitPoints); + opCtx.get(), splitVectorAction, swSplitPoints); }); }, [&](SplitInfoWithKeyPattern splitAction) { - auto result = - _commandScheduler - ->requestSplitChunk(opCtx.get(), - splitAction.info.nss, - splitAction.info.shardId, - splitAction.info.collectionVersion, - splitAction.keyPattern, - splitAction.info.minKey, - splitAction.info.maxKey, - splitAction.info.splitKeys) - .thenRunOn(*executor) - .onCompletion([this, splitAction](const Status& status) { - _defragmentationPolicy->acknowledgeSplitResult(splitAction, status); - }); + auto result = _commandScheduler + ->requestSplitChunk(opCtx.get(), + splitAction.info.nss, + splitAction.info.shardId, + splitAction.info.collectionVersion, + splitAction.keyPattern, + splitAction.info.minKey, + splitAction.info.maxKey, + splitAction.info.splitKeys) + .thenRunOn(*executor) + .onCompletion([this, splitAction](const Status& status) { + auto opCtx = cc().makeOperationContext(); + _defragmentationPolicy->acknowledgeSplitResult( + opCtx.get(), splitAction, status); + }); }, [](EndOfActionStream eoa) {}}, action); @@ -544,6 +549,8 @@ void Balancer::_mainThread() { warnOnMultiVersion(uassertStatusOK(_clusterStats->getStats(opCtx.get()))); } + _initializeDefragmentations(opCtx.get()); + Status status = _splitChunksIfNeeded(opCtx.get()); if (!status.isOK()) { LOGV2_WARNING(21878, @@ -749,6 +756,13 @@ bool Balancer::_checkOIDs(OperationContext* opCtx) { return true; } +void Balancer::_initializeDefragmentations(OperationContext* opCtx) { + auto collections = Grid::get(opCtx)->catalogClient()->getCollections(opCtx, {}); + for (const auto& coll : collections) { + _defragmentationPolicy->refreshCollectionDefragmentationStatus(opCtx, coll); + } +} + Status Balancer::_splitChunksIfNeeded(OperationContext* opCtx) { auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx); if (!chunksToSplitStatus.isOK()) { @@ -867,10 +881,15 @@ void Balancer::notifyPersistedBalancerSettingsChanged() { Balancer::BalancerStatus Balancer::getBalancerStatusForNs(OperationContext* opCtx, const NamespaceString& ns) { - const auto uuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, ns); - bool isMerging = uuid && _defragmentationPolicy->isDefragmentingCollection(*uuid); - if (isMerging) { - return {false, kBalancerPolicyStatusChunksMerging.toString()}; + // TODO (SERVER-61727) update this with phase 2 + try { + auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, ns, {}); + bool isMerging = _defragmentationPolicy->isDefragmentingCollection(coll.getUuid()); + if (isMerging) { + return {false, kBalancerPolicyStatusChunksMerging.toString()}; + } + } catch (DBException&) { + // Catch exceptions to keep consistency with errors thrown before defragmentation } auto splitChunks = uassertStatusOK(_chunkSelectionPolicy->selectChunksToSplit(opCtx, ns)); diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index 48c167e7ec0..45008947c8d 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -258,6 +258,12 @@ private: bool _checkOIDs(OperationContext* opCtx); /** + * Queries config.collections for all collections that should be running defragmentation and + * passes this information to the defragmentation policy. + */ + void _initializeDefragmentations(OperationContext* opCtx); + + /** * Iterates through all chunks in all collections. If the collection is the sessions collection, * checks if the number of chunks is greater than or equal to the configured minimum number of * chunks for the sessions collection (minNumChunksForSessionsCollection). If it isn't, diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h index 61e9fc67c4a..017380fb1e1 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/s/balancer/balancer_policy.h" +#include "mongo/s/catalog/type_collection.h" namespace mongo { /** @@ -44,16 +45,12 @@ public: virtual ~BalancerDefragmentationPolicy() {} /** - * Sets the "begin of defragmentation" state on the specified collection. New actions concerning - * the collection will be included in the stream. + * Checks if the collection should be running defragmentation. If a new defragmentation should + * be started, this will initialize the defragmentation. If defragmentation has been turned off + * on a collection, this will stop the defragmentation. */ - virtual void beginNewCollection(OperationContext* opCtx, const UUID& uuid) = 0; - - /** - * Removes the specified collection from the list of namespaces to be defragmented. Actions - * concerning the collection will stop appearing in the stream. - */ - virtual void removeCollection(OperationContext* opCtx, const UUID& uuid) = 0; + virtual void refreshCollectionDefragmentationStatus(OperationContext* opCtx, + const CollectionType& coll) = 0; /** * Returns true if the specified collection is currently being defragmented. @@ -72,7 +69,7 @@ public: * or when there are too many outstanding actions (too many calls to getStreamingAction() that * have not been acknowledged). */ - virtual SemiFuture<DefragmentationAction> getNextStreamingAction() = 0; + virtual SemiFuture<DefragmentationAction> getNextStreamingAction(OperationContext* opCtx) = 0; /** * Stops the generation of new actions: any new call to (or currently blocked ones on) @@ -81,12 +78,18 @@ public: */ virtual void closeActionStream() = 0; - virtual void acknowledgeMergeResult(MergeInfo action, const Status& result) = 0; + virtual void acknowledgeMergeResult(OperationContext* opCtx, + MergeInfo action, + const Status& result) = 0; virtual void acknowledgeAutoSplitVectorResult( - AutoSplitVectorInfo action, const StatusWith<std::vector<BSONObj>>& result) = 0; + OperationContext* opCtx, + AutoSplitVectorInfo action, + const StatusWith<std::vector<BSONObj>>& result) = 0; - virtual void acknowledgeSplitResult(SplitInfoWithKeyPattern action, const Status& result) = 0; + virtual void acknowledgeSplitResult(OperationContext* opCtx, + SplitInfoWithKeyPattern action, + const Status& result) = 0; virtual void acknowledgeDataSizeResult(OperationContext* opCtx, DataSizeInfo action, diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index 2f253255176..00b825de023 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -36,50 +36,55 @@ #include "mongo/s/grid.h" namespace mongo { +MONGO_FAIL_POINT_DEFINE(skipPhaseTransition); -void BalancerDefragmentationPolicyImpl::beginNewCollection(OperationContext* opCtx, - const UUID& uuid) { - _persistPhaseUpdate(opCtx, DefragmentationPhaseEnum::kNotStarted, uuid); - _initializeCollectionState(opCtx, uuid); -} +void BalancerDefragmentationPolicyImpl::refreshCollectionDefragmentationStatus( + OperationContext* opCtx, const CollectionType& coll) { + stdx::lock_guard<Latch> lk(_streamingMutex); + const auto& uuid = coll.getUuid(); + if (coll.getBalancerShouldMergeChunks() && !_defragmentationStates.contains(uuid)) { + _initializeCollectionState(lk, opCtx, coll); -void BalancerDefragmentationPolicyImpl::removeCollection(OperationContext* opCtx, - const UUID& uuid) { - _persistPhaseUpdate(opCtx, boost::none, uuid); - _clearDataSizeInformation(opCtx, uuid); - _defragmentationStates.erase(uuid); + // Load first action, this will trigger move to phase 2 if there are no phase 1 actions + _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]); + // Fulfill promise if needed + if (_nextStreamingActionPromise) { + auto nextStreamingAction = _nextStreamingAction(opCtx); + if (nextStreamingAction) { + _concurrentStreamingOps++; + _nextStreamingActionPromise.get().setWith([&] { return *nextStreamingAction; }); + _nextStreamingActionPromise = boost::none; + return; + } + } + } else if (!coll.getBalancerShouldMergeChunks() && _defragmentationStates.contains(uuid)) { + _clearDataSizeInformation(opCtx, uuid); + _defragmentationStates.erase(uuid); + _persistPhaseUpdate(opCtx, boost::none, uuid); + } } -SemiFuture<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNextStreamingAction() { +SemiFuture<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNextStreamingAction( + OperationContext* opCtx) { stdx::lock_guard<Latch> lk(_streamingMutex); if (_concurrentStreamingOps < kMaxConcurrentOperations) { - _concurrentStreamingOps++; - if (auto action = _nextStreamingAction()) { + if (auto action = _nextStreamingAction(opCtx)) { + _concurrentStreamingOps++; return SemiFuture<DefragmentationAction>::makeReady(*action); } } - auto&& [promise, future] = makePromiseFuture<DefragmentationAction>(); + auto [promise, future] = makePromiseFuture<DefragmentationAction>(); _nextStreamingActionPromise = std::move(promise); return std::move(future).semi(); } -boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextStreamingAction() { +boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextStreamingAction( + OperationContext* opCtx) { // TODO (SERVER-61635) validate fairness through collections for (auto& [uuid, collectionData] : _defragmentationStates) { - if (!collectionData.queuedActions.empty()) { - auto action = collectionData.queuedActions.front(); - collectionData.queuedActions.pop(); - return action; - } - if (collectionData.phase == DefragmentationPhaseEnum::kMergeChunks) { - if (auto mergeAction = _getCollectionMergeAction(collectionData)) { - return boost::optional<DefragmentationAction>(*mergeAction); - } - } - if (collectionData.phase == DefragmentationPhaseEnum::kSplitChunks) { - if (auto splitAction = _getCollectionSplitAction(collectionData)) { - return boost::optional<DefragmentationAction>(*splitAction); - } + if (!collectionData.queuedActions.empty() || + _queueNextAction(opCtx, uuid, collectionData)) { + return collectionData.popFromActionQueue(); } } boost::optional<DefragmentationAction> noAction = boost::none; @@ -89,9 +94,43 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextS return noAction; } -void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(MergeInfo action, +bool BalancerDefragmentationPolicyImpl::_queueNextAction( + OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionData) { + // get next action within the current phase + switch (collectionData.phase) { + case DefragmentationPhaseEnum::kMergeChunks: + if (auto mergeAction = _getCollectionMergeAction(collectionData)) { + collectionData.queuedActions.push(*mergeAction); + return true; + } + break; + case DefragmentationPhaseEnum::kSplitChunks: + if (auto splitAction = _getCollectionSplitAction(collectionData)) { + collectionData.queuedActions.push(*splitAction); + return true; + } + break; + default: + uasserted(ErrorCodes::BadValue, "Unsupported phase type"); + } + // If no action for the current phase is available, check the conditions for transitioning to + // the next phase + if (collectionData.queuedActions.empty() && collectionData.outstandingActions == 0) { + _transitionPhases(opCtx, uuid, collectionData); + } + return false; +} + +void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* opCtx, + MergeInfo action, const Status& result) { stdx::lock_guard<Latch> lk(_streamingMutex); + // Check if collection defragmentation has been canceled + if (!_defragmentationStates.contains(action.uuid)) { + return; + } + if (result.isOK()) + _defragmentationStates[action.uuid].outstandingActions--; boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK() ? boost::optional<DefragmentationAction>( DataSizeInfo(action.shardId, @@ -102,13 +141,18 @@ void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(MergeInfo action, _defragmentationStates.at(action.uuid).collectionShardKey, false)) : boost::optional<DefragmentationAction>(action); - _processEndOfAction(lk, action.uuid, nextActionOnNamespace); + _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace); } void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult( OperationContext* opCtx, DataSizeInfo action, const StatusWith<DataSizeResponse>& result) { stdx::lock_guard<Latch> lk(_streamingMutex); + // Check if collection defragmentation has been canceled + if (!_defragmentationStates.contains(action.uuid)) { + return; + } if (result.isOK()) { + _defragmentationStates[action.uuid].outstandingActions--; ChunkType chunk(action.uuid, action.chunkRange, action.version, action.shardId); ShardingCatalogManager* catalogManager = ShardingCatalogManager::get(opCtx); catalogManager->setChunkEstimatedSize(opCtx, @@ -118,12 +162,20 @@ void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult( } boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK() ? boost::none : boost::optional<DefragmentationAction>(action); - _processEndOfAction(lk, action.uuid, nextActionOnNamespace); + _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace); } void BalancerDefragmentationPolicyImpl::acknowledgeAutoSplitVectorResult( - AutoSplitVectorInfo action, const StatusWith<std::vector<BSONObj>>& result) { + OperationContext* opCtx, + AutoSplitVectorInfo action, + const StatusWith<std::vector<BSONObj>>& result) { stdx::lock_guard<Latch> lk(_streamingMutex); + // Check if collection defragmentation has been canceled + if (!_defragmentationStates.contains(action.uuid)) { + return; + } + if (result.isOK()) + _defragmentationStates[action.uuid].outstandingActions--; boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK() ? boost::optional<DefragmentationAction>(SplitInfoWithKeyPattern(action.shardId, action.nss, @@ -134,15 +186,22 @@ void BalancerDefragmentationPolicyImpl::acknowledgeAutoSplitVectorResult( action.uuid, action.keyPattern)) : boost::optional<DefragmentationAction>(action); - _processEndOfAction(lk, action.uuid, nextActionOnNamespace); + _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace); } -void BalancerDefragmentationPolicyImpl::acknowledgeSplitResult(SplitInfoWithKeyPattern action, +void BalancerDefragmentationPolicyImpl::acknowledgeSplitResult(OperationContext* opCtx, + SplitInfoWithKeyPattern action, const Status& result) { stdx::lock_guard<Latch> lk(_streamingMutex); + // Check if collection defragmentation has been canceled + if (!_defragmentationStates.contains(action.uuid)) { + return; + } + if (result.isOK()) + _defragmentationStates[action.uuid].outstandingActions--; boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK() ? boost::none : boost::optional<DefragmentationAction>(action); - _processEndOfAction(lk, action.uuid, nextActionOnNamespace); + _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace); } void BalancerDefragmentationPolicyImpl::closeActionStream() { @@ -157,20 +216,21 @@ void BalancerDefragmentationPolicyImpl::closeActionStream() { void BalancerDefragmentationPolicyImpl::_processEndOfAction( WithLock, + OperationContext* opCtx, const UUID& uuid, const boost::optional<DefragmentationAction>& nextActionOnNamespace) { - // If the end of the current action implies a next step and the related collection is still - // being defragmented, store it + + // If the end of the current action implies a next step, store it if (nextActionOnNamespace) { - auto collectionDefragmentationStateIt = _defragmentationStates.find(uuid); - if (collectionDefragmentationStateIt != _defragmentationStates.end()) { - collectionDefragmentationStateIt->second.queuedActions.push(*nextActionOnNamespace); - } + _defragmentationStates.at(uuid).queuedActions.push(*nextActionOnNamespace); } - // If there is a client blocked on the stream, serve it now with a new action... + // Load next action, this will trigger phase change if needed + _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]); + + // Fulfill promise if needed if (_nextStreamingActionPromise) { - auto nextStreamingAction = _nextStreamingAction(); + auto nextStreamingAction = _nextStreamingAction(opCtx); if (nextStreamingAction) { _nextStreamingActionPromise.get().setWith([&] { return *nextStreamingAction; }); _nextStreamingActionPromise = boost::none; @@ -181,14 +241,53 @@ void BalancerDefragmentationPolicyImpl::_processEndOfAction( --_concurrentStreamingOps; } -void BalancerDefragmentationPolicyImpl::_initializeCollectionState(OperationContext* opCtx, - const UUID& uuid) { +void BalancerDefragmentationPolicyImpl::_transitionPhases( + OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionInfo) { + boost::optional<DefragmentationPhaseEnum> nextPhase; + switch (collectionInfo.phase) { + case DefragmentationPhaseEnum::kMergeChunks: + if (MONGO_unlikely(skipPhaseTransition.shouldFail())) { + nextPhase = DefragmentationPhaseEnum::kMergeChunks; + break; + } + // TODO (SERVER-60459) Change to kMoveAndMergeChunks + nextPhase = boost::none; + break; + case DefragmentationPhaseEnum::kMoveAndMergeChunks: + // TODO (SERVER-60479) Change to kSplitChunks + nextPhase = boost::none; + break; + case DefragmentationPhaseEnum::kSplitChunks: + nextPhase = boost::none; + break; + } + if (nextPhase) { + collectionInfo.phase = nextPhase.get(); + } else { + _clearDataSizeInformation(opCtx, uuid); + _defragmentationStates.erase(uuid); + } + _persistPhaseUpdate(opCtx, nextPhase, uuid); +} + +void BalancerDefragmentationPolicyImpl::_initializeCollectionState(WithLock, + OperationContext* opCtx, + const CollectionType& coll) { try { - _defragmentationStates[uuid].phase = DefragmentationPhaseEnum::kNotStarted; + CollectionDefragmentationState newState; + newState.nss = coll.getNss(); + newState.phase = coll.getDefragmentationPhase() ? coll.getDefragmentationPhase().get() + : DefragmentationPhaseEnum::kMergeChunks; + newState.collectionShardKey = coll.getKeyPattern().toBSON(); + _persistPhaseUpdate(opCtx, newState.phase, coll.getUuid()); + auto [_, inserted] = + _defragmentationStates.insert_or_assign(coll.getUuid(), std::move(newState)); + dassert(inserted); } catch (const DBException& e) { LOGV2_ERROR(6153101, "Error while starting defragmentation on collection", - "uuid"_attr = uuid, + "namespace"_attr = coll.getNss(), + "uuid"_attr = coll.getUuid(), "error"_attr = e); } } @@ -205,8 +304,9 @@ void BalancerDefragmentationPolicyImpl::_persistPhaseUpdate( BSON("$set" << BSON(CollectionType::kDefragmentationPhaseFieldName << DefragmentationPhase_serializer(*phase))))); } else { - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( - BSON("$unset" << BSON(CollectionType::kDefragmentationPhaseFieldName << "")))); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(BSON( + "$unset" << BSON(CollectionType::kBalancerShouldMergeChunksFieldName + << "" << CollectionType::kDefragmentationPhaseFieldName << "")))); } return entry; }()}); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h index 2a329a8bbfb..6adc0d8734f 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h @@ -48,14 +48,19 @@ public: return _defragmentationStates.contains(uuid); } - SemiFuture<DefragmentationAction> getNextStreamingAction() override; + SemiFuture<DefragmentationAction> getNextStreamingAction(OperationContext* opCtx) override; - void acknowledgeMergeResult(MergeInfo action, const Status& result) override; + void acknowledgeMergeResult(OperationContext* opCtx, + MergeInfo action, + const Status& result) override; - void acknowledgeAutoSplitVectorResult(AutoSplitVectorInfo action, + void acknowledgeAutoSplitVectorResult(OperationContext* opCtx, + AutoSplitVectorInfo action, const StatusWith<std::vector<BSONObj>>& result) override; - void acknowledgeSplitResult(SplitInfoWithKeyPattern action, const Status& result) override; + void acknowledgeSplitResult(OperationContext* opCtx, + SplitInfoWithKeyPattern action, + const Status& result) override; void acknowledgeDataSizeResult(OperationContext* opCtx, DataSizeInfo action, @@ -63,25 +68,48 @@ public: void closeActionStream() override; - void beginNewCollection(OperationContext* opCtx, const UUID& uuid) override; - - void removeCollection(OperationContext* opCtx, const UUID& uuid) override; + void refreshCollectionDefragmentationStatus(OperationContext* opCtx, + const CollectionType& coll) override; private: static constexpr int kMaxConcurrentOperations = 50; // Data structures used to keep track of the defragmentation state. struct CollectionDefragmentationState { + DefragmentationAction popFromActionQueue() { + auto action = queuedActions.front(); + queuedActions.pop(); + outstandingActions++; + return action; + }; + NamespaceString nss; DefragmentationPhaseEnum phase; int64_t maxChunkSizeBytes; BSONObj collectionShardKey; std::queue<DefragmentationAction> queuedActions; + unsigned outstandingActions{0}; std::vector<ChunkType> chunkList; ZoneInfo zones; }; - boost::optional<DefragmentationAction> _nextStreamingAction(); + /** + * Returns the next action from any collection in phase 1 or 3 or boost::none if there are no + * actions to perform. + * Must be called while holding the _streamingMutex. + */ + boost::optional<DefragmentationAction> _nextStreamingAction(OperationContext* opCtx); + + /** + * Adds next action to the collection's action queue if there is one. If there are no further + * actions, the queue is empty, and there are no outstanding actions for this collection, this + * will call _transitionPhases. Returns true if there is a new action for the collection and + * false otherwise. + * Must be called while holding the _streamingMutex. + */ + bool _queueNextAction(OperationContext* opCtx, + const UUID& uuid, + CollectionDefragmentationState& collectionData); /** * Returns next phase 1 merge action for the collection if there is one and boost::none @@ -102,14 +130,24 @@ private: } /** + * Move to the next phase and persist the phase change. This will end defragmentation if the + * current phase is the last phase. + * Must be called while holding the _streamingMutex. + */ + void _transitionPhases(OperationContext* opCtx, + const UUID& uuid, + CollectionDefragmentationState& collectionInfo); + + /** * Build the shardToChunk map for the namespace. Requires a scan of the config.chunks * collection. */ - void _initializeCollectionState(OperationContext* opCtx, const UUID& uuid); + void _initializeCollectionState(WithLock, OperationContext* opCtx, const CollectionType& coll); /** * Write the new phase to the defragmentationPhase field in config.collections. If phase is not * set, the field will be removed. + * Must be called while holding the _streamingMutex. */ void _persistPhaseUpdate(OperationContext* opCtx, boost::optional<DefragmentationPhaseEnum> phase, @@ -117,10 +155,12 @@ private: /** * Remove all datasize fields from config.chunks for the given namespace. + * Must be called while holding the _streamingMutex. */ void _clearDataSizeInformation(OperationContext* opCtx, const UUID& uuid); void _processEndOfAction(WithLock, + OperationContext* opCtx, const UUID& uuid, const boost::optional<DefragmentationAction>& nextActionOnNamespace); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp index 9330be19bc5..196736e38be 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp @@ -51,11 +51,13 @@ protected: _clusterStats(std::make_unique<ClusterStatisticsImpl>(_random)), _defragmentationPolicy(_clusterStats.get()) {} - void makeConfigCollectionEntry() { + CollectionType makeConfigCollectionEntry() { CollectionType shardedCollection(kNss, OID::gen(), Timestamp(1, 1), Date_t::now(), kUuid); shardedCollection.setKeyPattern(kShardKeyPattern); + shardedCollection.setBalancerShouldMergeChunks(true); ASSERT_OK(insertToConfigCollection( operationContext(), CollectionType::ConfigNS, shardedCollection.toBSON())); + return shardedCollection; } void makeConfigChunkEntry() { @@ -81,8 +83,9 @@ protected: }; TEST_F(BalancerDefragmentationPolicyTest, TestAddCollection) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); // Test for persistence auto configDoc = findOneOnConfigCollection(operationContext(), CollectionType::ConfigNS, @@ -91,14 +94,12 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAddCollection) { auto storedDefragmentationPhase = DefragmentationPhase_parse( IDLParserErrorContext("BalancerDefragmentationPolicyTest"), configDoc.getStringField(CollectionType::kDefragmentationPhaseFieldName)); - ASSERT_TRUE(storedDefragmentationPhase == DefragmentationPhaseEnum::kNotStarted); + ASSERT_TRUE(storedDefragmentationPhase == DefragmentationPhaseEnum::kMergeChunks); } -TEST_F(BalancerDefragmentationPolicyTest, TestRemoveCollection) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - _defragmentationPolicy.removeCollection(operationContext(), kUuid); - // Test for persistence removal +TEST_F(BalancerDefragmentationPolicyTest, TestAddCollectionNoActions) { + auto coll = makeConfigCollectionEntry(); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); auto configDoc = findOneOnConfigCollection(operationContext(), CollectionType::ConfigNS, BSON(CollectionType::kUuidFieldName << kUuid)) @@ -107,25 +108,29 @@ TEST_F(BalancerDefragmentationPolicyTest, TestRemoveCollection) { } TEST_F(BalancerDefragmentationPolicyTest, TestIsDefragmentingCollection) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); ASSERT_TRUE(_defragmentationPolicy.isDefragmentingCollection(kUuid)); ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(UUID::gen())); } TEST_F(BalancerDefragmentationPolicyTest, TestGetNextActionNoReadyActions) { - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); ASSERT_FALSE(future.isReady()); } TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedMergeResult) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto mergeInfo = MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey)); _defragmentationPolicy.acknowledgeMergeResult( - mergeInfo, Status(ErrorCodes::NetworkTimeout, "Testing error response")); + operationContext(), + mergeInfo, + Status(ErrorCodes::NetworkTimeout, "Testing error response")); ASSERT_TRUE(future.isReady()); DefragmentationAction streamingAction = future.get(); MergeInfo mergeAction = stdx::get<MergeInfo>(streamingAction); @@ -133,35 +138,42 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedMergeResult) { } TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitVectorResponse) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto splitVectorInfo = AutoSplitVectorInfo( kShardId, kNss, kUuid, kCollectionVersion, BSONObj(), kMinKey, kMaxKey, 120); _defragmentationPolicy.acknowledgeAutoSplitVectorResult( - splitVectorInfo, Status(ErrorCodes::NetworkTimeout, "Testing error response")); + operationContext(), + splitVectorInfo, + Status(ErrorCodes::NetworkTimeout, "Testing error response")); ASSERT_TRUE(future.isReady()); AutoSplitVectorInfo splitVectorAction = stdx::get<AutoSplitVectorInfo>(future.get()); ASSERT_EQ(splitVectorInfo.nss, splitVectorAction.nss); } TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitAction) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto splitInfo = SplitInfoWithKeyPattern( kShardId, kNss, kCollectionVersion, kMinKey, kMaxKey, {}, kUuid, kShardKeyPattern.toBSON()); _defragmentationPolicy.acknowledgeSplitResult( - splitInfo, Status(ErrorCodes::NetworkTimeout, "Testing error response")); + operationContext(), + splitInfo, + Status(ErrorCodes::NetworkTimeout, "Testing error response")); ASSERT_TRUE(future.isReady()); SplitInfoWithKeyPattern splitAction = stdx::get<SplitInfoWithKeyPattern>(future.get()); ASSERT_EQ(splitInfo.info.nss, splitAction.info.nss); } TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedDataSizeAction) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto dataSizeInfo = DataSizeInfo(kShardId, kNss, kUuid, @@ -179,12 +191,13 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedDataSizeAction) { } TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulMergeAction) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto mergeInfo = MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey)); - _defragmentationPolicy.acknowledgeMergeResult(mergeInfo, Status::OK()); + _defragmentationPolicy.acknowledgeMergeResult(operationContext(), mergeInfo, Status::OK()); ASSERT_TRUE(future.isReady()); DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(future.get()); ASSERT_EQ(mergeInfo.nss, dataSizeAction.nss); @@ -194,9 +207,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulMergeAction) TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVectorAction) { std::vector<BSONObj> splitPoints = {BSON("x" << 4)}; - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto splitVectorInfo = AutoSplitVectorInfo(kShardId, kNss, kUuid, @@ -205,8 +219,9 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVect kMinKey, kMaxKey, 2048); - _defragmentationPolicy.acknowledgeAutoSplitVectorResult(splitVectorInfo, - StatusWith(splitPoints)); + + _defragmentationPolicy.acknowledgeAutoSplitVectorResult( + operationContext(), splitVectorInfo, StatusWith(splitPoints)); ASSERT_TRUE(future.isReady()); SplitInfoWithKeyPattern splitAction = stdx::get<SplitInfoWithKeyPattern>(future.get()); ASSERT_EQ(splitVectorInfo.nss, splitAction.info.nss); @@ -216,9 +231,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVect TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulSplitAction) { std::vector<BSONObj> splitPoints = {BSON("x" << 4)}; - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto splitInfo = SplitInfoWithKeyPattern(kShardId, kNss, kCollectionVersion, @@ -227,15 +243,16 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulSplitAction) splitPoints, kUuid, kShardKeyPattern.toBSON()); - _defragmentationPolicy.acknowledgeSplitResult(splitInfo, Status::OK()); + _defragmentationPolicy.acknowledgeSplitResult(operationContext(), splitInfo, Status::OK()); ASSERT_FALSE(future.isReady()); } TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulDataSizeAction) { - makeConfigCollectionEntry(); - _defragmentationPolicy.beginNewCollection(operationContext(), kUuid); + auto coll = makeConfigCollectionEntry(); + FailPointEnableBlock failpoint("skipPhaseTransition"); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); makeConfigChunkEntry(); - auto future = _defragmentationPolicy.getNextStreamingAction(); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); auto dataSizeInfo = DataSizeInfo(kShardId, kNss, kUuid, diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 1393a95851e..1d038352e3f 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -631,6 +631,8 @@ void ShardingCatalogManager::configureCollectionAutoSplit( {std::make_move_iterator(shardsIds.begin()), std::make_move_iterator(shardsIds.end())}, nss, executor); + + Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(); } void ShardingCatalogManager::renameShardedMetadata( diff --git a/src/mongo/s/catalog/type_collection.h b/src/mongo/s/catalog/type_collection.h index 2c9950750c5..d4978c16325 100644 --- a/src/mongo/s/catalog/type_collection.h +++ b/src/mongo/s/catalog/type_collection.h @@ -98,6 +98,7 @@ public: using CollectionTypeBase::kUpdatedAtFieldName; // Make getters and setters accessible. + using CollectionTypeBase::getDefragmentationPhase; using CollectionTypeBase::getMaxChunkSizeBytes; using CollectionTypeBase::getNss; using CollectionTypeBase::getReshardingFields; @@ -105,6 +106,8 @@ public: using CollectionTypeBase::getTimestamp; using CollectionTypeBase::getUnique; using CollectionTypeBase::getUpdatedAt; + using CollectionTypeBase::setBalancerShouldMergeChunks; + using CollectionTypeBase::setDefragmentationPhase; using CollectionTypeBase::setNss; using CollectionTypeBase::setReshardingFields; using CollectionTypeBase::setTimeseriesFields; diff --git a/src/mongo/s/catalog/type_collection.idl b/src/mongo/s/catalog/type_collection.idl index 9bb9284e5a3..bb35e7daec3 100644 --- a/src/mongo/s/catalog/type_collection.idl +++ b/src/mongo/s/catalog/type_collection.idl @@ -41,7 +41,6 @@ enums: description: "The list of phases composing the Collection Chunks Defragmentation Algorithm" type: string values: - kNotStarted: "notStarted" kMergeChunks : "mergeChunks" kMoveAndMergeChunks: "moveAndMergeChunks" kSplitChunks: "splitChunks" |