diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-26 13:27:29 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-01 17:14:20 -0400 |
commit | e83d715afbec0ba89f041ed53e7d18c8eff587e5 (patch) | |
tree | 3f085dd6e7882ca2ccde7e0ac02f35b0e3ff7f9a /src/mongo/s | |
parent | c9a70c9a28a0b2c9ddbd66b5dee2308405a6eb68 (diff) | |
download | mongo-e83d715afbec0ba89f041ed53e7d18c8eff587e5.tar.gz |
SERVER-34695: Move aggregation retry logic to command processing layer
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 234 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 73 |
3 files changed, 135 insertions, 184 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 919f3318c7e..80c45f7c5b8 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -321,28 +321,12 @@ std::vector<RemoteCursor> establishShardCursors( } } - // If we reach this point, we're either trying to establish cursors on a sharded execution - // namespace, or handling the case where a sharded collection was dropped and recreated as - // unsharded. Since views cannot be sharded, and because we will return an error rather than - // attempting to continue in the event that a recreated namespace is a view, we do not handle - // ErrorCodes::CommandOnShardedViewNotSupportedOnMongod here. - try { - return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */); - - } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) { - // If any shard returned a stale shardVersion error, invalidate the routing table cache. - // This will cause the cache to be refreshed the next time it is accessed. - Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*routingInfo)); - throw; - } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) { - // If any shard returned a snapshot error, recompute the atClusterTime. - throw; - } + return establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */); } struct DispatchShardPipelineResults { @@ -384,12 +368,8 @@ DispatchShardPipelineResults dispatchShardPipeline( // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. // - Call establishShardCursors to dispatch the aggregation to the targeted shards. 
- // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with - // the refreshed routing table data. - // - If the pipeline is not split and we now need to target multiple shards, split it. If the - // pipeline is already split and we now only need to target a single shard, reassemble the - // original pipeline. - // - After exhausting 10 attempts to establish the cursors, we give up and throw. + // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the + // entire aggregation command. auto cursors = std::vector<RemoteCursor>(); auto shardResults = std::vector<AsyncRequestsSender::Response>(); auto opCtx = expCtx->opCtx; @@ -403,129 +383,102 @@ DispatchShardPipelineResults dispatchShardPipeline( auto pipelineForTargetedShards = std::move(pipeline); std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; - BSONObj targetedCommand; - int numAttempts = 0; + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); - while (++numAttempts <= kMaxNumStaleVersionRetries) { - // We need to grab a new routing table at the start of each iteration, since a stale config - // exception will invalidate the previous one. - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. + // Otherwise, uassert on all exceptions here. + if (!(liteParsedPipeline.hasChangeStream() && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + uassertStatusOK(executionNsRoutingInfoStatus); + } - // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. - // Otherwise, uassert on all exceptions here. - if (!(liteParsedPipeline.hasChangeStream() && - executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { - uassertStatusOK(executionNsRoutingInfoStatus); - } + auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() + ? 
std::move(executionNsRoutingInfoStatus.getValue()) + : boost::optional<CachedCollectionRoutingInfo>{}; - auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() - ? std::move(executionNsRoutingInfoStatus.getValue()) - : boost::optional<CachedCollectionRoutingInfo>{}; - - // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = - mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline); - std::set<ShardId> shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - - auto atClusterTime = computeAtClusterTime( - opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); - - invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized); - - // Don't need to split the pipeline if we are only targeting a single shard, unless: - // - There is a stage that needs to be run on the primary shard and the single target shard - // is not the primary. - // - The pipeline contains one or more stages which must always merge on mongoS. - const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && executionNsRoutingInfo && - *shardIds.begin() != executionNsRoutingInfo->db().primaryId())); - - const bool isSplit = pipelineForTargetedShards->isSplitForShards(); - - // If we have to run on multiple shards and the pipeline is not yet split, split it. If we - // can run on a single shard and the pipeline is already split, reassemble it. - if (needsSplit && !isSplit) { - pipelineForMerging = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMerging->splitForSharded(); - } else if (!needsSplit && isSplit) { - pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); - } + // Determine whether we can run the entire aggregation on a single shard. 
+ const bool mustRunOnAll = + mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline); + std::set<ShardId> shardIds = getTargetedShards( + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - // Generate the command object for the targeted shards. - targetedCommand = createCommandForTargetedShards( - opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime); + auto atClusterTime = computeAtClusterTime( + opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); - // Refresh the shard registry if we're targeting all shards. We need the shard registry - // to be at least as current as the logical time used when creating the command for - // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAll) { - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - shardRegistry->reload(opCtx); - } - } + invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized); - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - try { - if (expCtx->explain) { - if (mustRunOnAll) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and - // should not participate in the shard version protocol. - shardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db(), - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target - // shards, and should participate in the shard version protocol. 
- invariant(executionNsRoutingInfo); - shardResults = scatterGatherVersionedTargetByRoutingTable( - opCtx, - executionNss.db(), - executionNss, - *executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - aggRequest.getCollation()); - } - } else { - cursors = establishShardCursors(opCtx, - executionNss, - liteParsedPipeline, - executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - shardQuery, - aggRequest.getCollation()); - } - } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) { - LOG(1) << "got stale shardVersion error " << redact(ex) << " while dispatching " - << redact(targetedCommand) << " after " << (numAttempts + 1) - << " dispatch attempts"; - continue; // Try again if allowed. - } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) { - LOG(1) << "got snapshot error " << redact(ex) << " while dispatching " - << redact(targetedCommand) << " after " << (numAttempts + 1) - << " dispatch attempts"; - continue; // Try again if allowed. - } + // Don't need to split the pipeline if we are only targeting a single shard, unless: + // - There is a stage that needs to be run on the primary shard and the single target shard + // is not the primary. + // - The pipeline contains one or more stages which must always merge on mongoS. + const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || + (needsPrimaryShardMerge && executionNsRoutingInfo && + *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); + + if (needsSplit) { + pipelineForMerging = std::move(pipelineForTargetedShards); + pipelineForTargetedShards = pipelineForMerging->splitForSharded(); + } - // Record the number of shards involved in the aggregation. If we are required to merge on - // the primary shard, but the primary shard was not in the set of targeted shards, then we - // must increment the number of involved shards. 
- CurOp::get(opCtx)->debug().nShards = - shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && - !shardIds.count(executionNsRoutingInfo->db().primaryId())); + // Generate the command object for the targeted shards. + BSONObj targetedCommand = createCommandForTargetedShards( + opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime); - break; // Success! + // Refresh the shard registry if we're targeting all shards. We need the shard registry + // to be at least as current as the logical time used when creating the command for + // $changeStream to work reliably, so we do a "hard" reload. + if (mustRunOnAll) { + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + shardRegistry->reload(opCtx); + } } + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAll) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and + // should not participate in the shard version protocol. + shardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db(), + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target + // shards, and should participate in the shard version protocol. + invariant(executionNsRoutingInfo); + shardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db(), + executionNss, + *executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation()); + } + } else { + cursors = establishShardCursors(opCtx, + executionNss, + liteParsedPipeline, + executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + shardQuery, + aggRequest.getCollation()); + } + + // Record the number of shards involved in the aggregation. 
If we are required to merge on + // the primary shard, but the primary shard was not in the set of targeted shards, then we + // must increment the number of involved shards. + CurOp::get(opCtx)->debug().nShards = + shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && + !shardIds.count(executionNsRoutingInfo->db().primaryId())); + return DispatchShardPipelineResults{needsPrimaryShardMerge, std::move(cursors), std::move(shardResults), @@ -1010,7 +963,6 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // error and hence shardId history does not need to be verified. auto atClusterTime = computeAtClusterTimeForOneShard(opCtx, shardId); - // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. cmdObj = CommandHelpers::filterCommandRequestForPassthrough( diff --git a/src/mongo/s/commands/cluster_aggregate_test.cpp b/src/mongo/s/commands/cluster_aggregate_test.cpp index 1ccf194b629..c6b79c7d346 100644 --- a/src/mongo/s/commands/cluster_aggregate_test.cpp +++ b/src/mongo/s/commands/cluster_aggregate_test.cpp @@ -194,18 +194,6 @@ TEST_F(ClusterAggregateTest, NoErrors) { runAggCommandSuccessful(kAggregateCmdScatterGather, false); } -// Verify ClusterFind::runQuery will retry on a snapshot error. -TEST_F(ClusterAggregateTest, RetryOnSnapshotError) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. 
- runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotUnavailable, true); - runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotTooOld, true); - - // Target all shards - runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotUnavailable, false); - runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotTooOld, false); -} TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) { loadRoutingTableWithTwoChunksAndTwoShards(kNss); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 9dc6e9c50a3..c0edc37f1f3 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -72,6 +72,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/stale_exception.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/op_msg.h" @@ -281,6 +282,8 @@ void execCommandClient(OperationContext* opCtx, } } +MONGO_FP_DECLARE(doNotRefreshShardsOnRetargettingError); + void runCommand(OperationContext* opCtx, const OpMsgRequest& request, const NetworkOp opType, @@ -358,44 +361,52 @@ void runCommand(OperationContext* opCtx, try { execCommandClient(opCtx, invocation.get(), request, &crb); return; - } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) { - const auto staleNs = [&] { - if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { - return NamespaceString(staleInfo->getns()); - } else if (auto implicitCreateInfo = - ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) { - return NamespaceString(implicitCreateInfo->getNss()); - } else { + } catch (const DBException& ex) { + if (ErrorCodes::isNeedRetargettingError(ex.code()) || + ErrorCodes::isSnapshotError(ex.code())) { + const auto staleNs = [&] { + if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { + return NamespaceString(staleInfo->getns()); + } 
else if (auto implicitCreateInfo = + ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) { + return NamespaceString(implicitCreateInfo->getNss()); + } else { + throw; + } + }(); + + if (staleNs.isEmpty()) { + // This should be impossible but older versions tried incorrectly to handle + // it here. + log() << "Received a stale config error with an empty namespace while " + "executing " + << redact(request.body) << " : " << redact(ex); throw; } - }(); - - if (staleNs.isEmpty()) { - // This should be impossible but older versions tried incorrectly to handle it - // here. - log() - << "Received a stale config error with an empty namespace while executing " - << redact(request.body) << " : " << redact(ex); - throw; - } - if (!canRetry) - throw; + if (!canRetry) + throw; - log() << "Retrying command " << redact(request.body) << causedBy(ex); + LOG(2) << "Retrying command " << redact(request.body) << causedBy(ex); - ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns()); - if (staleNs.isValid()) { - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); - } + if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) { + ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns()); + } + + if (staleNs.isValid()) { + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); + } - continue; - } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>& e) { - Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(e->getDb(), - e->getVersionReceived()); - if (!canRetry) + continue; + } else if (auto sce = ex.extraInfo<StaleDbRoutingVersion>()) { + Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion( + sce->getDb(), sce->getVersionReceived()); + if (!canRetry) + throw; + continue; + } else { throw; - continue; + } } MONGO_UNREACHABLE; } |