author     Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2018-04-26 13:27:29 -0400
committer  Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2018-05-01 17:14:20 -0400
commit     e83d715afbec0ba89f041ed53e7d18c8eff587e5 (patch)
tree       3f085dd6e7882ca2ccde7e0ac02f35b0e3ff7f9a /src/mongo/s
parent     c9a70c9a28a0b2c9ddbd66b5dee2308405a6eb68 (diff)
download   mongo-e83d715afbec0ba89f041ed53e7d18c8eff587e5.tar.gz
SERVER-34695: Move aggregation retry logic to command processing layer
Diffstat (limited to 'src/mongo/s')
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp       | 234
-rw-r--r--  src/mongo/s/commands/cluster_aggregate_test.cpp  |  12
-rw-r--r--  src/mongo/s/commands/strategy.cpp                |  73
3 files changed, 135 insertions(+), 184 deletions(-)
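A note on the shape of the change before the hunks: the patch deletes a retry loop inside the
aggregation dispatch path and relies on a single retry point in the command processing layer
instead. A runnable miniature of the retry pattern being relocated; all names here
(StaleVersionError, dispatchOnce, runWithRetry) are stand-ins, not the real API:

    #include <iostream>
    #include <stdexcept>

    // Stand-in for the stale-routing exceptions; the real code throws
    // ExceptionForCat<ErrorCategory::StaleShardVersionError> and friends.
    struct StaleVersionError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    int staleResponses = 2;  // simulate two stale answers, then success

    // One dispatch attempt against the shards (establishShardCursors /
    // execCommandClient in the real code).
    void dispatchOnce() {
        if (staleResponses > 0) {
            --staleResponses;
            throw StaleVersionError("stale shard version");
        }
    }

    // Before this patch, a loop of this shape lived in cluster_aggregate.cpp;
    // after it, the same loop lives in runCommand, so each retry re-plans the
    // entire pipeline against fresh routing data instead of patching up the
    // half-dispatched state inside the aggregation layer.
    void runWithRetry(int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            try {
                dispatchOnce();
                std::cout << "succeeded on attempt " << attempt << "\n";
                return;
            } catch (const StaleVersionError&) {
                if (attempt == maxAttempts)
                    throw;
                // ...invalidate the routing cache, then loop...
            }
        }
    }

    int main() {
        runWithRetry(10);  // kMaxNumStaleVersionRetries is 10 in the real code
    }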
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 919f3318c7e..80c45f7c5b8 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -321,28 +321,12 @@ std::vector<RemoteCursor> establishShardCursors(
}
}
- // If we reach this point, we're either trying to establish cursors on a sharded execution
- // namespace, or handling the case where a sharded collection was dropped and recreated as
- // unsharded. Since views cannot be sharded, and because we will return an error rather than
- // attempting to continue in the event that a recreated namespace is a view, we do not handle
- // ErrorCodes::CommandOnShardedViewNotSupportedOnMongod here.
- try {
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */);
-
- } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
- // If any shard returned a stale shardVersion error, invalidate the routing table cache.
- // This will cause the cache to be refreshed the next time it is accessed.
- Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*routingInfo));
- throw;
- } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) {
- // If any shard returned a snapshot error, recompute the atClusterTime.
- throw;
- }
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */);
}
struct DispatchShardPipelineResults {
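The deleted handlers used mongo's ExceptionForCat<> template, which lets a catch clause match
an entire error category. A simplified stand-in for that idiom (not the real implementation,
which derives these types from DBException and assigns categories per error code):

    #include <stdexcept>

    enum class ErrorCategory { StaleShardVersionError, SnapshotError };

    // One distinct exception type per category makes "catch by category"
    // ordinary C++ catch-by-type.
    template <ErrorCategory kCategory>
    struct ExceptionForCat : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    void mayThrowStale() {
        throw ExceptionForCat<ErrorCategory::StaleShardVersionError>("stale shard version");
    }

    void caller() {
        try {
            mayThrowStale();
        } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
            // The removed block reacted here, invalidating the routing table
            // cache before rethrowing; after this patch the error propagates
            // untouched and runCommand performs the invalidation instead.
            throw;
        }
    }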
@@ -384,12 +368,8 @@ DispatchShardPipelineResults dispatchShardPipeline(
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
// - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
- // the refreshed routing table data.
- // - If the pipeline is not split and we now need to target multiple shards, split it. If the
- // pipeline is already split and we now only need to target a single shard, reassemble the
- // original pipeline.
- // - After exhausting 10 attempts to establish the cursors, we give up and throw.
+ // - Stale shard version errors are thrown up to the top-level handler, causing a retry of the
+ // entire aggregation command.
auto cursors = std::vector<RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
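One consequence of the single-pass flow described in the comment above: the split decision now
runs exactly once, so the old unsplitFromSharded branch (visible in the removed lines further
down) has no remaining caller. The decision itself, restated as a free function over plain
bools in place of the real pipeline and routing types:

    #include <cstddef>

    // True when the pipeline must be split into a shards part and a merging
    // part: more than one targeted shard, a stage that must merge on mongoS,
    // or a required primary-shard merge when the single target shard is not
    // the primary.
    bool needsSplit(std::size_t numTargetedShards,
                    bool needsMongosMerge,
                    bool needsPrimaryShardMerge,
                    bool singleTargetIsPrimaryShard) {
        return numTargetedShards > 1 || needsMongosMerge ||
            (needsPrimaryShardMerge && !singleTargetIsPrimaryShard);
    }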
@@ -403,129 +383,102 @@ DispatchShardPipelineResults dispatchShardPipeline(
auto pipelineForTargetedShards = std::move(pipeline);
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
- BSONObj targetedCommand;
- int numAttempts = 0;
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
- while (++numAttempts <= kMaxNumStaleVersionRetries) {
- // We need to grab a new routing table at the start of each iteration, since a stale config
- // exception will invalidate the previous one.
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(liteParsedPipeline.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
- // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
- // Otherwise, uassert on all exceptions here.
- if (!(liteParsedPipeline.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
- uassertStatusOK(executionNsRoutingInfoStatus);
- }
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
- auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
- ? std::move(executionNsRoutingInfoStatus.getValue())
- : boost::optional<CachedCollectionRoutingInfo>{};
-
- // Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll =
- mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
- std::set<ShardId> shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
-
- auto atClusterTime = computeAtClusterTime(
- opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
-
- invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && executionNsRoutingInfo &&
- *shardIds.begin() != executionNsRoutingInfo->db().primaryId()));
-
- const bool isSplit = pipelineForTargetedShards->isSplitForShards();
-
- // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
- // can run on a single shard and the pipeline is already split, reassemble it.
- if (needsSplit && !isSplit) {
- pipelineForMerging = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMerging->splitForSharded();
- } else if (!needsSplit && isSplit) {
- pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
- }
+ // Determine whether we can run the entire aggregation on a single shard.
+ const bool mustRunOnAll =
+ mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
+ std::set<ShardId> shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
- // Generate the command object for the targeted shards.
- targetedCommand = createCommandForTargetedShards(
- opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
+ auto atClusterTime = computeAtClusterTime(
+ opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAll) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- }
+ invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- try {
- if (expCtx->explain) {
- if (mustRunOnAll) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and
- // should not participate in the shard version protocol.
- shardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db(),
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target
- // shards, and should participate in the shard version protocol.
- invariant(executionNsRoutingInfo);
- shardResults = scatterGatherVersionedTargetByRoutingTable(
- opCtx,
- executionNss.db(),
- executionNss,
- *executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation());
- }
- } else {
- cursors = establishShardCursors(opCtx,
- executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
- }
- } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
- LOG(1) << "got stale shardVersion error " << redact(ex) << " while dispatching "
- << redact(targetedCommand) << " after " << (numAttempts + 1)
- << " dispatch attempts";
- continue; // Try again if allowed.
- } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
- LOG(1) << "got snapshot error " << redact(ex) << " while dispatching "
- << redact(targetedCommand) << " after " << (numAttempts + 1)
- << " dispatch attempts";
- continue; // Try again if allowed.
- }
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+ if (needsSplit) {
+ pipelineForMerging = std::move(pipelineForTargetedShards);
+ pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+ }
- // Record the number of shards involved in the aggregation. If we are required to merge on
- // the primary shard, but the primary shard was not in the set of targeted shards, then we
- // must increment the number of involved shards.
- CurOp::get(opCtx)->debug().nShards =
- shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
- !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand = createCommandForTargetedShards(
+ opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
- break; // Success!
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAll) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
}
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAll) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and
+ // should not participate in the shard version protocol.
+ shardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db(),
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target
+ // shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
+ shardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db(),
+ executionNss,
+ *executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation());
+ }
+ } else {
+ cursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery,
+ aggRequest.getCollation());
+ }
+
+ // Record the number of shards involved in the aggregation. If we are required to merge on
+ // the primary shard, but the primary shard was not in the set of targeted shards, then we
+ // must increment the number of involved shards.
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
return DispatchShardPipelineResults{needsPrimaryShardMerge,
std::move(cursors),
std::move(shardResults),
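A side note on the double reload() call in the hunk above. One plausible reading, sketched
below with a simplified stand-in type (the real ShardRegistry API differs), is that a reload()
returning false only joined a refresh already in flight, which may predate the caller's
logical time; calling again forces a reload guaranteed to start after this point, which is
what the "hard" reload comment asks for:

    struct ShardRegistryLike {
        bool reloadInProgress = false;

        // Returns true if this call performed a fresh reload, false if it
        // merely waited on one that was already running.
        bool reload() {
            if (reloadInProgress)
                return false;
            reloadInProgress = true;
            // ...fetch the shard list from the config servers...
            reloadInProgress = false;
            return true;
        }
    };

    // Guarantee that at least one reload began after this function was entered.
    void hardReload(ShardRegistryLike& registry) {
        if (!registry.reload()) {
            registry.reload();
        }
    }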
@@ -1010,7 +963,6 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// error and hence shardId history does not need to be verified.
auto atClusterTime = computeAtClusterTimeForOneShard(opCtx, shardId);
-
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
diff --git a/src/mongo/s/commands/cluster_aggregate_test.cpp b/src/mongo/s/commands/cluster_aggregate_test.cpp
index 1ccf194b629..c6b79c7d346 100644
--- a/src/mongo/s/commands/cluster_aggregate_test.cpp
+++ b/src/mongo/s/commands/cluster_aggregate_test.cpp
@@ -194,18 +194,6 @@ TEST_F(ClusterAggregateTest, NoErrors) {
runAggCommandSuccessful(kAggregateCmdScatterGather, false);
}
-// Verify ClusterFind::runQuery will retry on a snapshot error.
-TEST_F(ClusterAggregateTest, RetryOnSnapshotError) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotUnavailable, true);
- runAggCommandOneError(kAggregateCmdTargeted, ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotUnavailable, false);
- runAggCommandOneError(kAggregateCmdScatterGather, ErrorCodes::SnapshotTooOld, false);
-}
TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
loadRoutingTableWithTwoChunksAndTwoShards(kNss);
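Both error codes exercised by the removed tests fall into a single error category, which is
why one branch in the new runCommand handler covers them. A stand-in for the category
predicate (the real one is ErrorCodes::isSnapshotError):

    enum class Code { SnapshotUnavailable, SnapshotTooOld, Other };

    // Any snapshot-category code takes the same retry path at the top level.
    bool isSnapshotError(Code c) {
        return c == Code::SnapshotUnavailable || c == Code::SnapshotTooOld;
    }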
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 9dc6e9c50a3..c0edc37f1f3 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -72,6 +72,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/op_msg.h"
@@ -281,6 +282,8 @@ void execCommandClient(OperationContext* opCtx,
}
}
+MONGO_FP_DECLARE(doNotRefreshShardsOnRetargettingError);
+
void runCommand(OperationContext* opCtx,
const OpMsgRequest& request,
const NetworkOp opType,
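The new fail point gives tests a switch to suppress the connection-version refresh on retry. A
minimal stand-in for the idiom (the real MONGO_FP_DECLARE / MONGO_FAIL_POINT machinery also
supports runtime activation, e.g. through the configureFailPoint test command):

    #include <atomic>

    // A named switch, default off, that tests can flip to skip a code path.
    struct FailPointLike {
        std::atomic<bool> enabled{false};
    };

    FailPointLike doNotRefreshShardsOnRetargettingError;

    void onRetargettingError() {
        if (!doNotRefreshShardsOnRetargettingError.enabled.load()) {
            // In production this is where checkMyConnectionVersions() runs.
        }
    }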
@@ -358,44 +361,52 @@ void runCommand(OperationContext* opCtx,
try {
execCommandClient(opCtx, invocation.get(), request, &crb);
return;
- } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) {
- const auto staleNs = [&] {
- if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
- return NamespaceString(staleInfo->getns());
- } else if (auto implicitCreateInfo =
- ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
- return NamespaceString(implicitCreateInfo->getNss());
- } else {
+ } catch (const DBException& ex) {
+ if (ErrorCodes::isNeedRetargettingError(ex.code()) ||
+ ErrorCodes::isSnapshotError(ex.code())) {
+ const auto staleNs = [&] {
+ if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
+ return NamespaceString(staleInfo->getns());
+ } else if (auto implicitCreateInfo =
+ ex.extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
+ return NamespaceString(implicitCreateInfo->getNss());
+ } else {
+ throw;
+ }
+ }();
+
+ if (staleNs.isEmpty()) {
+ // This should be impossible but older versions tried incorrectly to handle
+ // it here.
+ log() << "Received a stale config error with an empty namespace while "
+ "executing "
+ << redact(request.body) << " : " << redact(ex);
throw;
}
- }();
-
- if (staleNs.isEmpty()) {
- // This should be impossible but older versions tried incorrectly to handle it
- // here.
- log()
- << "Received a stale config error with an empty namespace while executing "
- << redact(request.body) << " : " << redact(ex);
- throw;
- }
- if (!canRetry)
- throw;
+ if (!canRetry)
+ throw;
- log() << "Retrying command " << redact(request.body) << causedBy(ex);
+ LOG(2) << "Retrying command " << redact(request.body) << causedBy(ex);
- ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns());
- if (staleNs.isValid()) {
- Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
- }
+ if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) {
+ ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns());
+ }
+
+ if (staleNs.isValid()) {
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
+ }
- continue;
- } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>& e) {
- Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(e->getDb(),
- e->getVersionReceived());
- if (!canRetry)
+ continue;
+ } else if (auto sce = ex.extraInfo<StaleDbRoutingVersion>()) {
+ Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(
+ sce->getDb(), sce->getVersionReceived());
+ if (!canRetry)
+ throw;
+ continue;
+ } else {
throw;
- continue;
+ }
}
MONGO_UNREACHABLE;
}
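Finally, a note on the namespace extraction in the new handler: it decides what to invalidate
by inspecting typed extra-info payloads attached to the exception. A self-contained sketch of
that lookup, with stand-in structs in place of the real DBException::extraInfo<>() accessors:

    #include <stdexcept>
    #include <string>

    // Stand-ins for the payloads the real code reads via
    // ex.extraInfo<StaleConfigInfo>() and
    // ex.extraInfo<CannotImplicitlyCreateCollectionInfo>().
    struct StaleConfigInfo { std::string ns; };
    struct CannotImplicitlyCreateCollectionInfo { std::string nss; };

    struct RetargettingErrorLike {
        const StaleConfigInfo* staleInfo = nullptr;
        const CannotImplicitlyCreateCollectionInfo* implicitCreateInfo = nullptr;
    };

    // Mirrors the staleNs lambda above: take the namespace from whichever
    // payload is present; if neither is attached, the error carries nothing
    // to re-target against, so it is rethrown to the client.
    std::string resolveStaleNs(const RetargettingErrorLike& ex) {
        if (ex.staleInfo)
            return ex.staleInfo->ns;
        if (ex.implicitCreateInfo)
            return ex.implicitCreateInfo->nss;
        throw std::runtime_error("no retargetting payload attached");
    }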