summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAllison Easton <allison.easton@mongodb.com>2021-12-02 15:43:46 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-02 16:13:45 +0000
commit93a31c914c819aeabcc0b16299782f1fb87b1089 (patch)
tree9fa4c2edbf91ecae0572969620f8cbe27f85a8c5
parentc3402c98def4ce8b25609429ccb9e24fb4fe7cd0 (diff)
downloadmongo-93a31c914c819aeabcc0b16299782f1fb87b1089.tar.gz
SERVER-61555 Link the defragmentation policy to ConfigureCollectionAutoSplit
-rw-r--r--jstests/sharding/balancer_defragmentation_merge_chunks.js141
-rw-r--r--src/mongo/db/s/balancer/balancer.cpp83
-rw-r--r--src/mongo/db/s/balancer/balancer.h6
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy.h29
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp200
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h58
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp103
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp2
-rw-r--r--src/mongo/s/catalog/type_collection.h3
-rw-r--r--src/mongo/s/catalog/type_collection.idl1
10 files changed, 478 insertions, 148 deletions
diff --git a/jstests/sharding/balancer_defragmentation_merge_chunks.js b/jstests/sharding/balancer_defragmentation_merge_chunks.js
new file mode 100644
index 00000000000..23a7c49273f
--- /dev/null
+++ b/jstests/sharding/balancer_defragmentation_merge_chunks.js
@@ -0,0 +1,141 @@
+/**
+ * Test the setCollectionAutosplitter command and balancerCollectionStatus command
+ *
+ * @tags: [
+ * requires_fcv_51,
+ * featureFlagPerCollectionAutoSplitter,
+ * ]
+ */
+
+(function() {
+'use strict';
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/sharding/autosplit_include.js');
+
+var st = new ShardingTest(
+ {mongos: 1, shards: 3, config: 1, other: {enableBalancer: true, enableAutoSplit: true}});
+
+// setup the database for the test
+assert.commandWorked(st.s.adminCommand({enableSharding: 'db'}));
+var db = st.getDB('db');
+var coll = db['test'];
+var fullNs = coll.getFullName();
+var configPrimary = st.configRS.getPrimary();
+
+const defaultChunkSize = 2 * 1024 * 1024;
+const bigString = "X".repeat(32 * 1024); // 32 KB
+assert.commandWorked(st.s.adminCommand({shardCollection: fullNs, key: {key: 1}}));
+
+var bulk = coll.initializeUnorderedBulkOp();
+for (let i = 0; i < 32 * 128; i++) {
+ bulk.insert({key: i, str: bigString});
+}
+assert.commandWorked(bulk.execute());
+waitForOngoingChunkSplits(st);
+
+jsTest.log("Balance cluster before beginning defragmentation");
+
+function waitForBalanced() {
+ assert.soon(function() {
+ st.awaitBalancerRound();
+ balancerStatus =
+ assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs}));
+ return balancerStatus.balancerCompliant;
+ });
+ jsTest.log("Balancer status of " + fullNs + " : \n" + tojson(balancerStatus));
+}
+
+waitForBalanced();
+
+jsTest.log("Begin and end defragmentation with balancer off.");
+{
+ st.stopBalancer();
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: true,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ // Defragmentation should not start with the balancer stopped
+ var balancerStatus =
+ assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs}));
+ assert.eq(balancerStatus.balancerCompliant, true);
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: false,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ st.startBalancer();
+ st.awaitBalancerRound();
+ assert.eq(balancerStatus.balancerCompliant, true);
+ st.stopBalancer();
+}
+
+jsTest.log("Begin and end defragmentation with balancer on");
+{
+ st.startBalancer();
+ var phaseTransitionFailpoint = configureFailPoint(configPrimary, "skipPhaseTransition");
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: true,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ st.awaitBalancerRound();
+ var currStatus = assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs}));
+ assert.eq(currStatus.balancerCompliant, false);
+ assert.eq(currStatus.firstComplianceViolation, 'chunksMerging');
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: false,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ st.awaitBalancerRound();
+ assert.eq(balancerStatus.balancerCompliant, true);
+ st.stopBalancer();
+ phaseTransitionFailpoint.off();
+}
+
+jsTest.log("Begin defragmentation with balancer off, end with it on");
+{
+ var phaseTransitionFailpoint = configureFailPoint(configPrimary, "skipPhaseTransition");
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: true,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ st.startBalancer();
+ st.awaitBalancerRound();
+ var currStatus = assert.commandWorked(st.s.adminCommand({balancerCollectionStatus: fullNs}));
+ assert.eq(currStatus.balancerCompliant, false);
+ assert.eq(currStatus.firstComplianceViolation, 'chunksMerging');
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: false,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ st.awaitBalancerRound();
+ assert.eq(balancerStatus.balancerCompliant, true);
+ st.stopBalancer();
+ phaseTransitionFailpoint.off();
+}
+
+jsTest.log("Balancer on, begin defragmentation and let it complete");
+{
+ st.startBalancer();
+ assert.commandWorked(st.s.adminCommand({
+ configureCollectionAutoSplitter: fullNs,
+ enableAutoSplitter: false,
+ balancerShouldMergeChunks: true,
+ defaultChunkSize: defaultChunkSize,
+ }));
+ waitForBalanced();
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 6b75e4a3bbc..2970498a606 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -379,22 +379,24 @@ void Balancer::_consumeActionStreamLoop() {
Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor());
while (!_stopRequested()) {
// Blocking call
- DefragmentationAction action = _defragmentationPolicy->getNextStreamingAction().get();
+ DefragmentationAction action =
+ _defragmentationPolicy->getNextStreamingAction(opCtx.get()).get();
// Non-blocking call, assumes the requests are returning a SemiFuture<>
stdx::visit(
visit_helper::Overloaded{
[&](MergeInfo mergeAction) {
- auto result =
- _commandScheduler
- ->requestMergeChunks(opCtx.get(),
- mergeAction.nss,
- mergeAction.shardId,
- mergeAction.chunkRange,
- mergeAction.collectionVersion)
- .thenRunOn(*executor)
- .onCompletion([this, mergeAction](const Status& status) {
- _defragmentationPolicy->acknowledgeMergeResult(mergeAction, status);
- });
+ auto result = _commandScheduler
+ ->requestMergeChunks(opCtx.get(),
+ mergeAction.nss,
+ mergeAction.shardId,
+ mergeAction.chunkRange,
+ mergeAction.collectionVersion)
+ .thenRunOn(*executor)
+ .onCompletion([this, mergeAction](const Status& status) {
+ auto opCtx = cc().makeOperationContext();
+ _defragmentationPolicy->acknowledgeMergeResult(
+ opCtx.get(), mergeAction, status);
+ });
},
[&](DataSizeInfo dataSizeAction) {
auto result =
@@ -407,8 +409,9 @@ void Balancer::_consumeActionStreamLoop() {
dataSizeAction.keyPattern,
dataSizeAction.estimatedValue)
.thenRunOn(*executor)
- .onCompletion([this, dataSizeAction, &opCtx](
+ .onCompletion([this, dataSizeAction](
const StatusWith<DataSizeResponse>& swDataSize) {
+ auto opCtx = cc().makeOperationContext();
_defragmentationPolicy->acknowledgeDataSizeResult(
opCtx.get(), dataSizeAction, swDataSize);
});
@@ -427,25 +430,27 @@ void Balancer::_consumeActionStreamLoop() {
.onCompletion(
[this, splitVectorAction](
const StatusWith<std::vector<BSONObj>>& swSplitPoints) {
+ auto opCtx = cc().makeOperationContext();
_defragmentationPolicy->acknowledgeAutoSplitVectorResult(
- splitVectorAction, swSplitPoints);
+ opCtx.get(), splitVectorAction, swSplitPoints);
});
},
[&](SplitInfoWithKeyPattern splitAction) {
- auto result =
- _commandScheduler
- ->requestSplitChunk(opCtx.get(),
- splitAction.info.nss,
- splitAction.info.shardId,
- splitAction.info.collectionVersion,
- splitAction.keyPattern,
- splitAction.info.minKey,
- splitAction.info.maxKey,
- splitAction.info.splitKeys)
- .thenRunOn(*executor)
- .onCompletion([this, splitAction](const Status& status) {
- _defragmentationPolicy->acknowledgeSplitResult(splitAction, status);
- });
+ auto result = _commandScheduler
+ ->requestSplitChunk(opCtx.get(),
+ splitAction.info.nss,
+ splitAction.info.shardId,
+ splitAction.info.collectionVersion,
+ splitAction.keyPattern,
+ splitAction.info.minKey,
+ splitAction.info.maxKey,
+ splitAction.info.splitKeys)
+ .thenRunOn(*executor)
+ .onCompletion([this, splitAction](const Status& status) {
+ auto opCtx = cc().makeOperationContext();
+ _defragmentationPolicy->acknowledgeSplitResult(
+ opCtx.get(), splitAction, status);
+ });
},
[](EndOfActionStream eoa) {}},
action);
@@ -544,6 +549,8 @@ void Balancer::_mainThread() {
warnOnMultiVersion(uassertStatusOK(_clusterStats->getStats(opCtx.get())));
}
+ _initializeDefragmentations(opCtx.get());
+
Status status = _splitChunksIfNeeded(opCtx.get());
if (!status.isOK()) {
LOGV2_WARNING(21878,
@@ -749,6 +756,13 @@ bool Balancer::_checkOIDs(OperationContext* opCtx) {
return true;
}
+void Balancer::_initializeDefragmentations(OperationContext* opCtx) {
+ auto collections = Grid::get(opCtx)->catalogClient()->getCollections(opCtx, {});
+ for (const auto& coll : collections) {
+ _defragmentationPolicy->refreshCollectionDefragmentationStatus(opCtx, coll);
+ }
+}
+
Status Balancer::_splitChunksIfNeeded(OperationContext* opCtx) {
auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx);
if (!chunksToSplitStatus.isOK()) {
@@ -867,10 +881,15 @@ void Balancer::notifyPersistedBalancerSettingsChanged() {
Balancer::BalancerStatus Balancer::getBalancerStatusForNs(OperationContext* opCtx,
const NamespaceString& ns) {
- const auto uuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, ns);
- bool isMerging = uuid && _defragmentationPolicy->isDefragmentingCollection(*uuid);
- if (isMerging) {
- return {false, kBalancerPolicyStatusChunksMerging.toString()};
+ // TODO (SERVER-61727) update this with phase 2
+ try {
+ auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, ns, {});
+ bool isMerging = _defragmentationPolicy->isDefragmentingCollection(coll.getUuid());
+ if (isMerging) {
+ return {false, kBalancerPolicyStatusChunksMerging.toString()};
+ }
+ } catch (DBException&) {
+ // Catch exceptions to keep consistency with errors thrown before defragmentation
}
auto splitChunks = uassertStatusOK(_chunkSelectionPolicy->selectChunksToSplit(opCtx, ns));
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 48c167e7ec0..45008947c8d 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -258,6 +258,12 @@ private:
bool _checkOIDs(OperationContext* opCtx);
/**
+ * Queries config.collections for all collections that should be running defragmentation and
+ * passes this information to the defragmentation policy.
+ */
+ void _initializeDefragmentations(OperationContext* opCtx);
+
+ /**
* Iterates through all chunks in all collections. If the collection is the sessions collection,
* checks if the number of chunks is greater than or equal to the configured minimum number of
* chunks for the sessions collection (minNumChunksForSessionsCollection). If it isn't,
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h
index 61e9fc67c4a..017380fb1e1 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/s/balancer/balancer_policy.h"
+#include "mongo/s/catalog/type_collection.h"
namespace mongo {
/**
@@ -44,16 +45,12 @@ public:
virtual ~BalancerDefragmentationPolicy() {}
/**
- * Sets the "begin of defragmentation" state on the specified collection. New actions concerning
- * the collection will be included in the stream.
+ * Checks if the collection should be running defragmentation. If a new defragmentation should
+ * be started, this will initialize the defragmentation. If defragmentation has been turned off
+ * on a collection, this will stop the defragmentation.
*/
- virtual void beginNewCollection(OperationContext* opCtx, const UUID& uuid) = 0;
-
- /**
- * Removes the specified collection from the list of namespaces to be defragmented. Actions
- * concerning the collection will stop appearing in the stream.
- */
- virtual void removeCollection(OperationContext* opCtx, const UUID& uuid) = 0;
+ virtual void refreshCollectionDefragmentationStatus(OperationContext* opCtx,
+ const CollectionType& coll) = 0;
/**
* Returns true if the specified collection is currently being defragmented.
@@ -72,7 +69,7 @@ public:
* or when there are too many outstanding actions (too many calls to getStreamingAction() that
* have not been acknowledged).
*/
- virtual SemiFuture<DefragmentationAction> getNextStreamingAction() = 0;
+ virtual SemiFuture<DefragmentationAction> getNextStreamingAction(OperationContext* opCtx) = 0;
/**
* Stops the generation of new actions: any new call to (or currently blocked ones on)
@@ -81,12 +78,18 @@ public:
*/
virtual void closeActionStream() = 0;
- virtual void acknowledgeMergeResult(MergeInfo action, const Status& result) = 0;
+ virtual void acknowledgeMergeResult(OperationContext* opCtx,
+ MergeInfo action,
+ const Status& result) = 0;
virtual void acknowledgeAutoSplitVectorResult(
- AutoSplitVectorInfo action, const StatusWith<std::vector<BSONObj>>& result) = 0;
+ OperationContext* opCtx,
+ AutoSplitVectorInfo action,
+ const StatusWith<std::vector<BSONObj>>& result) = 0;
- virtual void acknowledgeSplitResult(SplitInfoWithKeyPattern action, const Status& result) = 0;
+ virtual void acknowledgeSplitResult(OperationContext* opCtx,
+ SplitInfoWithKeyPattern action,
+ const Status& result) = 0;
virtual void acknowledgeDataSizeResult(OperationContext* opCtx,
DataSizeInfo action,
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index 2f253255176..00b825de023 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -36,50 +36,55 @@
#include "mongo/s/grid.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(skipPhaseTransition);
-void BalancerDefragmentationPolicyImpl::beginNewCollection(OperationContext* opCtx,
- const UUID& uuid) {
- _persistPhaseUpdate(opCtx, DefragmentationPhaseEnum::kNotStarted, uuid);
- _initializeCollectionState(opCtx, uuid);
-}
+void BalancerDefragmentationPolicyImpl::refreshCollectionDefragmentationStatus(
+ OperationContext* opCtx, const CollectionType& coll) {
+ stdx::lock_guard<Latch> lk(_streamingMutex);
+ const auto& uuid = coll.getUuid();
+ if (coll.getBalancerShouldMergeChunks() && !_defragmentationStates.contains(uuid)) {
+ _initializeCollectionState(lk, opCtx, coll);
-void BalancerDefragmentationPolicyImpl::removeCollection(OperationContext* opCtx,
- const UUID& uuid) {
- _persistPhaseUpdate(opCtx, boost::none, uuid);
- _clearDataSizeInformation(opCtx, uuid);
- _defragmentationStates.erase(uuid);
+ // Load first action, this will trigger move to phase 2 if there are no phase 1 actions
+ _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]);
+ // Fulfill promise if needed
+ if (_nextStreamingActionPromise) {
+ auto nextStreamingAction = _nextStreamingAction(opCtx);
+ if (nextStreamingAction) {
+ _concurrentStreamingOps++;
+ _nextStreamingActionPromise.get().setWith([&] { return *nextStreamingAction; });
+ _nextStreamingActionPromise = boost::none;
+ return;
+ }
+ }
+ } else if (!coll.getBalancerShouldMergeChunks() && _defragmentationStates.contains(uuid)) {
+ _clearDataSizeInformation(opCtx, uuid);
+ _defragmentationStates.erase(uuid);
+ _persistPhaseUpdate(opCtx, boost::none, uuid);
+ }
}
-SemiFuture<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNextStreamingAction() {
+SemiFuture<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNextStreamingAction(
+ OperationContext* opCtx) {
stdx::lock_guard<Latch> lk(_streamingMutex);
if (_concurrentStreamingOps < kMaxConcurrentOperations) {
- _concurrentStreamingOps++;
- if (auto action = _nextStreamingAction()) {
+ if (auto action = _nextStreamingAction(opCtx)) {
+ _concurrentStreamingOps++;
return SemiFuture<DefragmentationAction>::makeReady(*action);
}
}
- auto&& [promise, future] = makePromiseFuture<DefragmentationAction>();
+ auto [promise, future] = makePromiseFuture<DefragmentationAction>();
_nextStreamingActionPromise = std::move(promise);
return std::move(future).semi();
}
-boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextStreamingAction() {
+boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextStreamingAction(
+ OperationContext* opCtx) {
// TODO (SERVER-61635) validate fairness through collections
for (auto& [uuid, collectionData] : _defragmentationStates) {
- if (!collectionData.queuedActions.empty()) {
- auto action = collectionData.queuedActions.front();
- collectionData.queuedActions.pop();
- return action;
- }
- if (collectionData.phase == DefragmentationPhaseEnum::kMergeChunks) {
- if (auto mergeAction = _getCollectionMergeAction(collectionData)) {
- return boost::optional<DefragmentationAction>(*mergeAction);
- }
- }
- if (collectionData.phase == DefragmentationPhaseEnum::kSplitChunks) {
- if (auto splitAction = _getCollectionSplitAction(collectionData)) {
- return boost::optional<DefragmentationAction>(*splitAction);
- }
+ if (!collectionData.queuedActions.empty() ||
+ _queueNextAction(opCtx, uuid, collectionData)) {
+ return collectionData.popFromActionQueue();
}
}
boost::optional<DefragmentationAction> noAction = boost::none;
@@ -89,9 +94,43 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextS
return noAction;
}
-void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(MergeInfo action,
+bool BalancerDefragmentationPolicyImpl::_queueNextAction(
+ OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionData) {
+ // get next action within the current phase
+ switch (collectionData.phase) {
+ case DefragmentationPhaseEnum::kMergeChunks:
+ if (auto mergeAction = _getCollectionMergeAction(collectionData)) {
+ collectionData.queuedActions.push(*mergeAction);
+ return true;
+ }
+ break;
+ case DefragmentationPhaseEnum::kSplitChunks:
+ if (auto splitAction = _getCollectionSplitAction(collectionData)) {
+ collectionData.queuedActions.push(*splitAction);
+ return true;
+ }
+ break;
+ default:
+ uasserted(ErrorCodes::BadValue, "Unsupported phase type");
+ }
+ // If no action for the current phase is available, check the conditions for transitioning to
+ // the next phase
+ if (collectionData.queuedActions.empty() && collectionData.outstandingActions == 0) {
+ _transitionPhases(opCtx, uuid, collectionData);
+ }
+ return false;
+}
+
+void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* opCtx,
+ MergeInfo action,
const Status& result) {
stdx::lock_guard<Latch> lk(_streamingMutex);
+ // Check if collection defragmentation has been canceled
+ if (!_defragmentationStates.contains(action.uuid)) {
+ return;
+ }
+ if (result.isOK())
+ _defragmentationStates[action.uuid].outstandingActions--;
boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK()
? boost::optional<DefragmentationAction>(
DataSizeInfo(action.shardId,
@@ -102,13 +141,18 @@ void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(MergeInfo action,
_defragmentationStates.at(action.uuid).collectionShardKey,
false))
: boost::optional<DefragmentationAction>(action);
- _processEndOfAction(lk, action.uuid, nextActionOnNamespace);
+ _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace);
}
void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult(
OperationContext* opCtx, DataSizeInfo action, const StatusWith<DataSizeResponse>& result) {
stdx::lock_guard<Latch> lk(_streamingMutex);
+ // Check if collection defragmentation has been canceled
+ if (!_defragmentationStates.contains(action.uuid)) {
+ return;
+ }
if (result.isOK()) {
+ _defragmentationStates[action.uuid].outstandingActions--;
ChunkType chunk(action.uuid, action.chunkRange, action.version, action.shardId);
ShardingCatalogManager* catalogManager = ShardingCatalogManager::get(opCtx);
catalogManager->setChunkEstimatedSize(opCtx,
@@ -118,12 +162,20 @@ void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult(
}
boost::optional<DefragmentationAction> nextActionOnNamespace =
result.isOK() ? boost::none : boost::optional<DefragmentationAction>(action);
- _processEndOfAction(lk, action.uuid, nextActionOnNamespace);
+ _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace);
}
void BalancerDefragmentationPolicyImpl::acknowledgeAutoSplitVectorResult(
- AutoSplitVectorInfo action, const StatusWith<std::vector<BSONObj>>& result) {
+ OperationContext* opCtx,
+ AutoSplitVectorInfo action,
+ const StatusWith<std::vector<BSONObj>>& result) {
stdx::lock_guard<Latch> lk(_streamingMutex);
+ // Check if collection defragmentation has been canceled
+ if (!_defragmentationStates.contains(action.uuid)) {
+ return;
+ }
+ if (result.isOK())
+ _defragmentationStates[action.uuid].outstandingActions--;
boost::optional<DefragmentationAction> nextActionOnNamespace = result.isOK()
? boost::optional<DefragmentationAction>(SplitInfoWithKeyPattern(action.shardId,
action.nss,
@@ -134,15 +186,22 @@ void BalancerDefragmentationPolicyImpl::acknowledgeAutoSplitVectorResult(
action.uuid,
action.keyPattern))
: boost::optional<DefragmentationAction>(action);
- _processEndOfAction(lk, action.uuid, nextActionOnNamespace);
+ _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace);
}
-void BalancerDefragmentationPolicyImpl::acknowledgeSplitResult(SplitInfoWithKeyPattern action,
+void BalancerDefragmentationPolicyImpl::acknowledgeSplitResult(OperationContext* opCtx,
+ SplitInfoWithKeyPattern action,
const Status& result) {
stdx::lock_guard<Latch> lk(_streamingMutex);
+ // Check if collection defragmentation has been canceled
+ if (!_defragmentationStates.contains(action.uuid)) {
+ return;
+ }
+ if (result.isOK())
+ _defragmentationStates[action.uuid].outstandingActions--;
boost::optional<DefragmentationAction> nextActionOnNamespace =
result.isOK() ? boost::none : boost::optional<DefragmentationAction>(action);
- _processEndOfAction(lk, action.uuid, nextActionOnNamespace);
+ _processEndOfAction(lk, opCtx, action.uuid, nextActionOnNamespace);
}
void BalancerDefragmentationPolicyImpl::closeActionStream() {
@@ -157,20 +216,21 @@ void BalancerDefragmentationPolicyImpl::closeActionStream() {
void BalancerDefragmentationPolicyImpl::_processEndOfAction(
WithLock,
+ OperationContext* opCtx,
const UUID& uuid,
const boost::optional<DefragmentationAction>& nextActionOnNamespace) {
- // If the end of the current action implies a next step and the related collection is still
- // being defragmented, store it
+
+ // If the end of the current action implies a next step, store it
if (nextActionOnNamespace) {
- auto collectionDefragmentationStateIt = _defragmentationStates.find(uuid);
- if (collectionDefragmentationStateIt != _defragmentationStates.end()) {
- collectionDefragmentationStateIt->second.queuedActions.push(*nextActionOnNamespace);
- }
+ _defragmentationStates.at(uuid).queuedActions.push(*nextActionOnNamespace);
}
- // If there is a client blocked on the stream, serve it now with a new action...
+ // Load next action, this will trigger phase change if needed
+ _queueNextAction(opCtx, uuid, _defragmentationStates[uuid]);
+
+ // Fulfill promise if needed
if (_nextStreamingActionPromise) {
- auto nextStreamingAction = _nextStreamingAction();
+ auto nextStreamingAction = _nextStreamingAction(opCtx);
if (nextStreamingAction) {
_nextStreamingActionPromise.get().setWith([&] { return *nextStreamingAction; });
_nextStreamingActionPromise = boost::none;
@@ -181,14 +241,53 @@ void BalancerDefragmentationPolicyImpl::_processEndOfAction(
--_concurrentStreamingOps;
}
-void BalancerDefragmentationPolicyImpl::_initializeCollectionState(OperationContext* opCtx,
- const UUID& uuid) {
+void BalancerDefragmentationPolicyImpl::_transitionPhases(
+ OperationContext* opCtx, const UUID& uuid, CollectionDefragmentationState& collectionInfo) {
+ boost::optional<DefragmentationPhaseEnum> nextPhase;
+ switch (collectionInfo.phase) {
+ case DefragmentationPhaseEnum::kMergeChunks:
+ if (MONGO_unlikely(skipPhaseTransition.shouldFail())) {
+ nextPhase = DefragmentationPhaseEnum::kMergeChunks;
+ break;
+ }
+ // TODO (SERVER-60459) Change to kMoveAndMergeChunks
+ nextPhase = boost::none;
+ break;
+ case DefragmentationPhaseEnum::kMoveAndMergeChunks:
+ // TODO (SERVER-60479) Change to kSplitChunks
+ nextPhase = boost::none;
+ break;
+ case DefragmentationPhaseEnum::kSplitChunks:
+ nextPhase = boost::none;
+ break;
+ }
+ if (nextPhase) {
+ collectionInfo.phase = nextPhase.get();
+ } else {
+ _clearDataSizeInformation(opCtx, uuid);
+ _defragmentationStates.erase(uuid);
+ }
+ _persistPhaseUpdate(opCtx, nextPhase, uuid);
+}
+
+void BalancerDefragmentationPolicyImpl::_initializeCollectionState(WithLock,
+ OperationContext* opCtx,
+ const CollectionType& coll) {
try {
- _defragmentationStates[uuid].phase = DefragmentationPhaseEnum::kNotStarted;
+ CollectionDefragmentationState newState;
+ newState.nss = coll.getNss();
+ newState.phase = coll.getDefragmentationPhase() ? coll.getDefragmentationPhase().get()
+ : DefragmentationPhaseEnum::kMergeChunks;
+ newState.collectionShardKey = coll.getKeyPattern().toBSON();
+ _persistPhaseUpdate(opCtx, newState.phase, coll.getUuid());
+ auto [_, inserted] =
+ _defragmentationStates.insert_or_assign(coll.getUuid(), std::move(newState));
+ dassert(inserted);
} catch (const DBException& e) {
LOGV2_ERROR(6153101,
"Error while starting defragmentation on collection",
- "uuid"_attr = uuid,
+ "namespace"_attr = coll.getNss(),
+ "uuid"_attr = coll.getUuid(),
"error"_attr = e);
}
}
@@ -205,8 +304,9 @@ void BalancerDefragmentationPolicyImpl::_persistPhaseUpdate(
BSON("$set" << BSON(CollectionType::kDefragmentationPhaseFieldName
<< DefragmentationPhase_serializer(*phase)))));
} else {
- entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
- BSON("$unset" << BSON(CollectionType::kDefragmentationPhaseFieldName << ""))));
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(BSON(
+ "$unset" << BSON(CollectionType::kBalancerShouldMergeChunksFieldName
+ << "" << CollectionType::kDefragmentationPhaseFieldName << ""))));
}
return entry;
}()});
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
index 2a329a8bbfb..6adc0d8734f 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
@@ -48,14 +48,19 @@ public:
return _defragmentationStates.contains(uuid);
}
- SemiFuture<DefragmentationAction> getNextStreamingAction() override;
+ SemiFuture<DefragmentationAction> getNextStreamingAction(OperationContext* opCtx) override;
- void acknowledgeMergeResult(MergeInfo action, const Status& result) override;
+ void acknowledgeMergeResult(OperationContext* opCtx,
+ MergeInfo action,
+ const Status& result) override;
- void acknowledgeAutoSplitVectorResult(AutoSplitVectorInfo action,
+ void acknowledgeAutoSplitVectorResult(OperationContext* opCtx,
+ AutoSplitVectorInfo action,
const StatusWith<std::vector<BSONObj>>& result) override;
- void acknowledgeSplitResult(SplitInfoWithKeyPattern action, const Status& result) override;
+ void acknowledgeSplitResult(OperationContext* opCtx,
+ SplitInfoWithKeyPattern action,
+ const Status& result) override;
void acknowledgeDataSizeResult(OperationContext* opCtx,
DataSizeInfo action,
@@ -63,25 +68,48 @@ public:
void closeActionStream() override;
- void beginNewCollection(OperationContext* opCtx, const UUID& uuid) override;
-
- void removeCollection(OperationContext* opCtx, const UUID& uuid) override;
+ void refreshCollectionDefragmentationStatus(OperationContext* opCtx,
+ const CollectionType& coll) override;
private:
static constexpr int kMaxConcurrentOperations = 50;
// Data structures used to keep track of the defragmentation state.
struct CollectionDefragmentationState {
+ DefragmentationAction popFromActionQueue() {
+ auto action = queuedActions.front();
+ queuedActions.pop();
+ outstandingActions++;
+ return action;
+ };
+
NamespaceString nss;
DefragmentationPhaseEnum phase;
int64_t maxChunkSizeBytes;
BSONObj collectionShardKey;
std::queue<DefragmentationAction> queuedActions;
+ unsigned outstandingActions{0};
std::vector<ChunkType> chunkList;
ZoneInfo zones;
};
- boost::optional<DefragmentationAction> _nextStreamingAction();
+ /**
+ * Returns the next action from any collection in phase 1 or 3 or boost::none if there are no
+ * actions to perform.
+ * Must be called while holding the _streamingMutex.
+ */
+ boost::optional<DefragmentationAction> _nextStreamingAction(OperationContext* opCtx);
+
+ /**
+ * Adds next action to the collection's action queue if there is one. If there are no further
+ * actions, the queue is empty, and there are no outstanding actions for this collection, this
+ * will call _transitionPhases. Returns true if there is a new action for the collection and
+ * false otherwise.
+ * Must be called while holding the _streamingMutex.
+ */
+ bool _queueNextAction(OperationContext* opCtx,
+ const UUID& uuid,
+ CollectionDefragmentationState& collectionData);
/**
* Returns next phase 1 merge action for the collection if there is one and boost::none
@@ -102,14 +130,24 @@ private:
}
/**
+ * Move to the next phase and persist the phase change. This will end defragmentation if the
+ * current phase is the last phase.
+ * Must be called while holding the _streamingMutex.
+ */
+ void _transitionPhases(OperationContext* opCtx,
+ const UUID& uuid,
+ CollectionDefragmentationState& collectionInfo);
+
+ /**
* Build the shardToChunk map for the namespace. Requires a scan of the config.chunks
* collection.
*/
- void _initializeCollectionState(OperationContext* opCtx, const UUID& uuid);
+ void _initializeCollectionState(WithLock, OperationContext* opCtx, const CollectionType& coll);
/**
* Write the new phase to the defragmentationPhase field in config.collections. If phase is not
* set, the field will be removed.
+ * Must be called while holding the _streamingMutex.
*/
void _persistPhaseUpdate(OperationContext* opCtx,
boost::optional<DefragmentationPhaseEnum> phase,
@@ -117,10 +155,12 @@ private:
/**
* Remove all datasize fields from config.chunks for the given namespace.
+ * Must be called while holding the _streamingMutex.
*/
void _clearDataSizeInformation(OperationContext* opCtx, const UUID& uuid);
void _processEndOfAction(WithLock,
+ OperationContext* opCtx,
const UUID& uuid,
const boost::optional<DefragmentationAction>& nextActionOnNamespace);
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
index 9330be19bc5..196736e38be 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
@@ -51,11 +51,13 @@ protected:
_clusterStats(std::make_unique<ClusterStatisticsImpl>(_random)),
_defragmentationPolicy(_clusterStats.get()) {}
- void makeConfigCollectionEntry() {
+ CollectionType makeConfigCollectionEntry() {
CollectionType shardedCollection(kNss, OID::gen(), Timestamp(1, 1), Date_t::now(), kUuid);
shardedCollection.setKeyPattern(kShardKeyPattern);
+ shardedCollection.setBalancerShouldMergeChunks(true);
ASSERT_OK(insertToConfigCollection(
operationContext(), CollectionType::ConfigNS, shardedCollection.toBSON()));
+ return shardedCollection;
}
void makeConfigChunkEntry() {
@@ -81,8 +83,9 @@ protected:
};
TEST_F(BalancerDefragmentationPolicyTest, TestAddCollection) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
// Test for persistence
auto configDoc = findOneOnConfigCollection(operationContext(),
CollectionType::ConfigNS,
@@ -91,14 +94,12 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAddCollection) {
auto storedDefragmentationPhase = DefragmentationPhase_parse(
IDLParserErrorContext("BalancerDefragmentationPolicyTest"),
configDoc.getStringField(CollectionType::kDefragmentationPhaseFieldName));
- ASSERT_TRUE(storedDefragmentationPhase == DefragmentationPhaseEnum::kNotStarted);
+ ASSERT_TRUE(storedDefragmentationPhase == DefragmentationPhaseEnum::kMergeChunks);
}
-TEST_F(BalancerDefragmentationPolicyTest, TestRemoveCollection) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- _defragmentationPolicy.removeCollection(operationContext(), kUuid);
- // Test for persistence removal
+TEST_F(BalancerDefragmentationPolicyTest, TestAddCollectionNoActions) {
+ auto coll = makeConfigCollectionEntry();
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
auto configDoc = findOneOnConfigCollection(operationContext(),
CollectionType::ConfigNS,
BSON(CollectionType::kUuidFieldName << kUuid))
@@ -107,25 +108,29 @@ TEST_F(BalancerDefragmentationPolicyTest, TestRemoveCollection) {
}
TEST_F(BalancerDefragmentationPolicyTest, TestIsDefragmentingCollection) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
ASSERT_TRUE(_defragmentationPolicy.isDefragmentingCollection(kUuid));
ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(UUID::gen()));
}
TEST_F(BalancerDefragmentationPolicyTest, TestGetNextActionNoReadyActions) {
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
ASSERT_FALSE(future.isReady());
}
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedMergeResult) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto mergeInfo =
MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey));
_defragmentationPolicy.acknowledgeMergeResult(
- mergeInfo, Status(ErrorCodes::NetworkTimeout, "Testing error response"));
+ operationContext(),
+ mergeInfo,
+ Status(ErrorCodes::NetworkTimeout, "Testing error response"));
ASSERT_TRUE(future.isReady());
DefragmentationAction streamingAction = future.get();
MergeInfo mergeAction = stdx::get<MergeInfo>(streamingAction);
@@ -133,35 +138,42 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedMergeResult) {
}
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitVectorResponse) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto splitVectorInfo = AutoSplitVectorInfo(
kShardId, kNss, kUuid, kCollectionVersion, BSONObj(), kMinKey, kMaxKey, 120);
_defragmentationPolicy.acknowledgeAutoSplitVectorResult(
- splitVectorInfo, Status(ErrorCodes::NetworkTimeout, "Testing error response"));
+ operationContext(),
+ splitVectorInfo,
+ Status(ErrorCodes::NetworkTimeout, "Testing error response"));
ASSERT_TRUE(future.isReady());
AutoSplitVectorInfo splitVectorAction = stdx::get<AutoSplitVectorInfo>(future.get());
ASSERT_EQ(splitVectorInfo.nss, splitVectorAction.nss);
}
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedSplitAction) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto splitInfo = SplitInfoWithKeyPattern(
kShardId, kNss, kCollectionVersion, kMinKey, kMaxKey, {}, kUuid, kShardKeyPattern.toBSON());
_defragmentationPolicy.acknowledgeSplitResult(
- splitInfo, Status(ErrorCodes::NetworkTimeout, "Testing error response"));
+ operationContext(),
+ splitInfo,
+ Status(ErrorCodes::NetworkTimeout, "Testing error response"));
ASSERT_TRUE(future.isReady());
SplitInfoWithKeyPattern splitAction = stdx::get<SplitInfoWithKeyPattern>(future.get());
ASSERT_EQ(splitInfo.info.nss, splitAction.info.nss);
}
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedDataSizeAction) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto dataSizeInfo = DataSizeInfo(kShardId,
kNss,
kUuid,
@@ -179,12 +191,13 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFailedDataSizeAction) {
}
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulMergeAction) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto mergeInfo =
MergeInfo(kShardId, kNss, kUuid, kCollectionVersion, ChunkRange(kMinKey, kMaxKey));
- _defragmentationPolicy.acknowledgeMergeResult(mergeInfo, Status::OK());
+ _defragmentationPolicy.acknowledgeMergeResult(operationContext(), mergeInfo, Status::OK());
ASSERT_TRUE(future.isReady());
DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(future.get());
ASSERT_EQ(mergeInfo.nss, dataSizeAction.nss);
@@ -194,9 +207,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulMergeAction)
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVectorAction) {
std::vector<BSONObj> splitPoints = {BSON("x" << 4)};
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto splitVectorInfo = AutoSplitVectorInfo(kShardId,
kNss,
kUuid,
@@ -205,8 +219,9 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVect
kMinKey,
kMaxKey,
2048);
- _defragmentationPolicy.acknowledgeAutoSplitVectorResult(splitVectorInfo,
- StatusWith(splitPoints));
+
+ _defragmentationPolicy.acknowledgeAutoSplitVectorResult(
+ operationContext(), splitVectorInfo, StatusWith(splitPoints));
ASSERT_TRUE(future.isReady());
SplitInfoWithKeyPattern splitAction = stdx::get<SplitInfoWithKeyPattern>(future.get());
ASSERT_EQ(splitVectorInfo.nss, splitAction.info.nss);
@@ -216,9 +231,10 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulAutoSplitVect
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulSplitAction) {
std::vector<BSONObj> splitPoints = {BSON("x" << 4)};
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto splitInfo = SplitInfoWithKeyPattern(kShardId,
kNss,
kCollectionVersion,
@@ -227,15 +243,16 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulSplitAction)
splitPoints,
kUuid,
kShardKeyPattern.toBSON());
- _defragmentationPolicy.acknowledgeSplitResult(splitInfo, Status::OK());
+ _defragmentationPolicy.acknowledgeSplitResult(operationContext(), splitInfo, Status::OK());
ASSERT_FALSE(future.isReady());
}
TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeSuccessfulDataSizeAction) {
- makeConfigCollectionEntry();
- _defragmentationPolicy.beginNewCollection(operationContext(), kUuid);
+ auto coll = makeConfigCollectionEntry();
+ FailPointEnableBlock failpoint("skipPhaseTransition");
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
makeConfigChunkEntry();
- auto future = _defragmentationPolicy.getNextStreamingAction();
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
auto dataSizeInfo = DataSizeInfo(kShardId,
kNss,
kUuid,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 1393a95851e..1d038352e3f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -631,6 +631,8 @@ void ShardingCatalogManager::configureCollectionAutoSplit(
{std::make_move_iterator(shardsIds.begin()), std::make_move_iterator(shardsIds.end())},
nss,
executor);
+
+ Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged();
}
void ShardingCatalogManager::renameShardedMetadata(
diff --git a/src/mongo/s/catalog/type_collection.h b/src/mongo/s/catalog/type_collection.h
index 2c9950750c5..d4978c16325 100644
--- a/src/mongo/s/catalog/type_collection.h
+++ b/src/mongo/s/catalog/type_collection.h
@@ -98,6 +98,7 @@ public:
using CollectionTypeBase::kUpdatedAtFieldName;
// Make getters and setters accessible.
+ using CollectionTypeBase::getDefragmentationPhase;
using CollectionTypeBase::getMaxChunkSizeBytes;
using CollectionTypeBase::getNss;
using CollectionTypeBase::getReshardingFields;
@@ -105,6 +106,8 @@ public:
using CollectionTypeBase::getTimestamp;
using CollectionTypeBase::getUnique;
using CollectionTypeBase::getUpdatedAt;
+ using CollectionTypeBase::setBalancerShouldMergeChunks;
+ using CollectionTypeBase::setDefragmentationPhase;
using CollectionTypeBase::setNss;
using CollectionTypeBase::setReshardingFields;
using CollectionTypeBase::setTimeseriesFields;
diff --git a/src/mongo/s/catalog/type_collection.idl b/src/mongo/s/catalog/type_collection.idl
index 9bb9284e5a3..bb35e7daec3 100644
--- a/src/mongo/s/catalog/type_collection.idl
+++ b/src/mongo/s/catalog/type_collection.idl
@@ -41,7 +41,6 @@ enums:
description: "The list of phases composing the Collection Chunks Defragmentation Algorithm"
type: string
values:
- kNotStarted: "notStarted"
kMergeChunks : "mergeChunks"
kMoveAndMergeChunks: "moveAndMergeChunks"
kSplitChunks: "splitChunks"