diff options
author | Allison Easton <allison.easton@mongodb.com> | 2022-01-11 14:54:15 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-11 16:01:40 +0000 |
commit | 8c6a0ab5795c16e7c236d96de17cbd6060657020 (patch) | |
tree | 2f0fc8373ad14a0d26d6a2f0eb51c93c642264b5 /src/mongo | |
parent | 550cfa96862fd47a6608deaab055f314754deab2 (diff) | |
download | mongo-8c6a0ab5795c16e7c236d96de17cbd6060657020.tar.gz |
SERVER-61533 Make the balancer defragmentation policy resilient to non-retryable errors
Diffstat (limited to 'src/mongo')
3 files changed, 178 insertions, 50 deletions
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index 6a244a0f94e..c79df31a13e 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -35,11 +35,16 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/grid.h" +#include <fmt/format.h> + +using namespace fmt::literals; + namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(beforeTransitioningDefragmentationPhase); +MONGO_FAIL_POINT_DEFINE(afterBuildingNextDefragmentationPhase); ChunkVersion getShardVersion(OperationContext* opCtx, const ShardId& shardId, const UUID& uuid) { auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, uuid); @@ -55,9 +60,31 @@ ChunkVersion getShardVersion(OperationContext* opCtx, const ShardId& shardId, co coll.getTimestamp(), repl::ReadConcernLevel::kLocalReadConcern, boost::none)); + uassert(ErrorCodes::BadValue, + "No chunks or chunk version in collection", + !chunkVector.empty() && chunkVector.front().isVersionSet()); return chunkVector.front().getVersion(); } +static bool isRetriableForDefragmentation(const Status& error) { + return (ErrorCodes::isA<ErrorCategory::RetriableError>(error) || + error == ErrorCodes::StaleShardVersion || error == ErrorCodes::StaleConfig); +} + +static void handleActionResult(const Status& status, + std::function<void()> onSuccess, + std::function<void()> onRetriableError) { + if (status.isOK()) { + onSuccess(); + return; + } + if (isRetriableForDefragmentation(status)) { + onRetriableError(); + } else { + error_details::throwExceptionForStatus(status); + } +} + class MergeChunksPhase : public DefragmentationPhase { public: static std::unique_ptr<MergeChunksPhase> build(OperationContext* opCtx, @@ -165,32 +192,39 @@ public: [&](const MergeInfo& mergeAction) { auto& mergeResponse = stdx::get<Status>(response); auto& shardingPendingActions = _pendingActionsByShards[mergeAction.shardId]; - if (mergeResponse.isOK()) { - shardingPendingActions.rangesWithoutDataSize.emplace_back( - mergeAction.chunkRange); - } else { - shardingPendingActions.rangesToMerge.emplace_back(mergeAction.chunkRange); - } + handleActionResult( + mergeResponse, + [&]() { + shardingPendingActions.rangesWithoutDataSize.emplace_back( + mergeAction.chunkRange); + }, + [&]() { + shardingPendingActions.rangesToMerge.emplace_back( + mergeAction.chunkRange); + }); }, [&](const DataSizeInfo& dataSizeAction) { auto& dataSizeResponse = stdx::get<StatusWith<DataSizeResponse>>(response); - if (dataSizeResponse.isOK()) { - ChunkType chunk(dataSizeAction.uuid, - dataSizeAction.chunkRange, - dataSizeAction.version, - dataSizeAction.shardId); - auto catalogManager = ShardingCatalogManager::get(opCtx); - catalogManager->setChunkEstimatedSize( - opCtx, - chunk, - dataSizeResponse.getValue().sizeBytes, - ShardingCatalogClient::kMajorityWriteConcern); - } else { - auto& shardingPendingActions = - _pendingActionsByShards[dataSizeAction.shardId]; - shardingPendingActions.rangesWithoutDataSize.emplace_back( - dataSizeAction.chunkRange); - } + handleActionResult( + dataSizeResponse.getStatus(), + [&]() { + ChunkType chunk(dataSizeAction.uuid, + dataSizeAction.chunkRange, + dataSizeAction.version, + dataSizeAction.shardId); + auto catalogManager = ShardingCatalogManager::get(opCtx); + catalogManager->setChunkEstimatedSize( + opCtx, + chunk, + dataSizeResponse.getValue().sizeBytes, + ShardingCatalogClient::kMajorityWriteConcern); + }, + [&]() { + auto& shardingPendingActions = + _pendingActionsByShards[dataSizeAction.shardId]; + shardingPendingActions.rangesWithoutDataSize.emplace_back( + dataSizeAction.chunkRange); + }); }, [&](const AutoSplitVectorInfo& _) { uasserted(ErrorCodes::BadValue, "Unexpected action type"); @@ -249,9 +283,8 @@ void BalancerDefragmentationPolicyImpl::refreshCollectionDefragmentationStatus( } } } else if (!coll.getBalancerShouldMergeChunks() && _defragmentationStates.contains(uuid)) { - _clearDataSizeInformation(opCtx, uuid); + _transitionPhases(opCtx, coll, DefragmentationPhaseEnum::kFinished); _defragmentationStates.erase(uuid); - _persistPhaseUpdate(opCtx, DefragmentationPhaseEnum::kFinished, uuid); } } @@ -274,23 +307,32 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextS // TODO (SERVER-61635) validate fairness through collections for (auto it = _defragmentationStates.begin(); it != _defragmentationStates.end();) { auto& currentCollectionDefragmentationState = it->second; - // Phase transition if needed - auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, it->first); - while (currentCollectionDefragmentationState && - currentCollectionDefragmentationState->isComplete()) { - currentCollectionDefragmentationState = _transitionPhases( - opCtx, coll, _getNextPhase(currentCollectionDefragmentationState->getType())); - } - if (!currentCollectionDefragmentationState) { + try { + // Phase transition if needed + auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, it->first); + while (currentCollectionDefragmentationState && + currentCollectionDefragmentationState->isComplete()) { + currentCollectionDefragmentationState = _transitionPhases( + opCtx, coll, _getNextPhase(currentCollectionDefragmentationState->getType())); + } + if (!currentCollectionDefragmentationState) { + it = _defragmentationStates.erase(it, std::next(it)); + continue; + } + // Get next action + auto nextAction = currentCollectionDefragmentationState->popNextStreamableAction(opCtx); + if (nextAction) { + return nextAction; + } + ++it; + } catch (DBException& e) { + // Catch getCollection errors. + LOGV2_ERROR(6153301, + "Error while getting next defragmentation action", + "uuid"_attr = it->first, + "error"_attr = redact(e)); it = _defragmentationStates.erase(it, std::next(it)); - continue; - } - // Get next action - auto nextAction = currentCollectionDefragmentationState->popNextStreamableAction(opCtx); - if (nextAction) { - return nextAction; } - ++it; } boost::optional<DefragmentationAction> noAction; @@ -300,6 +342,26 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::_nextS return noAction; } +void BalancerDefragmentationPolicyImpl::_applyActionResult( + OperationContext* opCtx, + const UUID& uuid, + const NamespaceString& nss, + const DefragmentationAction& action, + const DefragmentationActionResponse& response) { + try { + _defragmentationStates[uuid]->applyActionResult(opCtx, action, response); + } catch (DBException& e) { + // Non-retriable error for stage found. Destroy the defragmentation state and remove from + // state without cleaning up. + LOGV2_ERROR(6153302, + "Defragmentation for collection ending because of non-retriable error", + "namespace"_attr = nss, + "uuid"_attr = uuid, + "error"_attr = redact(e)); + _defragmentationStates.erase(uuid); + } +} + void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* opCtx, MergeInfo action, const Status& result) { @@ -309,7 +371,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeMergeResult(OperationContext* return; } - _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result); + _applyActionResult(opCtx, action.uuid, action.nss, action, result); + _processEndOfAction(lk, opCtx); } @@ -321,7 +384,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeDataSizeResult( return; } - _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result); + _applyActionResult(opCtx, action.uuid, action.nss, action, result); + _processEndOfAction(lk, opCtx); } @@ -333,7 +397,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeAutoSplitVectorResult( return; } - _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result); + _applyActionResult(opCtx, action.uuid, action.nss, action, result); + _processEndOfAction(lk, opCtx); } @@ -346,7 +411,8 @@ void BalancerDefragmentationPolicyImpl::acknowledgeSplitResult(OperationContext* return; } - _defragmentationStates[action.uuid]->applyActionResult(opCtx, action, result); + _applyActionResult(opCtx, action.uuid, action.info.nss, action, result); + _processEndOfAction(lk, opCtx); } @@ -382,6 +448,9 @@ std::unique_ptr<DefragmentationPhase> BalancerDefragmentationPolicyImpl::_transi beforeTransitioningDefragmentationPhase.pauseWhileSet(); std::unique_ptr<DefragmentationPhase> nextPhaseObject(nullptr); try { + if (shouldPersistPhase) { + _persistPhaseUpdate(opCtx, nextPhase, coll.getUuid()); + } switch (nextPhase) { case DefragmentationPhaseEnum::kMergeChunks: nextPhaseObject = MergeChunksPhase::build(opCtx, coll); @@ -396,9 +465,7 @@ std::unique_ptr<DefragmentationPhase> BalancerDefragmentationPolicyImpl::_transi _clearDataSizeInformation(opCtx, coll.getUuid()); break; } - if (shouldPersistPhase) { - _persistPhaseUpdate(opCtx, nextPhase, coll.getUuid()); - } + afterBuildingNextDefragmentationPhase.pauseWhileSet(); } catch (const DBException& e) { LOGV2_ERROR(6153101, "Error while building defragmentation phase on collection", @@ -462,7 +529,16 @@ void BalancerDefragmentationPolicyImpl::_persistPhaseUpdate(OperationContext* op } return entry; }()}); - dbClient.update(updateOp); + auto response = dbClient.update(updateOp); + auto writeErrors = response.getWriteErrors(); + if (writeErrors) { + BSONObj firstWriteError = writeErrors->front(); + uasserted(ErrorCodes::Error(firstWriteError.getIntField("code")), + firstWriteError.getStringField("errmsg")); + } + uassert(ErrorCodes::NoMatchingDocument, + "Collection {} not found while persisting phase change"_format(uuid.toString()), + response.getN() > 0); } void BalancerDefragmentationPolicyImpl::_clearDataSizeInformation(OperationContext* opCtx, diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h index a1322e1a41d..1039fff331d 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h @@ -142,6 +142,12 @@ private: */ void _clearDataSizeInformation(OperationContext* opCtx, const UUID& uuid); + void _applyActionResult(OperationContext* opCtx, + const UUID& uuid, + const NamespaceString& nss, + const DefragmentationAction& action, + const DefragmentationActionResponse& response); + void _processEndOfAction(WithLock, OperationContext* opCtx); Mutex _streamingMutex = MONGO_MAKE_LATCH("BalancerChunkMergerImpl::_streamingMutex"); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp index 206f40f870a..1994ac2993c 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp @@ -134,6 +134,23 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAddEmptyCollectionDoesNotTriggerDe ASSERT_FALSE(future.isReady()); } +TEST_F(BalancerDefragmentationPolicyTest, TestAddCollectionWhenCollectionRemovedFailsGracefully) { + CollectionType coll(kNss, OID::gen(), Timestamp(1, 1), Date_t::now(), kUuid); + coll.setKeyPattern(kShardKeyPattern); + coll.setBalancerShouldMergeChunks(true); + // Collection entry is not persisted (to simulate collection dropped), defragmentation should + // not begin. + ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid())); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid())); + auto configDoc = findOneOnConfigCollection(operationContext(), + CollectionType::ConfigNS, + BSON(CollectionType::kUuidFieldName << kUuid)); + ASSERT_EQ(configDoc.getStatus(), Status(ErrorCodes::NoMatchingDocument, "No document found")); +} + +// Phase 1 tests. + TEST_F(BalancerDefragmentationPolicyTest, TestAddSingleChunkCollectionTriggersDataSize) { auto coll = makeConfigCollectionEntry(); makeConfigChunkEntry(); @@ -211,8 +228,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeFinalDataSizeActionEnds ASSERT_FALSE(configCollDoc.hasField(CollectionType::kDefragmentationPhaseFieldName)); } -// TODO (SERVER-61533) add tests to distinguish recoverable VS unrecoverable errors. -TEST_F(BalancerDefragmentationPolicyTest, TestFailedDataSizeActionGetsReissued) { +TEST_F(BalancerDefragmentationPolicyTest, TestRetriableFailedDataSizeActionGetsReissued) { auto coll = makeConfigCollectionEntry(); makeConfigChunkEntry(); _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); @@ -243,6 +259,36 @@ TEST_F(BalancerDefragmentationPolicyTest, TestFailedDataSizeActionGetsReissued) ASSERT_FALSE(future.isReady()); } +TEST_F(BalancerDefragmentationPolicyTest, + TestNonRetriableErrorEndsDefragmentationButLeavesPersistedFields) { + auto coll = makeConfigCollectionEntry(); + makeConfigChunkEntry(); + _defragmentationPolicy.refreshCollectionDefragmentationStatus(operationContext(), coll); + auto future = _defragmentationPolicy.getNextStreamingAction(operationContext()); + DataSizeInfo failingDataSizeAction = stdx::get<DataSizeInfo>(future.get()); + + _defragmentationPolicy.acknowledgeDataSizeResult( + operationContext(), + failingDataSizeAction, + Status(ErrorCodes::NamespaceNotFound, "Testing error response")); + + // Defragmentation should have stopped on the collection + ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid())); + // There should be no new actions. + future = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_FALSE(future.isReady()); + // The defragmentation flags should still be present + auto configDoc = findOneOnConfigCollection(operationContext(), + CollectionType::ConfigNS, + BSON(CollectionType::kUuidFieldName << kUuid)) + .getValue(); + ASSERT_TRUE(configDoc.getBoolField(CollectionType::kBalancerShouldMergeChunksFieldName)); + auto storedDefragmentationPhase = DefragmentationPhase_parse( + IDLParserErrorContext("BalancerDefragmentationPolicyTest"), + configDoc.getStringField(CollectionType::kDefragmentationPhaseFieldName)); + ASSERT_TRUE(storedDefragmentationPhase == DefragmentationPhaseEnum::kMergeChunks); +} + TEST_F(BalancerDefragmentationPolicyTest, TestAcknowledgeMergeChunkActionsTriggersDataSizeOnResultingRange) { |