author     Nick Zolnierz <nicholas.zolnierz@mongodb.com>   2018-04-26 13:27:29 -0400
committer  Nick Zolnierz <nicholas.zolnierz@mongodb.com>   2018-05-01 17:14:20 -0400
commit     e83d715afbec0ba89f041ed53e7d18c8eff587e5 (patch)
tree       3f085dd6e7882ca2ccde7e0ac02f35b0e3ff7f9a
parent     c9a70c9a28a0b2c9ddbd66b5dee2308405a6eb68 (diff)
download   mongo-e83d715afbec0ba89f041ed53e7d18c8eff587e5.tar.gz
SERVER-34695: Move aggregation retry logic to command processing layer
-rw-r--r--  jstests/aggregation/shard_targeting.js           |  88
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp               |  26
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                 |  14
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp          |  11
-rw-r--r--  src/mongo/db/query/cursor_response.cpp           |   8
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp       | 234
-rw-r--r--  src/mongo/s/commands/cluster_aggregate_test.cpp  |  12
-rw-r--r--  src/mongo/s/commands/strategy.cpp                |  73
8 files changed, 175 insertions(+), 291 deletions(-)
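
The net effect of this commit is that stale shard version (and snapshot) errors are no longer handled inside ClusterAggregate's dispatch loop; they propagate up to the command processing layer in strategy.cpp, which retries the entire aggregate command. The following is a minimal, self-contained C++ sketch of that pattern only; runCommandWithRetry, StaleConfigException, and the attempt limit are illustrative stand-ins, not the real server API.

```cpp
#include <functional>
#include <iostream>
#include <stdexcept>

// Simplified stand-in for the server's stale-routing error category.
struct StaleConfigException : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Sketch of the command-layer retry: run the whole command, and on a stale
// routing error invalidate cached routing info and re-run the entire command
// a bounded number of times, mirroring the loop in runCommand().
void runCommandWithRetry(const std::function<void()>& runCommandOnce,
                         const std::function<void()>& invalidateRoutingInfo,
                         int maxAttempts = 10) {
    for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
        const bool canRetry = attempt < maxAttempts;
        try {
            runCommandOnce();
            return;  // Success; no retry needed.
        } catch (const StaleConfigException& ex) {
            // Refresh routing state so the next attempt re-targets correctly.
            invalidateRoutingInfo();
            if (!canRetry)
                throw;
            std::cerr << "retrying after stale config: " << ex.what() << "\n";
        }
    }
}

int main() {
    int failuresRemaining = 2;
    runCommandWithRetry(
        [&] {
            if (failuresRemaining-- > 0)
                throw StaleConfigException("shard version mismatch");
        },
        [] { /* pretend to invalidate the routing table cache */ });
    return 0;
}
```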
diff --git a/jstests/aggregation/shard_targeting.js b/jstests/aggregation/shard_targeting.js
index e6f9d95a84a..7ddf8ab55e1 100644
--- a/jstests/aggregation/shard_targeting.js
+++ b/jstests/aggregation/shard_targeting.js
@@ -45,6 +45,10 @@
assert.commandWorked(st.shard1.getDB('admin').runCommand(
{configureFailPoint: 'doNotRefreshRecipientAfterCommit', mode: 'alwaysOn'}));
+ // Turn off automatic shard refresh in mongos when a stale config error is thrown.
+ assert.commandWorked(mongosForAgg.getDB('admin').runCommand(
+ {configureFailPoint: 'doNotRefreshShardsOnRetargettingError', mode: 'alwaysOn'}));
+
assert.commandWorked(mongosDB.dropDatabase());
// Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
@@ -164,14 +168,16 @@
// shard, get a stale config exception, and find that more than one shard is now involved.
// Move the _id: [-100, 0) chunk from st.shard0.shardName to st.shard1.shardName via
// mongosForMove.
- assert.commandWorked(mongosForMove.getDB("admin").runCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: -50}, to: st.shard1.shardName}));
+ assert.commandWorked(mongosForMove.getDB("admin").runCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {_id: -50},
+ to: st.shard1.shardName,
+ }));
// Run the same aggregation that targeted a single shard via the now-stale mongoS. It should
// attempt to send the aggregation to st.shard0.shardName, hit a stale config exception,
- // split the
- // pipeline and redispatch. We append an $_internalSplitPipeline stage in order to force a
- // shard merge rather than a mongoS merge.
+ // split the pipeline and redispatch. We append an $_internalSplitPipeline stage in order to
+ // force a shard merge rather than a mongoS merge.
testName = "agg_shard_targeting_backout_passthrough_and_split_if_cache_is_stale";
assert.eq(mongosColl
.aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}]
@@ -184,24 +190,21 @@
// Before the first dispatch:
// - mongosForMove and st.shard0.shardName (the donor shard) are up to date.
// - mongosForAgg and st.shard1.shardName are stale. mongosForAgg incorrectly believes that
- // the
- // necessary data is all on st.shard0.shardName.
+ // the necessary data is all on st.shard0.shardName.
+ //
// We therefore expect that:
// - mongosForAgg will throw a stale config error when it attempts to establish a
// single-shard cursor on st.shard0.shardName (attempt 1).
- // - mongosForAgg will back out, refresh itself, split the pipeline and redispatch to both
- // shards.
+ // - mongosForAgg will back out, refresh itself, and redispatch to both shards.
// - st.shard1.shardName will throw a stale config and refresh itself when the split
- // pipeline is sent
- // to it (attempt 2).
- // - mongosForAgg will back out, retain the split pipeline and redispatch (attempt 3).
+ // pipeline is sent to it (attempt 2).
+ // - mongosForAgg will back out and redispatch (attempt 3).
// - The aggregation will succeed on the third dispatch.
// We confirm this behaviour via the following profiler results:
// - One aggregation on st.shard0.shardName with a shard version exception (indicating that
- // the mongoS
- // was stale).
+ // the mongoS was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard0DB,
filter: {
@@ -213,8 +216,7 @@
});
// - One aggregation on st.shard1.shardName with a shard version exception (indicating that
- // the shard
- // was stale).
+ // the shard was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard1DB,
filter: {
@@ -226,11 +228,9 @@
});
// - At most two aggregations on st.shard0.shardName with no stale config exceptions. The
- // first, if
- // present, is an aborted cursor created if the command reaches st.shard0.shardName before
- // st.shard1.shardName
- // throws its stale config exception during attempt 2. The second profiler entry is from the
- // aggregation which succeeded.
+ // first, if present, is an aborted cursor created if the command reaches
+ // st.shard0.shardName before st.shard1.shardName throws its stale config exception during
+ // attempt 2. The second profiler entry is from the aggregation which succeeded.
profilerHasAtLeastOneAtMostNumMatchingEntriesOrThrow({
profileDB: shard0DB,
filter: {
@@ -254,8 +254,7 @@
});
// - One $mergeCursors aggregation on primary st.shard0.shardName, since we eventually
- // target both
- // shards after backing out the passthrough and splitting the pipeline.
+ // target both shards after backing out the passthrough and splitting the pipeline.
profilerHasSingleMatchingEntryOrThrow({
profileDB: primaryShardDB,
filter: {
@@ -265,13 +264,14 @@
}
});
- // Test that a split pipeline will back out and reassemble the pipeline if we target
- // multiple shards, get a stale config exception, and find that we can now target a single
- // shard.
// Move the _id: [-100, 0) chunk back from st.shard1.shardName to st.shard0.shardName via
- // mongosForMove.
- assert.commandWorked(mongosForMove.getDB("admin").runCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: -50}, to: st.shard0.shardName}));
+ // mongosForMove. Shard0 and mongosForAgg are now stale.
+ assert.commandWorked(mongosForMove.getDB("admin").runCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {_id: -50},
+ to: st.shard0.shardName,
+ _waitForDelete: true
+ }));
// Run the same aggregation via the now-stale mongoS. It should split the pipeline, hit a
// stale config exception, and reset to the original single-shard pipeline upon refresh. We
@@ -289,25 +289,21 @@
// Before the first dispatch:
// - mongosForMove and st.shard1.shardName (the donor shard) are up to date.
// - mongosForAgg and st.shard0.shardName are stale. mongosForAgg incorrectly believes that
- // the
- // necessary data is spread across both shards.
+ // the necessary data is spread across both shards.
+ //
// We therefore expect that:
// - mongosForAgg will throw a stale config error when it attempts to establish a cursor on
// st.shard1.shardName (attempt 1).
- // - mongosForAgg will back out, refresh itself, coalesce the split pipeline into a single
- // pipeline and redispatch to st.shard0.shardName.
+ // - mongosForAgg will back out, refresh itself, and redispatch to st.shard0.shardName.
// - st.shard0.shardName will throw a stale config and refresh itself when the pipeline is
- // sent to it
- // (attempt 2).
- // - mongosForAgg will back out, retain the single-shard pipeline and redispatch (attempt
- // 3).
+ // sent to it (attempt 2).
+    //   - mongosForAgg will back out and redispatch (attempt 3).
// - The aggregation will succeed on the third dispatch.
// We confirm this behaviour via the following profiler results:
// - One aggregation on st.shard1.shardName with a shard version exception (indicating that
- // the mongoS
- // was stale).
+ // the mongoS was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard1DB,
filter: {
@@ -319,8 +315,7 @@
});
// - One aggregation on st.shard0.shardName with a shard version exception (indicating that
- // the shard
- // was stale).
+ // the shard was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard0DB,
filter: {
@@ -332,11 +327,9 @@
});
// - At most two aggregations on st.shard0.shardName with no stale config exceptions. The
- // first, if
- // present, is an aborted cursor created if the command reaches st.shard0.shardName before
- // st.shard1.shardName
- // throws its stale config exception during attempt 1. The second profiler entry is the
- // aggregation which succeeded.
+ // first, if present, is an aborted cursor created if the command reaches
+ // st.shard0.shardName before st.shard1.shardName throws its stale config exception during
+ // attempt 1. The second profiler entry is the aggregation which succeeded.
profilerHasAtLeastOneAtMostNumMatchingEntriesOrThrow({
profileDB: shard0DB,
filter: {
@@ -349,8 +342,7 @@
});
// No $mergeCursors aggregation on primary st.shard0.shardName, since after backing out the
- // split
- // pipeline we eventually target only st.shard0.shardName.
+ // split pipeline we eventually target only st.shard0.shardName.
profilerHasZeroMatchingEntriesOrThrow({
profileDB: primaryShardDB,
filter: {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index c784bdd624f..e63f15c147f 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -318,7 +318,6 @@ void Pipeline::dispose(OperationContext* opCtx) {
std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
invariant(!isSplitForShards());
invariant(!isSplitForMerge());
- invariant(!_unsplitSources);
// Create and initialize the shard spec we'll return. We start with an empty pipeline on the
// shards and all work being done in the merger. Optimizations can move operations between
@@ -326,9 +325,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx),
PipelineDeleter(pCtx->opCtx));
- // Keep a copy of the original source list in case we need to reset the pipeline from split to
- // unsplit later.
- shardPipeline->_unsplitSources.emplace(_sources);
cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
shardPipeline->_splitState = SplitState::kSplitForShards;
_splitState = SplitState::kSplitForMerge;
@@ -338,28 +334,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
return shardPipeline;
}
-void Pipeline::unsplitFromSharded(
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard) {
- invariant(isSplitForShards());
- invariant(!isSplitForMerge());
- invariant(pipelineForMergingShard);
- invariant(_unsplitSources);
-
- // Clear the merge source list so that destroying the pipeline object won't dispose of the
- // stages. We still have a reference to each of the stages which will be moved back to the shard
- // pipeline via '_unsplitSources'.
- pipelineForMergingShard->_sources.clear();
- pipelineForMergingShard.reset();
-
- // Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional.
- _sources = *_unsplitSources;
- _unsplitSources.reset();
-
- _splitState = SplitState::kUnsplit;
-
- stitch();
-}
-
BSONObj Pipeline::getInitialQuery() const {
if (_sources.empty())
return BSONObj();
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 04b5769792d..22a8f8a8f88 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -165,14 +165,6 @@ public:
std::unique_ptr<Pipeline, PipelineDeleter> splitForSharded();
/**
- * Reassemble a split shard pipeline into its original form. Upon return, this pipeline will
- * contain the original source list. Must be called on the shards part of a split pipeline
- * returned by a call to splitForSharded(). It is an error to call this on the merge part of the
- * pipeline, or on a pipeline that has not been split.
- */
- void unsplitFromSharded(std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard);
-
- /**
* Returns true if this pipeline has not been split.
*/
bool isUnsplit() const {
@@ -384,12 +376,6 @@ private:
SourceContainer _sources;
- // When a pipeline is split via splitForSharded(), the resulting shards pipeline will set
- // '_unsplitSources' to be the original list of DocumentSources representing the full pipeline.
- // This is to allow the split pipelines to be subsequently reassembled into the original
- // pipeline, if necessary.
- boost::optional<SourceContainer> _unsplitSources;
-
SplitState _splitState = SplitState::kUnsplit;
boost::intrusive_ptr<ExpressionContext> pCtx;
bool _disposed = false;
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 23497f83e41..c53262cfc47 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1757,17 +1757,6 @@ public:
mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
mergePipe->optimizePipeline();
- auto beforeSplit = Value(mergePipe->serialize());
-
- shardPipe = mergePipe->splitForSharded();
- ASSERT(shardPipe);
-
- shardPipe->unsplitFromSharded(std::move(mergePipe));
- ASSERT_FALSE(mergePipe);
-
- ASSERT_VALUE_EQ(Value(shardPipe->serialize()), beforeSplit);
-
- mergePipe = std::move(shardPipe);
shardPipe = mergePipe->splitForSharded();
ASSERT(shardPipe);
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 2eb10e17133..e449aad0814 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -34,7 +34,6 @@
#include "mongo/bson/bsontypes.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -115,13 +114,6 @@ CursorResponse::CursorResponse(NamespaceString nss,
StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) {
Status cmdStatus = getStatusFromCommandResult(cmdResponse);
if (!cmdStatus.isOK()) {
- if (ErrorCodes::isStaleShardVersionError(cmdStatus.code())) {
- auto vWanted = ChunkVersion::fromBSON(cmdResponse, "vWanted");
- auto vReceived = ChunkVersion::fromBSON(cmdResponse, "vReceived");
- if (!vWanted.hasEqualEpoch(vReceived)) {
- return Status(ErrorCodes::StaleEpoch, cmdStatus.reason());
- }
- }
return cmdStatus;
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 919f3318c7e..80c45f7c5b8 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -321,28 +321,12 @@ std::vector<RemoteCursor> establishShardCursors(
}
}
- // If we reach this point, we're either trying to establish cursors on a sharded execution
- // namespace, or handling the case where a sharded collection was dropped and recreated as
- // unsharded. Since views cannot be sharded, and because we will return an error rather than
- // attempting to continue in the event that a recreated namespace is a view, we do not handle
- // ErrorCodes::CommandOnShardedViewNotSupportedOnMongod here.
- try {
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */);
-
- } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
- // If any shard returned a stale shardVersion error, invalidate the routing table cache.
- // This will cause the cache to be refreshed the next time it is accessed.
- Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*routingInfo));
- throw;
- } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) {
- // If any shard returned a snapshot error, recompute the atClusterTime.
- throw;
- }
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */);
}
struct DispatchShardPipelineResults {
@@ -384,12 +368,8 @@ DispatchShardPipelineResults dispatchShardPipeline(
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
// - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
- // the refreshed routing table data.
- // - If the pipeline is not split and we now need to target multiple shards, split it. If the
- // pipeline is already split and we now only need to target a single shard, reassemble the
- // original pipeline.
- // - After exhausting 10 attempts to establish the cursors, we give up and throw.
+ // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+    //   entire aggregation command.
auto cursors = std::vector<RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
@@ -403,129 +383,102 @@ DispatchShardPipelineResults dispatchShardPipeline(
auto pipelineForTargetedShards = std::move(pipeline);
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
- BSONObj targetedCommand;
- int numAttempts = 0;
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
- while (++numAttempts <= kMaxNumStaleVersionRetries) {
- // We need to grab a new routing table at the start of each iteration, since a stale config
- // exception will invalidate the previous one.
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(liteParsedPipeline.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
- // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
- // Otherwise, uassert on all exceptions here.
- if (!(liteParsedPipeline.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
- uassertStatusOK(executionNsRoutingInfoStatus);
- }
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
- auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
- ? std::move(executionNsRoutingInfoStatus.getValue())
- : boost::optional<CachedCollectionRoutingInfo>{};
-
- // Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll =
- mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
- std::set<ShardId> shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
-
- auto atClusterTime = computeAtClusterTime(
- opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
-
- invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && executionNsRoutingInfo &&
- *shardIds.begin() != executionNsRoutingInfo->db().primaryId()));
-
- const bool isSplit = pipelineForTargetedShards->isSplitForShards();
-
- // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
- // can run on a single shard and the pipeline is already split, reassemble it.
- if (needsSplit && !isSplit) {
- pipelineForMerging = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMerging->splitForSharded();
- } else if (!needsSplit && isSplit) {
- pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
- }
+ // Determine whether we can run the entire aggregation on a single shard.
+ const bool mustRunOnAll =
+ mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
+ std::set<ShardId> shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
- // Generate the command object for the targeted shards.
- targetedCommand = createCommandForTargetedShards(
- opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
+ auto atClusterTime = computeAtClusterTime(
+ opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAll) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- }
+ invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- try {
- if (expCtx->explain) {
- if (mustRunOnAll) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and
- // should not participate in the shard version protocol.
- shardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db(),
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target
- // shards, and should participate in the shard version protocol.
- invariant(executionNsRoutingInfo);
- shardResults = scatterGatherVersionedTargetByRoutingTable(
- opCtx,
- executionNss.db(),
- executionNss,
- *executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation());
- }
- } else {
- cursors = establishShardCursors(opCtx,
- executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
- }
- } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
- LOG(1) << "got stale shardVersion error " << redact(ex) << " while dispatching "
- << redact(targetedCommand) << " after " << (numAttempts + 1)
- << " dispatch attempts";
- continue; // Try again if allowed.
- } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
- LOG(1) << "got snapshot error " << redact(ex) << " while dispatching "
- << redact(targetedCommand) << " after " << (numAttempts + 1)
- << " dispatch attempts";
- continue; // Try again if allowed.
- }
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+ if (needsSplit) {
+ pipelineForMerging = std::move(pipelineForTargetedShards);
+ pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+ }
- // Record the number of shards involved in the aggregation. If we are required to merge on
- // the primary shard, but the primary shard was not in the set of targeted shards, then we
- // must increment the number of involved shards.
- CurOp::get(opCtx)->debug().nShards =
- shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
- !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand = createCommandForTargetedShards(
+ opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
- break; // Success!
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAll) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
}
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAll) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and
+ // should not participate in the shard version protocol.
+ shardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db(),
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target
+ // shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
+ shardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db(),
+ executionNss,
+ *executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation());
+ }
+ } else {
+ cursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery,
+ aggRequest.getCollation());
+ }
+
+ // Record the number of shards involved in the aggregation. If we are required to merge on
+ // the primary shard, but the primary shard was not in the set of targeted shards, then we
+ // must increment the number of involved shards.
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
return DispatchShardPipelineResults{needsPrimaryShardMerge,
std::move(cursors),
std::move(shardResults),
@@ -1010,7 +963,6 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// error and hence shardId history does not need to be verified.
auto atClusterTime = computeAtClusterTimeForOneShard(opCtx, shardId);
-
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
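
In the new single-pass dispatch above, the pipeline is split at most once, based on the targeted shards and the merge requirements, and is never reassembled on retry. Below is a small hedged sketch of that decision; ShardId is simplified to std::string and the free function name is illustrative, not the server's.

```cpp
#include <set>
#include <string>

// Sketch of the split decision in dispatchShardPipeline(): split when more
// than one shard is targeted, when a stage must merge on mongoS, or when a
// primary-shard merge is required but the single target is not the primary.
bool pipelineNeedsSplit(const std::set<std::string>& targetedShards,
                        bool needsMongosMerge,
                        bool needsPrimaryShardMerge,
                        const std::string& primaryShardId) {
    return targetedShards.size() > 1u || needsMongosMerge ||
        (needsPrimaryShardMerge && !targetedShards.empty() &&
         *targetedShards.begin() != primaryShardId);
}
```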
diff --git a/src/mongo/s/commands/cluster_aggregate_test.cpp b/src/mongo/s/commands/cluster_aggregate_test.cpp
index 1ccf194b629..c6b79c7d346 100644
--- a/src/mongo/s/commands/cluster_aggregate_test.cpp
+++ b/src/mongo/s/commands/cluster_aggregate_test.cpp
@@ -194,18 +194,6 @@ TEST_F(ClusterAggregateTest, NoErrors) {
runAggCommandSuccessful(kAggregateCmdScatterGather, false);
}
-// Verify ClusterFind::runQuery will retry on a snapshot error.
-TEST_F(ClusterAggregateTest, RetryOnSnapshotError) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotUnavailable, true);
- runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotUnavailable, false);
- runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotTooOld, false);
-}
TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
loadRoutingTableWithTwoChunksAndTwoShards(kNss);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 9dc6e9c50a3..c0edc37f1f3 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -72,6 +72,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/op_msg.h"
@@ -281,6 +282,8 @@ void execCommandClient(OperationContext* opCtx,
}
}
+MONGO_FP_DECLARE(doNotRefreshShardsOnRetargettingError);
+
void runCommand(OperationContext* opCtx,
const OpMsgRequest& request,
const NetworkOp opType,
@@ -358,44 +361,52 @@ void runCommand(OperationContext* opCtx,
try {
execCommandClient(opCtx, invocation.get(), request, &crb);
return;
- } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) {
- const auto staleNs = [&] {
- if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
- return NamespaceString(staleInfo->getns());
- } else if (auto implicitCreateInfo =
- ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
- return NamespaceString(implicitCreateInfo->getNss());
- } else {
+ } catch (const DBException& ex) {
+ if (ErrorCodes::isNeedRetargettingError(ex.code()) ||
+ ErrorCodes::isSnapshotError(ex.code())) {
+ const auto staleNs = [&] {
+ if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
+ return NamespaceString(staleInfo->getns());
+ } else if (auto implicitCreateInfo =
+ ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
+ return NamespaceString(implicitCreateInfo->getNss());
+ } else {
+ throw;
+ }
+ }();
+
+ if (staleNs.isEmpty()) {
+ // This should be impossible but older versions tried incorrectly to handle
+ // it here.
+ log() << "Received a stale config error with an empty namespace while "
+ "executing "
+ << redact(request.body) << " : " << redact(ex);
throw;
}
- }();
-
- if (staleNs.isEmpty()) {
- // This should be impossible but older versions tried incorrectly to handle it
- // here.
- log()
- << "Received a stale config error with an empty namespace while executing "
- << redact(request.body) << " : " << redact(ex);
- throw;
- }
- if (!canRetry)
- throw;
+ if (!canRetry)
+ throw;
- log() << "Retrying command " << redact(request.body) << causedBy(ex);
+ LOG(2) << "Retrying command " << redact(request.body) << causedBy(ex);
- ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns());
- if (staleNs.isValid()) {
- Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
- }
+ if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) {
+ ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns());
+ }
+
+ if (staleNs.isValid()) {
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
+ }
- continue;
- } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>& e) {
- Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(e->getDb(),
- e->getVersionReceived());
- if (!canRetry)
+ continue;
+ } else if (auto sce = ex.extraInfo<StaleDbRoutingVersion>()) {
+ Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(
+ sce->getDb(), sce->getVersionReceived());
+ if (!canRetry)
+ throw;
+ continue;
+ } else {
throw;
- continue;
+ }
}
MONGO_UNREACHABLE;
}
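
The doNotRefreshShardsOnRetargettingError fail point added above lets the updated jstest suppress the implicit shard refresh that normally happens on a retargetting error. As a rough illustration of that gating only (the FailPoint struct and handleRetargettingError below are simplified stand-ins, not the real mongo::FailPoint API):

```cpp
#include <atomic>
#include <functional>

// Simplified stand-in for a server fail point: a flag a test can enable via a
// configureFailPoint-style command.
struct FailPoint {
    std::atomic<bool> enabled{false};
    bool isEnabled() const { return enabled.load(); }
};

FailPoint doNotRefreshShardsOnRetargettingError;

// On a retargetting error, skip the shard-version refresh when the fail point
// is active, so tests can observe the command layer's retry behaviour alone.
void handleRetargettingError(const std::function<void()>& refreshShardVersions) {
    if (!doNotRefreshShardsOnRetargettingError.isEnabled()) {
        refreshShardVersions();
    }
}
```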