From 586663fec7c3a7d4a8b0185ff24825bd15e80dff Mon Sep 17 00:00:00 2001 From: "Mickey. J Winters" Date: Fri, 25 Feb 2022 14:52:13 +0000 Subject: SERVER-62738 implement aggregate $_passthroughToShard option --- src/mongo/s/query/cluster_aggregation_planner.cpp | 119 +++++++++++++++++++++- 1 file changed, 115 insertions(+), 4 deletions(-) (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp') diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 6062c02e6ba..9430ddb8270 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" @@ -101,7 +102,9 @@ AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCt ReadPreferenceSetting::get(opCtx), sharded_agg_helpers::getDesiredRetryPolicy(opCtx)); const auto response = ars.next(); - invariant(ars.done()); + tassert(6273807, + "requested and received data from just one shard, but results are still pending", + ars.done()); return response; } @@ -567,7 +570,14 @@ AggregationTargeter AggregationTargeter::make( boost::optional cm, stdx::unordered_set involvedNamespaces, bool hasChangeStream, - bool allowedToPassthrough) { + bool allowedToPassthrough, + bool perShardCursor) { + if (perShardCursor) { + uassert(6273800, + "featureFlagPerShardCursor must be enabled to use $_passthroughToShard", + feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()); + return {TargetingPolicy::kSpecificShardOnly, nullptr, cm}; + } // Check if any of the involved collections are sharded. 
bool involvesShardedCollections = [&]() { @@ -616,6 +626,19 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr& Document serializedCommand, const PrivilegeVector& privileges, BSONObjBuilder* out) { + if (feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()) { + return runPipelineOnSpecificShardOnly(expCtx, + namespaces, + cm, + explain, + serializedCommand, + privileges, + cm.dbPrimary(), + false, + out); + } + // TODO SERVER-58673 remove the if statement here, remove code below and just call + // runPipelineOnSpecificShardOnly. make sure to clean up divergence between the two functions. auto opCtx = expCtx->opCtx; // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an @@ -640,7 +663,9 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr& ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); auto response = ars.next(); - invariant(ars.done()); + tassert(6273805, + "requested and received data from just one shard, but results are still pending", + ars.done()); uassertStatusOK(response.swResponse); auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); @@ -649,7 +674,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr& uassertStatusOK(commandStatus.withContext("command failed because of stale config")); } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { uassertStatusOK( - commandStatus.withContext("command failed because can not establish a snapshot")); + commandStatus.withContext("command failed because establishing a snapshot failed")); } BSONObj result; @@ -820,5 +845,91 @@ std::pair> getCollationAndUUID( return {collation.isEmpty() ?
getCollation() : collation, getUUID()}; } +Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr& expCtx, + const ClusterAggregate::Namespaces& namespaces, + const ChunkManager& cm, + boost::optional explain, + Document serializedCommand, + const PrivilegeVector& privileges, + ShardId shardId, + bool forPerShardCursor, + BSONObjBuilder* out) { + auto opCtx = expCtx->opCtx; + + if (forPerShardCursor) { + tassert(6273804, + "Per shard cursors are supposed to pass fromMongos: false to shards", + !expCtx->inMongos); + } + + // Format the command for the shard. This wraps the command as an explain if necessary, and + // rewrites the result into a format safe to forward to shards. + BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx, + serializedCommand, + explain, + nullptr, /* pipeline */ + BSONObj(), + boost::none); + + if (!forPerShardCursor && shardId != ShardId::kConfigServerId) { + cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()); + } + if (!forPerShardCursor) { + // Per shard cursors should not send any shard version info. 
+ cmdObj = appendDbVersionIfPresent(std::move(cmdObj), cm.dbVersion()); + } + + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + namespaces.executionNss.db().toString(), + {{shardId, cmdObj}}, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + auto response = ars.next(); + tassert(6273806, + "requested and received data from just one shard, but results are still pending", + ars.done()); + + uassertStatusOK(response.swResponse); + auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); + + if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { + uassertStatusOK(commandStatus.withContext("command failed because of stale config")); + } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { + uassertStatusOK( + commandStatus.withContext("command failed because establishing a snapshot failed")); + } + + BSONObj result; + if (explain) { + // If this was an explain, then we get back an explain result object rather than a cursor. + result = response.swResponse.getValue().data; + } else { + result = uassertStatusOK(storePossibleCursor( + opCtx, + shardId, + *response.shardHostAndPort, + response.swResponse.getValue().data, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + privileges, + expCtx->tailableMode, + forPerShardCursor ? boost::optional(change_stream_constants::kSortSpec) + : boost::none)); + } + + // First append the properly constructed writeConcernError. It will then be skipped + // in appendElementsUnique.
+ if (auto wcErrorElem = result["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); + } + + out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); + + return getStatusFromCommandResult(out->asTempObj()); +} + } // namespace cluster_aggregation_planner } // namespace mongo -- cgit v1.2.1