summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAllison Easton <allison.easton@mongodb.com>2022-01-11 14:54:15 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-11 16:01:40 +0000
commit8c6a0ab5795c16e7c236d96de17cbd6060657020 (patch)
tree2f0fc8373ad14a0d26d6a2f0eb51c93c642264b5 /src/mongo
parent550cfa96862fd47a6608deaab055f314754deab2 (diff)
downloadmongo-8c6a0ab5795c16e7c236d96de17cbd6060657020.tar.gz
SERVER-61533 Make the balancer defragmentation policy resilient to non-retryable errors
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp172
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h6
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp50
3 files changed, 178 insertions, 50 deletions
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index 6a244a0f94e..c79df31a13e 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -35,11 +35,16 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/grid.h"
+#include <fmt/format.h>
+
+using namespace fmt::literals;
+
namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(beforeTransitioningDefragmentationPhase);
+MONGO_FAIL_POINT_DEFINE(afterBuildingNextDefragmentationPhase);
ChunkVersion getShardVersion(OperationContext* opCtx, const ShardId& shardId, const UUID& uuid) {
auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, uuid);
@@ -55,9 +60,31 @@ ChunkVersion getShardVersion(OperationContext* opCtx, const ShardId& shardId, co
coll.getTimestamp(),
repl::ReadConcernLevel::kLocalReadConcern,
boost::none));
+ uassert(ErrorCodes::BadValue,
+ "No chunks or chunk version in collection",
+ !chunkVector.empty() && chunkVector.front().isVersionSet());
return chunkVector.front().getVersion();
}
+static bool isRetriableForDefragmentation(const Status& error) {
+ return (ErrorCodes::isA<ErrorCategory::RetriableError>(error) ||
+ error == ErrorCodes::StaleShardVersion || error == ErrorCodes::StaleConfig);
+}
+
+static void handleActionResult(const Status& status,
+ std::function<void()> onSuccess,
+ std::function<void()> onRetriableError) {
+ if (status.isOK()) {
+ onSuccess();
+ return;
+ }
+ if (isRetriableForDefragmentation(status)) {
+ onRetriableError();
+ } else {
+ error_details::throwExceptionForStatus(status);
+ }
+}
+
class MergeChunksPhase : public DefragmentationPhase {
public:
static std::unique_ptr<MergeChunksPhase> build(OperationContext* opCtx,
@@ -165,32 +192,39 @@ public:
[&](const MergeInfo& mergeAction) {
auto& mergeResponse = stdx::get<Status>(response);
auto& shardingPendingActions = _pendingActionsByShards[mergeAction.shardId];
- if (mergeResponse.isOK()) {
- shardingPendingActions.rangesWithoutDataSize.emplace_back(
- mergeAction.chunkRange);
- } else {
- shardingPendingActions.rangesToMerge.emplace_back(mergeAction.chunkRange);
- }
+ handleActionResult(
+ mergeResponse,
+ [&]() {
+ shardingPendingActions.rangesWithoutDataSize.emplace_back(
+ mergeAction.chunkRange);
+ },
+ [&]() {
+ shardingPendingActions.rangesToMerge.emplace_back(
+ mergeAction.chunkRange);
+ });
},
[&](const DataSizeInfo& dataSizeAction) {
auto& dataSizeResponse = stdx::get<StatusWith<DataSizeResponse>>(response);
- if (dataSizeResponse.isOK()) {
- ChunkType chunk(dataSizeAction.uuid,
- dataSizeAction.chunkRange,
- dataSizeAction.version,
- dataSizeAction.shardId);
- auto catalogManager = ShardingCatalogManager::get(opCtx);
- catalogManager->setChunkEstimatedSize(
- opCtx,
- chunk,
- dataSizeResponse.getValue().sizeBytes,
- ShardingCatalogClient::kMajorityWriteConcern);
- } else {
- auto& shardingPendingActions =
- _pendingActionsByShards[dataSizeAction.shardId];
- shardingPendingActions.rangesWithoutDataSize.emplace_back(
- dataSizeAction.chunkRange);
- }
+ handleActionResult(
+ dataSizeResponse.getStatus(),
+ [&]() {
+ ChunkType chunk(dataSizeAction.uuid,
+ dataSizeAction.chunkRange,
+ dataSizeAction.version,
+ dataSizeAction.shardId);
+ auto catalogManager = ShardingCatalogManager::get(opCtx);
+ catalogManager->setChunkEstimatedSize(
+ opCtx,
+ chunk,
+ dataSizeResponse.getValue().sizeBytes,
+ ShardingCatalogClient::kMajorityWriteConcern);
+ },
+ [&]() {
+ auto& shardingPendingActions =
+ _pendingActionsByShards[dataSizeAction.shardId];
+ shardingPendingActions.rangesWithoutDataSize.emplace_back(
+ dataSizeAction.chunkRange);
+ });
},
[&](const AutoSplitVectorInfo& _) {
uasserted(ErrorCodes::BadValue, "Unexpected action type");
@@ -249,9 +283,8 @@ void BalancerDefragmentationPolicyImpl::refreshCollectionDefragmentationStatus(
}
}
} else if (!coll.getBalancerShouldMergeChunks() && _defragmentationStates.contains(uuid)) {
- _clearDataSizeInformation(opCtx, uuid);
+ _transitionPhases(opCtx, coll, DefragmentationPhaseEnum::kFinished);
_defragmentationStates.erase(uuid);
- _persistPhaseUpdate(opCtx, DefragmentationPhaseEnum::kFinished, uuid);
}
}
@@ -274,23 +307,32 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextS
// TODO (SERVER-61635) validate fairness through collections
for (auto it = _defragmentationStates.begin(); it != _defragmentationStates.end();) {
auto& currentCollectionDefragmentationState = it->second;
- // Phase transition if needed
- auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, it->first);
- while (currentCollectionDefragmentationState &&
- currentCollectionDefragmentationState->isComplete()) {
- currentCollectionDefragmentationState = _transitionPhases(
- opCtx, coll, _getNextPhase(currentCollectionDefragmentationState->getType()));
- }
- if (!currentCollectionDefragmentationState) {
+ try {
+ // Phase transition if needed
+ auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, it->first);
+ while (currentCollectionDefragmentationState &&
+ currentCollectionDefragmentationState->isComplete()) {
+ currentCollectionDefragmentationState = _transitionPhases(
+ opCtx, coll, _getNextPhase(currentCollectionDefragmentationState->getType()));
+ }
+ if (!currentCollectionDefragmentationState) {
+ it = _defragmentationStates.erase(it, std::next(it));
+ continue;
+ }
+ // Get next action
+ auto nextAction = currentCollectionDefragmentationState->popNextStreamableAction(opCtx);
+ if (nextAction) {
+ return nextAction;
+ }
+ ++it;
+ } catch (DBException& e) {
+ // Catch getCollection errors.
+ LOGV2_ERROR(6153301,
+ "Error while getting next defragmentation action",
+ "uuid"_attr = it->first,
+ "error"_attr = redact(e));
it = _defragmentationStates.erase(it, std::next(it));
- continue;
- }
- // Get next action
- auto nextAction = currentCollectionDefragmentationState->popNextStreamableAction(opCtx);
- if (nextAction) {
- return nextAction;
}
- ++it;
}
boost::optional<DefragmentationAction> noAction;
@@ -300,6 +342,26 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextS
return noAction;
}
+void BalancerDefragmentationPolicyImpl::_applyActionResult(
+ OperationContext* opCtx,
+ const UUID& uuid,
+ const NamespaceString& nss,
+ const DefragmentationAction& action,
+ const DefragmentationActionResponse& response) {
+ try {
+ _defragmentationStates[uuid]->applyActionResult(opCtx, action, response);
+ } catch (DBException& e) {
+ // Non-retriable error for stage found. Destroy the defragmentation state and remove from
+ // state without cleaning up.
+ LOGV2_ERROR(6153302,
+ "Defragmentation for collection ending because of non-retriable error",
+ "namespace"_attr = nss,
+ "uuid"_attr = uuid,
+ "error"_attr = redact(e));
+ _defragmentationStates.erase(uuid);
+ }
+}
+
void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* opCtx,
MergeInfo action,
const Status& result) {
@@ -309,7 +371,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext*
return;
}
- _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result);
+ _applyActionResult(opCtx, action.uuid, action.nss, action, result);
+
_processEndOfAction(lk, opCtx);
}
@@ -321,7 +384,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult(
return;
}
- _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result);
+ _applyActionResult(opCtx, action.uuid, action.nss, action, result);
+
_processEndOfAction(lk, opCtx);
}
@@ -333,7 +397,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeAutoSplitVectorResult(
return;
}
- _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result);
+ _applyActionResult(opCtx, action.uuid, action.nss, action, result);
+
_processEndOfAction(lk, opCtx);
}
@@ -346,7 +411,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeSplitResult(OperationContext*
return;
}
- _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result);
+ _applyActionResult(opCtx, action.uuid, action.info.nss, action, result);
+
_processEndOfAction(lk, opCtx);
}
@@ -382,6 +448,9 @@ std::unique_ptr<DefragmentationPhase> BalancerDefragmentationPolicyImpl::_transi
beforeTransitioningDefragmentationPhase.pauseWhileSet();
std::unique_ptr<DefragmentationPhase> nextPhaseObject(nullptr);
try {
+ if (shouldPersistPhase) {
+ _persistPhaseUpdate(opCtx, nextPhase, coll.getUuid());
+ }
switch (nextPhase) {
case DefragmentationPhaseEnum::kMergeChunks:
nextPhaseObject = MergeChunksPhase::build(opCtx, coll);
@@ -396,9 +465,7 @@ std::unique_ptr<DefragmentationPhase> BalancerDefragmentationPolicyImpl::_transi
_clearDataSizeInformation(opCtx, coll.getUuid());
break;
}
- if (shouldPersistPhase) {
- _persistPhaseUpdate(opCtx, nextPhase, coll.getUuid());
- }
+ afterBuildingNextDefragmentationPhase.pauseWhileSet();
} catch (const DBException& e) {
LOGV2_ERROR(6153101,
"Error while building defragmentation phase on collection",
@@ -462,7 +529,16 @@ void BalancerDefragmentationPolicyImpl::_persistPhaseUpdate(OperationContext* op
}
return entry;
}()});
- dbClient.update(updateOp);
+ auto response = dbClient.update(updateOp);
+ auto writeErrors = response.getWriteErrors();
+ if (writeErrors) {
+ BSONObj firstWriteError = writeErrors->front();
+ uasserted(ErrorCodes::Error(firstWriteError.getIntField("code")),
+ firstWriteError.getStringField("errmsg"));
+ }
+ uassert(ErrorCodes::NoMatchingDocument,
+ "Collection {} not found while persisting phase change"_format(uuid.toString()),
+ response.getN() > 0);
}
void BalancerDefragmentationPolicyImpl::_clearDataSizeInformation(OperationContext* opCtx,
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
index a1322e1a41d..1039fff331d 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
@@ -142,6 +142,12 @@ private:
*/
void _clearDataSizeInformation(OperationContext* opCtx, const UUID& uuid);
+ void _applyActionResult(OperationContext* opCtx,
+ const UUID& uuid,
+ const NamespaceString& nss,
+ const DefragmentationAction& action,
+ const DefragmentationActionResponse& response);
+
void _processEndOfAction(WithLock, OperationContext* opCtx);
Mutex _streamingMutex = MONGO_MAKE_LATCH("BalancerChunkMergerImpl::_streamingMutex");
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
index 206f40f870a..1994ac2993c 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
@@ -134,6 +134,23 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAddEmptyCollectionDoesNotTriggerDe
ASSERT_FALSE(future.isReady());
}
+TEST_F(BalancerDefragmentationPolicyTest, TestAddCollectionWhenCollectionRemovedFailsGracefully) {
+ CollectionType coll(kNss, OID::gen(), Timestamp(1, 1), Date_t::now(), kUuid);
+ coll.setKeyPattern(kShardKeyPattern);
+ coll.setBalancerShouldMergeChunks(true);
+ // Collection entry is not persisted (to simulate collection dropped), defragmentation should
+ // not begin.
+ ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid()));
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid()));
+ auto configDoc = findOneOnConfigCollection(operationContext(),
+ CollectionType::ConfigNS,
+ BSON(CollectionType::kUuidFieldName << kUuid));
+ ASSERT_EQ(configDoc.getStatus(), Status(ErrorCodes::NoMatchingDocument, "No document found"));
+}
+
+// Phase 1 tests.
+
TEST_F(BalancerDefragmentationPolicyTest, TestAddSingleChunkCollectionTriggersDataSize) {
auto coll = makeConfigCollectionEntry();
makeConfigChunkEntry();
@@ -211,8 +228,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFinalDataSizeActionEnds
ASSERT_FALSE(configCollDoc.hasField(CollectionType::kDefragmentationPhaseFieldName));
}
-// TODO (SERVER-61533) add tests to distinguish recoverable VS unrecoverable errors.
-TEST_F(BalancerDefragmentationPolicyTest, TestFailedDataSizeActionGetsReissued) {
+TEST_F(BalancerDefragmentationPolicyTest, TestRetriableFailedDataSizeActionGetsReissued) {
auto coll = makeConfigCollectionEntry();
makeConfigChunkEntry();
_defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
@@ -243,6 +259,36 @@ TEST_F(BalancerDefragmentationPolicyTest, TestFailedDataSizeActionGetsReissued)
ASSERT_FALSE(future.isReady());
}
+TEST_F(BalancerDefragmentationPolicyTest,
+ TestNonRetriableErrorEndsDefragmentationButLeavesPersistedFields) {
+ auto coll = makeConfigCollectionEntry();
+ makeConfigChunkEntry();
+ _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll);
+ auto future = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ DataSizeInfo failingDataSizeAction = stdx::get<DataSizeInfo>(future.get());
+
+ _defragmentationPolicy.acknowledgeDataSizeResult(
+ operationContext(),
+ failingDataSizeAction,
+ Status(ErrorCodes::NamespaceNotFound, "Testing error response"));
+
+ // Defragmentation should have stopped on the collection
+ ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid()));
+ // There should be no new actions.
+ future = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_FALSE(future.isReady());
+ // The defragmentation flags should still be present
+ auto configDoc = findOneOnConfigCollection(operationContext(),
+ CollectionType::ConfigNS,
+ BSON(CollectionType::kUuidFieldName << kUuid))
+ .getValue();
+ ASSERT_TRUE(configDoc.getBoolField(CollectionType::kBalancerShouldMergeChunksFieldName));
+ auto storedDefragmentationPhase = DefragmentationPhase_parse(
+ IDLParserErrorContext("BalancerDefragmentationPolicyTest"),
+ configDoc.getStringField(CollectionType::kDefragmentationPhaseFieldName));
+ ASSERT_TRUE(storedDefragmentationPhase == DefragmentationPhaseEnum::kMergeChunks);
+}
+
TEST_F(BalancerDefragmentationPolicyTest,
TestAcknowledgeMergeChunkActionsTriggersDataSizeOnResultingRange) {