 jstests/aggregation/shard_targeting.js          |  88
 src/mongo/db/pipeline/pipeline.cpp              |  26
 src/mongo/db/pipeline/pipeline.h                |  14
 src/mongo/db/pipeline/pipeline_test.cpp         |  11
 src/mongo/db/query/cursor_response.cpp          |   8
 src/mongo/s/commands/cluster_aggregate.cpp      | 234
 src/mongo/s/commands/cluster_aggregate_test.cpp |  12
 src/mongo/s/commands/strategy.cpp               |  73
 8 files changed, 175 insertions(+), 291 deletions(-)
diff --git a/jstests/aggregation/shard_targeting.js b/jstests/aggregation/shard_targeting.js
index e6f9d95a84a..7ddf8ab55e1 100644
--- a/jstests/aggregation/shard_targeting.js
+++ b/jstests/aggregation/shard_targeting.js
@@ -45,6 +45,10 @@
     assert.commandWorked(st.shard1.getDB('admin').runCommand(
        {configureFailPoint: 'doNotRefreshRecipientAfterCommit', mode: 'alwaysOn'}));
 
+    // Turn off automatic shard refresh in mongos when a stale config error is thrown.
+    assert.commandWorked(mongosForAgg.getDB('admin').runCommand(
+        {configureFailPoint: 'doNotRefreshShardsOnRetargettingError', mode: 'alwaysOn'}));
+
     assert.commandWorked(mongosDB.dropDatabase());
 
     // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
@@ -164,14 +168,16 @@
     // shard, get a stale config exception, and find that more than one shard is now involved.
     // Move the _id: [-100, 0) chunk from st.shard0.shardName to st.shard1.shardName via
     // mongosForMove.
-    assert.commandWorked(mongosForMove.getDB("admin").runCommand(
-        {moveChunk: mongosColl.getFullName(), find: {_id: -50}, to: st.shard1.shardName}));
+    assert.commandWorked(mongosForMove.getDB("admin").runCommand({
+        moveChunk: mongosColl.getFullName(),
+        find: {_id: -50},
+        to: st.shard1.shardName,
+    }));
 
     // Run the same aggregation that targeted a single shard via the now-stale mongoS. It should
     // attempt to send the aggregation to st.shard0.shardName, hit a stale config exception,
-    // split the
-    // pipeline and redispatch. We append an $_internalSplitPipeline stage in order to force a
-    // shard merge rather than a mongoS merge.
+    // split the pipeline and redispatch. We append an $_internalSplitPipeline stage in order to
+    // force a shard merge rather than a mongoS merge.
     testName = "agg_shard_targeting_backout_passthrough_and_split_if_cache_is_stale";
     assert.eq(mongosColl
                   .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}]
@@ -184,24 +190,21 @@
     // Before the first dispatch:
     // - mongosForMove and st.shard0.shardName (the donor shard) are up to date.
     // - mongosForAgg and st.shard1.shardName are stale. mongosForAgg incorrectly believes that
-    // the
-    // necessary data is all on st.shard0.shardName.
+    // the necessary data is all on st.shard0.shardName.
+    //
     // We therefore expect that:
     // - mongosForAgg will throw a stale config error when it attempts to establish a
     // single-shard cursor on st.shard0.shardName (attempt 1).
-    // - mongosForAgg will back out, refresh itself, split the pipeline and redispatch to both
-    // shards.
+    // - mongosForAgg will back out, refresh itself, and redispatch to both shards.
     // - st.shard1.shardName will throw a stale config and refresh itself when the split
-    // pipeline is sent
-    // to it (attempt 2).
-    // - mongosForAgg will back out, retain the split pipeline and redispatch (attempt 3).
+    // pipeline is sent to it (attempt 2).
+    // - mongosForAgg will back out and redispatch (attempt 3).
     // - The aggregation will succeed on the third dispatch.
     // We confirm this behaviour via the following profiler results:
     // - One aggregation on st.shard0.shardName with a shard version exception (indicating that
-    // the mongoS
-    // was stale).
+    // the mongoS was stale).
     profilerHasSingleMatchingEntryOrThrow({
         profileDB: shard0DB,
         filter: {
@@ -213,8 +216,7 @@
     });
 
     // - One aggregation on st.shard1.shardName with a shard version exception (indicating that
-    // the shard
-    // was stale).
+    // the shard was stale).
    profilerHasSingleMatchingEntryOrThrow({
        profileDB: shard1DB,
        filter: {
@@ -226,11 +228,9 @@
    });
 
    // - At most two aggregations on st.shard0.shardName with no stale config exceptions. The
-    // first, if
-    // present, is an aborted cursor created if the command reaches st.shard0.shardName before
-    // st.shard1.shardName
-    // throws its stale config exception during attempt 2. The second profiler entry is from the
-    // aggregation which succeeded.
+    // first, if present, is an aborted cursor created if the command reaches
+    // st.shard0.shardName before st.shard1.shardName throws its stale config exception during
+    // attempt 2. The second profiler entry is from the aggregation which succeeded.
     profilerHasAtLeastOneAtMostNumMatchingEntriesOrThrow({
         profileDB: shard0DB,
         filter: {
@@ -254,8 +254,7 @@
     });
 
     // - One $mergeCursors aggregation on primary st.shard0.shardName, since we eventually
-    // target both
-    // shards after backing out the passthrough and splitting the pipeline.
+    // target both shards after backing out the passthrough and splitting the pipeline.
     profilerHasSingleMatchingEntryOrThrow({
         profileDB: primaryShardDB,
         filter: {
@@ -265,13 +264,14 @@
         }
     });
 
-    // Test that a split pipeline will back out and reassemble the pipeline if we target
-    // multiple shards, get a stale config exception, and find that we can now target a single
-    // shard.
     // Move the _id: [-100, 0) chunk back from st.shard1.shardName to st.shard0.shardName via
-    // mongosForMove.
-    assert.commandWorked(mongosForMove.getDB("admin").runCommand(
-        {moveChunk: mongosColl.getFullName(), find: {_id: -50}, to: st.shard0.shardName}));
+    // mongosForMove. Shard0 and mongosForAgg are now stale.
+    assert.commandWorked(mongosForMove.getDB("admin").runCommand({
+        moveChunk: mongosColl.getFullName(),
+        find: {_id: -50},
+        to: st.shard0.shardName,
+        _waitForDelete: true
+    }));
 
     // Run the same aggregation via the now-stale mongoS. It should split the pipeline, hit a
     // stale config exception, and reset to the original single-shard pipeline upon refresh. We
@@ -289,25 +289,21 @@
     // Before the first dispatch:
     // - mongosForMove and st.shard1.shardName (the donor shard) are up to date.
     // - mongosForAgg and st.shard0.shardName are stale. mongosForAgg incorrectly believes that
-    // the
-    // necessary data is spread across both shards.
+    // the necessary data is spread across both shards.
+    //
     // We therefore expect that:
     // - mongosForAgg will throw a stale config error when it attempts to establish a cursor on
     // st.shard1.shardName (attempt 1).
-    // - mongosForAgg will back out, refresh itself, coalesce the split pipeline into a single
-    // pipeline and redispatch to st.shard0.shardName.
+    // - mongosForAgg will back out, refresh itself, and redispatch to st.shard0.shardName.
     // - st.shard0.shardName will throw a stale config and refresh itself when the pipeline is
-    // sent to it
-    // (attempt 2).
-    // - mongosForAgg will back out, retain the single-shard pipeline and redispatch (attempt
-    // 3).
+    // sent to it (attempt 2).
+    // - mongosForAgg will back out and redispatch (attempt 3).
     // - The aggregation will succeed on the third dispatch.
     // We confirm this behaviour via the following profiler results:
     // - One aggregation on st.shard1.shardName with a shard version exception (indicating that
-    // the mongoS
-    // was stale).
+    // the mongoS was stale).
    profilerHasSingleMatchingEntryOrThrow({
        profileDB: shard1DB,
        filter: {
@@ -319,8 +315,7 @@
    });
 
    // - One aggregation on st.shard0.shardName with a shard version exception (indicating that
-    // the shard
-    // was stale).
+    // the shard was stale).
     profilerHasSingleMatchingEntryOrThrow({
         profileDB: shard0DB,
         filter: {
@@ -332,11 +327,9 @@
     });
 
     // - At most two aggregations on st.shard0.shardName with no stale config exceptions. The
-    // first, if
-    // present, is an aborted cursor created if the command reaches st.shard0.shardName before
-    // st.shard1.shardName
-    // throws its stale config exception during attempt 1. The second profiler entry is the
-    // aggregation which succeeded.
+    // first, if present, is an aborted cursor created if the command reaches
+    // st.shard0.shardName before st.shard1.shardName throws its stale config exception during
+    // attempt 1. The second profiler entry is the aggregation which succeeded.
     profilerHasAtLeastOneAtMostNumMatchingEntriesOrThrow({
         profileDB: shard0DB,
         filter: {
@@ -349,8 +342,7 @@
     });
 
     // No $mergeCursors aggregation on primary st.shard0.shardName, since after backing out the
-    // split
-    // pipeline we eventually target only st.shard0.shardName.
+    // split pipeline we eventually target only st.shard0.shardName.
     profilerHasZeroMatchingEntriesOrThrow({
         profileDB: primaryShardDB,
         filter: {
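The doNotRefreshShardsOnRetargettingError failpoint the test enables above is declared in strategy.cpp at the end of this diff (MONGO_FP_DECLARE) and checked with MONGO_FAIL_POINT before the connection-version refresh. A minimal self-contained model of that pattern, assuming a plain atomic flag in place of the real FailPoint machinery (not MongoDB source):

    #include <atomic>
    #include <iostream>

    // Stand-in for MONGO_FP_DECLARE(doNotRefreshShardsOnRetargettingError):
    // a process-wide flag that a test flips via the configureFailPoint command.
    std::atomic<bool> doNotRefreshShardsOnRetargettingError{false};

    void onRetargettingError() {
        // Mirrors the guard added in runCommand(): when the failpoint is active,
        // skip the shard refresh so the routing-cache invalidation and retry
        // logic is exercised on its own.
        if (!doNotRefreshShardsOnRetargettingError.load()) {
            std::cout << "refresh shard connection versions\n";
        }
        std::cout << "invalidate routing table entry, then retry\n";
    }

    int main() {
        doNotRefreshShardsOnRetargettingError = true;  // what the test's configureFailPoint does
        onRetargettingError();
        return 0;
    }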
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index c784bdd624f..e63f15c147f 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -318,7 +318,6 @@ void Pipeline::dispose(OperationContext* opCtx) {
 std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
     invariant(!isSplitForShards());
     invariant(!isSplitForMerge());
-    invariant(!_unsplitSources);
 
     // Create and initialize the shard spec we'll return. We start with an empty pipeline on the
     // shards and all work being done in the merger. Optimizations can move operations between
@@ -326,9 +325,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
     std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx),
                                                              PipelineDeleter(pCtx->opCtx));
 
-    // Keep a copy of the original source list in case we need to reset the pipeline from split to
-    // unsplit later.
-    shardPipeline->_unsplitSources.emplace(_sources);
     cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
     shardPipeline->_splitState = SplitState::kSplitForShards;
     _splitState = SplitState::kSplitForMerge;
@@ -338,28 +334,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
     return shardPipeline;
 }
 
-void Pipeline::unsplitFromSharded(
-    std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard) {
-    invariant(isSplitForShards());
-    invariant(!isSplitForMerge());
-    invariant(pipelineForMergingShard);
-    invariant(_unsplitSources);
-
-    // Clear the merge source list so that destroying the pipeline object won't dispose of the
-    // stages. We still have a reference to each of the stages which will be moved back to the shard
-    // pipeline via '_unsplitSources'.
-    pipelineForMergingShard->_sources.clear();
-    pipelineForMergingShard.reset();
-
-    // Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional.
-    _sources = *_unsplitSources;
-    _unsplitSources.reset();
-
-    _splitState = SplitState::kUnsplit;
-
-    stitch();
-}
-
 BSONObj Pipeline::getInitialQuery() const {
     if (_sources.empty())
         return BSONObj();
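With unsplitFromSharded() and _unsplitSources gone, splitting a pipeline is one-way: on a stale-config retry, mongos re-runs the whole command and re-parses the pipeline rather than reassembling the split halves. A toy sketch of the remaining state machine (not the real Pipeline class; stages are reduced to strings, and the split point is passed in where the real code moves stages during optimization):

    #include <cassert>
    #include <memory>
    #include <string>
    #include <vector>

    struct Pipeline {
        enum class Split { kUnsplit, kShards, kMerge };
        std::vector<std::string> stages;
        Split state = Split::kUnsplit;

        // One-way split: moves a prefix of stages into a shards half and leaves
        // the merge half behind. There is no longer a path back to kUnsplit.
        std::unique_ptr<Pipeline> splitForSharded(size_t splitPoint) {
            assert(state == Split::kUnsplit);  // mirrors the invariants kept in the diff
            auto shardsHalf = std::make_unique<Pipeline>();
            shardsHalf->stages.assign(stages.begin(), stages.begin() + splitPoint);
            stages.erase(stages.begin(), stages.begin() + splitPoint);
            shardsHalf->state = Split::kShards;
            state = Split::kMerge;
            return shardsHalf;
        }
    };

    int main() {
        Pipeline merge;
        merge.stages = {"$match", "$sort", "$group"};
        auto shards = merge.splitForSharded(2);
        assert(shards->state == Pipeline::Split::kShards);
        assert(merge.state == Pipeline::Split::kMerge);
        return 0;
    }

This is also why the round-trip split/unsplit serialization check disappears from pipeline_test.cpp below.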
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 04b5769792d..22a8f8a8f88 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -165,14 +165,6 @@ public:
     std::unique_ptr<Pipeline, PipelineDeleter> splitForSharded();
 
     /**
-     * Reassemble a split shard pipeline into its original form. Upon return, this pipeline will
-     * contain the original source list. Must be called on the shards part of a split pipeline
-     * returned by a call to splitForSharded(). It is an error to call this on the merge part of the
-     * pipeline, or on a pipeline that has not been split.
-     */
-    void unsplitFromSharded(std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard);
-
-    /**
      * Returns true if this pipeline has not been split.
      */
     bool isUnsplit() const {
@@ -384,12 +376,6 @@ private:
     SourceContainer _sources;
 
-    // When a pipeline is split via splitForSharded(), the resulting shards pipeline will set
-    // '_unsplitSources' to be the original list of DocumentSources representing the full pipeline.
-    // This is to allow the split pipelines to be subsequently reassembled into the original
-    // pipeline, if necessary.
-    boost::optional<SourceContainer> _unsplitSources;
-
     SplitState _splitState = SplitState::kUnsplit;
     boost::intrusive_ptr<ExpressionContext> pCtx;
     bool _disposed = false;
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 23497f83e41..c53262cfc47 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1757,17 +1757,6 @@ public:
         mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
         mergePipe->optimizePipeline();
 
-        auto beforeSplit = Value(mergePipe->serialize());
-
-        shardPipe = mergePipe->splitForSharded();
-        ASSERT(shardPipe);
-
-        shardPipe->unsplitFromSharded(std::move(mergePipe));
-        ASSERT_FALSE(mergePipe);
-
-        ASSERT_VALUE_EQ(Value(shardPipe->serialize()), beforeSplit);
-
-        mergePipe = std::move(shardPipe);
         shardPipe = mergePipe->splitForSharded();
         ASSERT(shardPipe);
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 2eb10e17133..e449aad0814 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -34,7 +34,6 @@
 #include "mongo/bson/bsontypes.h"
 #include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/chunk_version.h"
 
 namespace mongo {
 
@@ -115,13 +114,6 @@ CursorResponse::CursorResponse(NamespaceString nss,
 StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) {
     Status cmdStatus = getStatusFromCommandResult(cmdResponse);
     if (!cmdStatus.isOK()) {
-        if (ErrorCodes::isStaleShardVersionError(cmdStatus.code())) {
-            auto vWanted = ChunkVersion::fromBSON(cmdResponse, "vWanted");
-            auto vReceived = ChunkVersion::fromBSON(cmdResponse, "vReceived");
-            if (!vWanted.hasEqualEpoch(vReceived)) {
-                return Status(ErrorCodes::StaleEpoch, cmdStatus.reason());
-            }
-        }
         return cmdStatus;
     }
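The deleted branch used to rewrite a stale-shard-version reply whose wanted and received chunk versions had different epochs into a StaleEpoch status at parse time; now the shard's error passes through unchanged and the top-level handler decides how to react. A self-contained model of the removed translation (not MongoDB source; Status and ChunkVersion reduced to the essentials):

    #include <cassert>

    enum class Code { OK, StaleShardVersion, StaleEpoch };

    struct ChunkVersion { int epoch; };

    // The special case removed from CursorResponse::parseFromBSON().
    Code oldTranslation(Code code, ChunkVersion vWanted, ChunkVersion vReceived) {
        if (code == Code::StaleShardVersion && vWanted.epoch != vReceived.epoch) {
            return Code::StaleEpoch;
        }
        return code;
    }

    int main() {
        // After this commit, the first case also surfaces as StaleShardVersion.
        assert(oldTranslation(Code::StaleShardVersion, {1}, {2}) == Code::StaleEpoch);
        assert(oldTranslation(Code::StaleShardVersion, {1}, {1}) == Code::StaleShardVersion);
        return 0;
    }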
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 919f3318c7e..80c45f7c5b8 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -321,28 +321,12 @@ std::vector<RemoteCursor> establishShardCursors(
         }
     }
 
-    // If we reach this point, we're either trying to establish cursors on a sharded execution
-    // namespace, or handling the case where a sharded collection was dropped and recreated as
-    // unsharded. Since views cannot be sharded, and because we will return an error rather than
-    // attempting to continue in the event that a recreated namespace is a view, we do not handle
-    // ErrorCodes::CommandOnShardedViewNotSupportedOnMongod here.
-    try {
-        return establishCursors(opCtx,
-                                Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
-                                nss,
-                                readPref,
-                                requests,
-                                false /* do not allow partial results */);
-
-    } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
-        // If any shard returned a stale shardVersion error, invalidate the routing table cache.
-        // This will cause the cache to be refreshed the next time it is accessed.
-        Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*routingInfo));
-        throw;
-    } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) {
-        // If any shard returned a snapshot error, recompute the atClusterTime.
-        throw;
-    }
+    return establishCursors(opCtx,
+                            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+                            nss,
+                            readPref,
+                            requests,
+                            false /* do not allow partial results */);
 }
 
 struct DispatchShardPipelineResults {
@@ -384,12 +368,8 @@ DispatchShardPipelineResults dispatchShardPipeline(
     // - First, determine whether we need to target more than one shard. If so, we split the
     // pipeline; if not, we retain the existing pipeline.
     // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
-    // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
-    // the refreshed routing table data.
-    // - If the pipeline is not split and we now need to target multiple shards, split it. If the
-    // pipeline is already split and we now only need to target a single shard, reassemble the
-    // original pipeline.
-    // - After exhausting 10 attempts to establish the cursors, we give up and throw.
+    // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+    // entire aggregation command.
     auto cursors = std::vector<RemoteCursor>();
     auto shardResults = std::vector<AsyncRequestsSender::Response>();
     auto opCtx = expCtx->opCtx;
@@ -403,129 +383,102 @@ DispatchShardPipelineResults dispatchShardPipeline(
     auto pipelineForTargetedShards = std::move(pipeline);
     std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
 
-    BSONObj targetedCommand;
-    int numAttempts = 0;
-
-    while (++numAttempts <= kMaxNumStaleVersionRetries) {
-        // We need to grab a new routing table at the start of each iteration, since a stale config
-        // exception will invalidate the previous one.
-        auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
-
-        // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
-        // Otherwise, uassert on all exceptions here.
-        if (!(liteParsedPipeline.hasChangeStream() &&
-              executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
-            uassertStatusOK(executionNsRoutingInfoStatus);
-        }
-
-        auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
-            ? std::move(executionNsRoutingInfoStatus.getValue())
-            : boost::optional<CachedCollectionRoutingInfo>{};
-
-        // Determine whether we can run the entire aggregation on a single shard.
-        const bool mustRunOnAll =
-            mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
-        std::set<ShardId> shardIds = getTargetedShards(
-            opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
-
-        auto atClusterTime = computeAtClusterTime(
-            opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
-
-        invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
-
-        // Don't need to split the pipeline if we are only targeting a single shard, unless:
-        // - There is a stage that needs to be run on the primary shard and the single target shard
-        // is not the primary.
-        // - The pipeline contains one or more stages which must always merge on mongoS.
-        const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
-                                 (needsPrimaryShardMerge && executionNsRoutingInfo &&
-                                  *shardIds.begin() != executionNsRoutingInfo->db().primaryId()));
-
-        const bool isSplit = pipelineForTargetedShards->isSplitForShards();
-
-        // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
-        // can run on a single shard and the pipeline is already split, reassemble it.
-        if (needsSplit && !isSplit) {
-            pipelineForMerging = std::move(pipelineForTargetedShards);
-            pipelineForTargetedShards = pipelineForMerging->splitForSharded();
-        } else if (!needsSplit && isSplit) {
-            pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
-        }
-
-        // Generate the command object for the targeted shards.
-        targetedCommand = createCommandForTargetedShards(
-            opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
-
-        // Refresh the shard registry if we're targeting all shards. We need the shard registry
-        // to be at least as current as the logical time used when creating the command for
-        // $changeStream to work reliably, so we do a "hard" reload.
-        if (mustRunOnAll) {
-            auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
-            if (!shardRegistry->reload(opCtx)) {
-                shardRegistry->reload(opCtx);
-            }
-        }
-
-        // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
-        try {
-            if (expCtx->explain) {
-                if (mustRunOnAll) {
-                    // Some stages (such as $currentOp) need to be broadcast to all shards, and
-                    // should not participate in the shard version protocol.
-                    shardResults =
-                        scatterGatherUnversionedTargetAllShards(opCtx,
-                                                                executionNss.db(),
-                                                                targetedCommand,
-                                                                ReadPreferenceSetting::get(opCtx),
-                                                                Shard::RetryPolicy::kIdempotent);
-                } else {
-                    // Aggregations on a real namespace should use the routing table to target
-                    // shards, and should participate in the shard version protocol.
-                    invariant(executionNsRoutingInfo);
-                    shardResults = scatterGatherVersionedTargetByRoutingTable(
-                        opCtx,
-                        executionNss.db(),
-                        executionNss,
-                        *executionNsRoutingInfo,
-                        targetedCommand,
-                        ReadPreferenceSetting::get(opCtx),
-                        Shard::RetryPolicy::kIdempotent,
-                        shardQuery,
-                        aggRequest.getCollation());
-                }
-            } else {
-                cursors = establishShardCursors(opCtx,
-                                                executionNss,
-                                                liteParsedPipeline,
-                                                executionNsRoutingInfo,
-                                                targetedCommand,
-                                                ReadPreferenceSetting::get(opCtx),
-                                                shardQuery,
-                                                aggRequest.getCollation());
-            }
-        } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
-            LOG(1) << "got stale shardVersion error " << redact(ex) << " while dispatching "
-                   << redact(targetedCommand) << " after " << (numAttempts + 1)
-                   << " dispatch attempts";
-            continue;  // Try again if allowed.
-        } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
-            LOG(1) << "got snapshot error " << redact(ex) << " while dispatching "
-                   << redact(targetedCommand) << " after " << (numAttempts + 1)
-                   << " dispatch attempts";
-            continue;  // Try again if allowed.
-        }
-
-        // Record the number of shards involved in the aggregation. If we are required to merge on
-        // the primary shard, but the primary shard was not in the set of targeted shards, then we
-        // must increment the number of involved shards.
-        CurOp::get(opCtx)->debug().nShards =
-            shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
-                               !shardIds.count(executionNsRoutingInfo->db().primaryId()));
-
-        break;  // Success!
-    }
+    auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+    // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+    // Otherwise, uassert on all exceptions here.
+    if (!(liteParsedPipeline.hasChangeStream() &&
+          executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+        uassertStatusOK(executionNsRoutingInfoStatus);
+    }
+
+    auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+        ? std::move(executionNsRoutingInfoStatus.getValue())
+        : boost::optional<CachedCollectionRoutingInfo>{};
+
+    // Determine whether we can run the entire aggregation on a single shard.
+    const bool mustRunOnAll =
+        mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
+    std::set<ShardId> shardIds = getTargetedShards(
+        opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+
+    auto atClusterTime = computeAtClusterTime(
+        opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
+
+    invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
+
+    // Don't need to split the pipeline if we are only targeting a single shard, unless:
+    // - There is a stage that needs to be run on the primary shard and the single target shard
+    // is not the primary.
+    // - The pipeline contains one or more stages which must always merge on mongoS.
+    const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+                             (needsPrimaryShardMerge && executionNsRoutingInfo &&
+                              *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+    if (needsSplit) {
+        pipelineForMerging = std::move(pipelineForTargetedShards);
+        pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+    }
+
+    // Generate the command object for the targeted shards.
+    BSONObj targetedCommand = createCommandForTargetedShards(
+        opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
+
+    // Refresh the shard registry if we're targeting all shards. We need the shard registry
+    // to be at least as current as the logical time used when creating the command for
+    // $changeStream to work reliably, so we do a "hard" reload.
+    if (mustRunOnAll) {
+        auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+        if (!shardRegistry->reload(opCtx)) {
+            shardRegistry->reload(opCtx);
+        }
+    }
+
+    // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+    if (expCtx->explain) {
+        if (mustRunOnAll) {
+            // Some stages (such as $currentOp) need to be broadcast to all shards, and
+            // should not participate in the shard version protocol.
+            shardResults =
+                scatterGatherUnversionedTargetAllShards(opCtx,
+                                                        executionNss.db(),
+                                                        targetedCommand,
+                                                        ReadPreferenceSetting::get(opCtx),
+                                                        Shard::RetryPolicy::kIdempotent);
+        } else {
+            // Aggregations on a real namespace should use the routing table to target
+            // shards, and should participate in the shard version protocol.
+            invariant(executionNsRoutingInfo);
+            shardResults =
+                scatterGatherVersionedTargetByRoutingTable(opCtx,
+                                                           executionNss.db(),
+                                                           executionNss,
+                                                           *executionNsRoutingInfo,
+                                                           targetedCommand,
+                                                           ReadPreferenceSetting::get(opCtx),
+                                                           Shard::RetryPolicy::kIdempotent,
+                                                           shardQuery,
+                                                           aggRequest.getCollation());
+        }
+    } else {
+        cursors = establishShardCursors(opCtx,
+                                        executionNss,
+                                        liteParsedPipeline,
+                                        executionNsRoutingInfo,
+                                        targetedCommand,
+                                        ReadPreferenceSetting::get(opCtx),
+                                        shardQuery,
+                                        aggRequest.getCollation());
+    }
+
+    // Record the number of shards involved in the aggregation. If we are required to merge on
+    // the primary shard, but the primary shard was not in the set of targeted shards, then we
+    // must increment the number of involved shards.
+    CurOp::get(opCtx)->debug().nShards =
+        shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+                           !shardIds.count(executionNsRoutingInfo->db().primaryId()));
 
     return DispatchShardPipelineResults{needsPrimaryShardMerge,
                                         std::move(cursors),
                                         std::move(shardResults),
@@ -1010,7 +963,6 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
     // error and hence shardId history does not need to be verified.
     auto atClusterTime = computeAtClusterTimeForOneShard(opCtx, shardId);
 
-
     // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
     // explain if necessary, and rewrites the result into a format safe to forward to shards.
     cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
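The net effect in cluster_aggregate.cpp is that dispatchShardPipeline now makes a single targeting decision and a single dispatch; stale-version and snapshot errors propagate to runCommand in strategy.cpp, which retries the whole command. A self-contained sketch of that control flow (not MongoDB source; the retry cap of 10 comes from the old kMaxNumStaleVersionRetries comment, and success on the third attempt matches what the jstest above asserts):

    #include <iostream>
    #include <stdexcept>

    struct StaleVersion : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    // Stand-in for one full dispatch of the aggregation (no internal retry loop).
    int dispatchOnce(int attempt) {
        if (attempt < 3) {
            throw StaleVersion("router or shard was stale");
        }
        return 42;  // success on the third attempt
    }

    // Stand-in for the top-level retry loop in runCommand().
    int runCommandWithRetries(int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            try {
                return dispatchOnce(attempt);
            } catch (const StaleVersion& ex) {
                if (attempt == maxAttempts) {
                    throw;  // out of retries: surface the error to the client
                }
                std::cout << "invalidate routing cache, retry: " << ex.what() << '\n';
            }
        }
        throw std::logic_error("unreachable");
    }

    int main() {
        std::cout << runCommandWithRetries(10) << '\n';
        return 0;
    }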
diff --git a/src/mongo/s/commands/cluster_aggregate_test.cpp b/src/mongo/s/commands/cluster_aggregate_test.cpp
index 1ccf194b629..c6b79c7d346 100644
--- a/src/mongo/s/commands/cluster_aggregate_test.cpp
+++ b/src/mongo/s/commands/cluster_aggregate_test.cpp
@@ -194,18 +194,6 @@ TEST_F(ClusterAggregateTest, NoErrors) {
     runAggCommandSuccessful(kAggregateCmdScatterGather, false);
 }
 
-// Verify ClusterFind::runQuery will retry on a snapshot error.
-TEST_F(ClusterAggregateTest, RetryOnSnapshotError) {
-    loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
-    // Target one shard.
-    runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotUnavailable, true);
-    runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotTooOld, true);
-
-    // Target all shards.
-    runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotUnavailable, false);
-    runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotTooOld, false);
-}
-
 TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
     loadRoutingTableWithTwoChunksAndTwoShards(kNss);
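The RetryOnSnapshotError test could be deleted because snapshot errors no longer get a dedicated retry path inside the aggregation code; the catch block in strategy.cpp below routes them through the same retry logic as retargetting errors. A minimal model of that unified predicate (not MongoDB source; error codes reduced to an enum):

    #include <cassert>

    enum class Code { StaleConfig, SnapshotUnavailable, SnapshotTooOld, InternalError };

    bool isNeedRetargettingError(Code c) {
        return c == Code::StaleConfig;
    }

    bool isSnapshotError(Code c) {
        return c == Code::SnapshotUnavailable || c == Code::SnapshotTooOld;
    }

    // Mirrors the condition added to runCommand()'s catch block.
    bool retriedByRunCommand(Code c) {
        return isNeedRetargettingError(c) || isSnapshotError(c);
    }

    int main() {
        assert(retriedByRunCommand(Code::SnapshotTooOld));
        assert(retriedByRunCommand(Code::StaleConfig));
        assert(!retriedByRunCommand(Code::InternalError));
        return 0;
    }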
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 9dc6e9c50a3..c0edc37f1f3 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -72,6 +72,7 @@
 #include "mongo/s/query/cluster_cursor_manager.h"
 #include "mongo/s/query/cluster_find.h"
 #include "mongo/s/stale_exception.h"
+#include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
 #include "mongo/util/mongoutils/str.h"
 #include "mongo/util/net/op_msg.h"
@@ -281,6 +282,8 @@ void execCommandClient(OperationContext* opCtx,
     }
 }
 
+MONGO_FP_DECLARE(doNotRefreshShardsOnRetargettingError);
+
 void runCommand(OperationContext* opCtx,
                 const OpMsgRequest& request,
                 const NetworkOp opType,
@@ -358,44 +361,52 @@ void runCommand(OperationContext* opCtx,
         try {
             execCommandClient(opCtx, invocation.get(), request, &crb);
             return;
-        } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) {
-            const auto staleNs = [&] {
-                if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
-                    return NamespaceString(staleInfo->getns());
-                } else if (auto implicitCreateInfo =
-                               ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
-                    return NamespaceString(implicitCreateInfo->getNss());
-                } else {
-                    throw;
-                }
-            }();
-
-            if (staleNs.isEmpty()) {
-                // This should be impossible but older versions tried incorrectly to handle it
-                // here.
-                log()
-                    << "Received a stale config error with an empty namespace while executing "
-                    << redact(request.body) << " : " << redact(ex);
-                throw;
-            }
-
-            if (!canRetry)
-                throw;
-
-            log() << "Retrying command " << redact(request.body) << causedBy(ex);
-
-            ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns());
-            if (staleNs.isValid()) {
-                Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
-            }
-
-            continue;
-        } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>& e) {
-            Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(e->getDb(),
-                                                                     e->getVersionReceived());
-            if (!canRetry)
-                throw;
-            continue;
+        } catch (const DBException& ex) {
+            if (ErrorCodes::isNeedRetargettingError(ex.code()) ||
+                ErrorCodes::isSnapshotError(ex.code())) {
+                const auto staleNs = [&] {
+                    if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
+                        return NamespaceString(staleInfo->getns());
+                    } else if (auto implicitCreateInfo =
+                                   ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
+                        return NamespaceString(implicitCreateInfo->getNss());
+                    } else {
+                        throw;
+                    }
+                }();
+
+                if (staleNs.isEmpty()) {
+                    // This should be impossible but older versions tried incorrectly to handle
+                    // it here.
+                    log() << "Received a stale config error with an empty namespace while "
+                             "executing "
+                          << redact(request.body) << " : " << redact(ex);
+                    throw;
+                }
+
+                if (!canRetry)
+                    throw;
+
+                LOG(2) << "Retrying command " << redact(request.body) << causedBy(ex);
+
+                if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) {
+                    ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns());
+                }
+
+                if (staleNs.isValid()) {
+                    Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
+                }
+
+                continue;
+            } else if (auto sce = ex.extraInfo<StaleDbRoutingVersion>()) {
+                Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(
+                    sce->getDb(), sce->getVersionReceived());
+                if (!canRetry)
+                    throw;
+                continue;
+            } else {
+                throw;
+            }
         }
         MONGO_UNREACHABLE;
     }