Diffstat (limited to 'src')
17 files changed, 702 insertions, 574 deletions
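The central interface change in this diff is that MongoProcessInterface::makePipeline() and attachCursorSourceToPipeline() now throw on failure and return an owning std::unique_ptr&lt;Pipeline, PipelineDeleter&gt; instead of StatusWith/Status, with attachCursorSourceToPipeline() accepting a raw pointer released from the caller's unique_ptr (the header comment added below explains this avoids a circular dependency on PipelineDeleter) and handing back a re-wrapped owning pointer. The following is a minimal, self-contained sketch of that ownership convention only; every name in it (FakePipeline, FakeDeleter, PipelinePtr, attachSource) is an illustrative stand-in, not a MongoDB API.

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct FakePipeline {
        std::vector<std::string> stages;
    };

    struct FakeDeleter {
        void operator()(FakePipeline* p) const {
            delete p;
        }
    };

    using PipelinePtr = std::unique_ptr<FakePipeline, FakeDeleter>;

    // Takes ownership of 'pipeline' as if it were a unique_ptr (mirroring the contract described
    // in the mongo_process_interface.h comment in this diff) and returns an owning pointer.
    PipelinePtr attachSource(FakePipeline* pipeline) {
        pipeline->stages.insert(pipeline->stages.begin(), "$cursor");
        return PipelinePtr(pipeline, FakeDeleter());
    }

    int main() {
        PipelinePtr pipeline(new FakePipeline{{"$match", "$project"}}, FakeDeleter());
        // The caller releases the pointer and immediately re-acquires ownership from the return
        // value, the same pattern the updated DocumentSourceLookUp::buildPipeline() uses.
        pipeline = attachSource(pipeline.release());
        for (const auto& stage : pipeline->stages) {
            std::cout << stage << "\n";  // prints: $cursor, $match, $project
        }
        return 0;
    }

The same convention is what lets the test mocks below (MockMongoInterface in the *_test.cpp files) simply re-wrap the pointer they receive rather than returning Status::OK().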
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 2f1c2e27cca..71270f0e72f 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -320,7 +320,10 @@ env.Library( 'mongos_process_interface.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/s/query/async_results_merger', + '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', + '$BUILD_DIR/mongo/s/query/cluster_query', 'mongo_process_common', ] ) diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 17622131061..737c316c9bf 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -246,8 +246,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { // with the resume token. auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); + auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); uassert(40576, diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index a7b0567b66b..a94179c1431 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -458,34 +458,31 @@ public: MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { return false; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index d93535dc030..1b6583bb655 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -207,8 +207,8 @@ void 
DocumentSourceGraphLookUp::doBreadthFirstSearch() { // We've already allocated space for the trailing $match stage in '_fromPipeline'. _fromPipeline.back() = *matchStage; - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx)); + auto pipeline = + pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx); while (auto next = pipeline->getNext()) { uassert(40271, str::stream() diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 6309f89597a..47f37939f73 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -63,30 +63,27 @@ public: MockMongoInterface(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_results)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 04094a1d45b..7223f854521 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -196,9 +196,16 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { txnRequirement = resolvedRequirements.second; } + // If executing on mongos and the foreign collection is sharded, then this stage can run on + // mongos. + HostTypeRequirement hostRequirement = + (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) + ? HostTypeRequirement::kMongoS + : HostTypeRequirement::kPrimaryShard; + StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, - HostTypeRequirement::kPrimaryShard, + hostRequirement, diskRequirement, FacetRequirement::kAllowed, txnRequirement); @@ -289,8 +296,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( // If we don't have a cache, build and return the pipeline immediately. if (!_cache || _cache->isAbandoned()) { - return uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx)); + return pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx); } // Tailor the pipeline construction for our needs. 
We want a non-optimized pipeline without a @@ -300,8 +306,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( pipelineOpts.attachCursorSource = false; // Construct the basic pipeline without a cache stage. - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); + auto pipeline = + pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); // Add the cache stage at the end and optimize. During the optimization process, the cache will // either move itself to the correct position in the pipeline, or will abandon itself if no @@ -313,8 +319,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - _fromExpCtx, pipeline.get())); + pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, + pipeline.release()); } // If the cache has been abandoned, release it. diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d17b1a73c9c..2a2b3338789 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -81,13 +81,6 @@ public: return requiredPrivileges; } - /** - * Lookup from a sharded collection is not allowed. - */ - bool allowShardedForeignCollection(NamespaceString nss) const final { - return (_foreignNssSet.find(nss) == _foreignNssSet.end()); - } - private: const NamespaceString _fromNss; const stdx::unordered_set<NamespaceString> _foreignNssSet; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 148197d4d5b..9c3dbf5777b 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -85,34 +85,31 @@ public: MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { return false; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { 
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } boost::optional<Document> lookupSingleDocument( @@ -125,11 +122,12 @@ public: // case of a change stream on a whole database so we need to make a copy of the // ExpressionContext with the new namespace. auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none); - auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); - if (swPipeline == ErrorCodes::NamespaceNotFound) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline; + try { + pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); + } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { return boost::none; } - auto pipeline = uassertStatusOK(std::move(swPipeline)); auto lookedUpDocument = pipeline->getNext(); if (auto next = pipeline->getNext()) { diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 4f461d6705c..bacfc5c6cda 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -544,28 +544,25 @@ public: return false; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") || pipeline->popFrontWithName("$project")) { @@ -575,7 +572,7 @@ public: } pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 5f092107c1a..be934e88707 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -184,20 +184,24 @@ public: * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to * add an initial cursor source. * - * This function returns a non-OK status if parsing the pipeline failed. + * This function throws if parsing the pipeline failed. 
*/ - virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) = 0; /** - * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This - * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be - * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be + * Accepts a pipeline and returns a new one which will draw input from the underlying + * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be + * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be * the only case where NamespaceNotFound is returned. + * + * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr. + * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the + * compiler expects to find an implementation of PipelineDeleter. */ - virtual Status attachCursorSourceToPipeline( + virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; /** diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 85f141c9277..6336910029c 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -28,10 +28,13 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + #include "mongo/platform/basic.h" #include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/curop.h" #include "mongo/db/index/index_descriptor.h" @@ -45,17 +48,109 @@ #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/transaction_router.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/log.h" namespace mongo { +MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); + using boost::intrusive_ptr; using std::shared_ptr; using std::string; using std::unique_ptr; namespace { +// Given a document representing an aggregation command such as +// +// {aggregate: "myCollection", pipeline: [], ...}, +// +// produces the corresponding explain command: +// +// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} +Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { + MutableDocument explainCommandBuilder; + explainCommandBuilder["explain"] = Value(aggregateCommand); + // Downstream host targeting code expects queryOptions at the top level of the command object. + explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = + Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); + + // readConcern needs to be promoted to the top-level of the request. 
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = + Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); + + // Add explain command options. + for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { + explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); + } + + return explainCommandBuilder.freeze(); +} + +std::vector<RemoteCursor> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj& cmdObj, + const AggregationRequest& request, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery) { + LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; + + const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe); + std::set<ShardId> shardIds = MongoSInterface::getTargetedShards( + opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); + std::vector<std::pair<ShardId, BSONObj>> requests; + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo || mustRunOnAll); + + if (mustRunOnAll) { + // The pipeline contains a stage which must be run on all shards. Skip versioning and + // enqueue the raw command objects. + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + } else if (routingInfo->cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. + for (auto& shardId : shardIds) { + auto versionedCmdObj = + appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo->db().primaryId(), + !routingInfo->db().primary()->isConfig() + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj); + } + + if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " + "until fail point is disabled."; + while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + sleepsecs(1); + } + } + + return establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */, + MongoSInterface::getDesiredRetryPolicy(request)); +} + /** * Determines the single shard to which the given query will be targeted, and its associated * shardVersion. Throws if the query targets more than one shard. @@ -115,6 +210,379 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace +Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) { + // The idempotent retry policy will retry even for writeConcern failures, so only set it if the + // pipeline does not support writeConcern. 
+ if (req.getWriteConcern()) { + return Shard::RetryPolicy::kNotIdempotent; + } + return Shard::RetryPolicy::kIdempotent; +} + +BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx, + const AggregationRequest& request, + const boost::optional<ShardId>& shardId, + Pipeline* pipeline, + BSONObj collationObj) { + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + if (pipeline) { + targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); + } + + return MongoSInterface::genericTransformForShards( + std::move(targetedCmd), opCtx, shardId, request, collationObj); +} + +BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards, + OperationContext* opCtx, + const boost::optional<ShardId>& shardId, + const AggregationRequest& request, + BSONObj collationObj) { + cmdForShards[AggregationRequest::kFromMongosName] = Value(true); + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an + // explain command. + if (auto explainVerbosity = request.getExplain()) { + cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); + } + + if (!collationObj.isEmpty()) { + cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); + } + + if (opCtx->getTxnNumber()) { + invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(), + str::stream() << "Command for shards unexpectedly had the " + << OperationSessionInfo::kTxnNumberFieldName + << " field set: " + << cmdForShards.peek().toString()); + cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = + Value(static_cast<long long>(*opCtx->getTxnNumber())); + } + + auto aggCmd = cmdForShards.freeze().toBson(); + + if (shardId) { + if (auto txnRouter = TransactionRouter::get(opCtx)) { + aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd); + } + } + + // agg creates temp collection and should handle implicit create separately. + return appendAllowImplicitCreate(aggCmd, true); +} + +BSONObj MongoSInterface::createCommandForTargetedShards( + OperationContext* opCtx, + const AggregationRequest& request, + const cluster_aggregation_planner::SplitPipeline& splitPipeline, + const BSONObj collationObj, + const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + bool needsMerge) { + + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it + // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may + // have detected a logged in user and appended that user name to the $listSessions spec to + // send to the shards. + targetedCmd[AggregationRequest::kPipelineName] = + Value(splitPipeline.shardsPipeline->serialize()); + + // When running on many shards with the exchange we may not need merging. + if (needsMerge) { + targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + + // For split pipelines which need merging, do *not* propagate the writeConcern to the shards + // part. Otherwise this is part of an exchange and in that case we should include the + // writeConcern. + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + } + + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << 0)); + + targetedCmd[AggregationRequest::kExchangeName] = + exchangeSpec ? 
Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); + + return genericTransformForShards( + std::move(targetedCmd), opCtx, boost::none, request, collationObj); +} + +std::set<ShardId> MongoSInterface::getTargetedShards( + OperationContext* opCtx, + bool mustRunOnAllShards, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj shardQuery, + const BSONObj collation) { + if (mustRunOnAllShards) { + // The pipeline begins with a stage which must be run on all shards. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + return {shardIds.begin(), shardIds.end()}; + } + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo); + + return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); +} + +bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss, + const LiteParsedPipeline& litePipe) { + // The following aggregations must be routed to all shards: + // - Any collectionless aggregation, such as non-localOps $currentOp. + // - Any aggregation which begins with a $changeStream stage. + return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); +} + +StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo( + OperationContext* opCtx, const NamespaceString& execNss) { + // First, verify that there are shards present in the cluster. If not, then we return the + // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because + // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on + // a collection before its enclosing database is created. However, if there are no shards + // present, then $changeStream should immediately return an empty cursor just as other + // aggregations do when the database does not exist. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + if (shardIds.size() == 0) { + return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; + } + + // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not + // exist. + return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); +} + +/** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. + */ +MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObj collationObj) { + // The process is as follows: + // - First, determine whether we need to target more than one shard. If so, we split the + // pipeline; if not, we retain the existing pipeline. + // - Call establishShardCursors to dispatch the aggregation to the targeted shards. + // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the + // entire aggregation commmand. 
+ auto cursors = std::vector<RemoteCursor>(); + auto shardResults = std::vector<AsyncRequestsSender::Response>(); + auto opCtx = expCtx->opCtx; + + const bool needsPrimaryShardMerge = + (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); + + const bool needsMongosMerge = pipeline->needsMongosMerger(); + + const auto shardQuery = pipeline->getInitialQuery(); + + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + + // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. + // Otherwise, uassert on all exceptions here. + if (!(liteParsedPipeline.hasChangeStream() && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + uassertStatusOK(executionNsRoutingInfoStatus); + } + + auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() + ? std::move(executionNsRoutingInfoStatus.getValue()) + : boost::optional<CachedCollectionRoutingInfo>{}; + + // Determine whether we can run the entire aggregation on a single shard. + const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline); + std::set<ShardId> shardIds = getTargetedShards( + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); + + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->computeAndSetAtClusterTime( + opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); + } + + // Don't need to split the pipeline if we are only targeting a single shard, unless: + // - There is a stage that needs to be run on the primary shard and the single target shard + // is not the primary. + // - The pipeline contains one or more stages which must always merge on mongoS. + const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || + (needsPrimaryShardMerge && executionNsRoutingInfo && + *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); + + boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; + boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; + + if (needsSplit) { + splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); + + exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + opCtx, splitPipeline->mergePipeline.get()); + } + + // Generate the command object for the targeted shards. + BSONObj targetedCommand = splitPipeline + ? createCommandForTargetedShards( + opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) + : createPassthroughCommandForShard( + opCtx, aggRequest, boost::none, pipeline.get(), collationObj); + + // Refresh the shard registry if we're targeting all shards. We need the shard registry + // to be at least as current as the logical time used when creating the command for + // $changeStream to work reliably, so we do a "hard" reload. + if (mustRunOnAll) { + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + shardRegistry->reload(opCtx); + } + } + + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAll) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and + // should not participate in the shard version protocol. 
+ shardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db(), + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target + // shards, and should participate in the shard version protocol. + invariant(executionNsRoutingInfo); + shardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db(), + executionNss, + *executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation()); + } + } else { + cursors = establishShardCursors(opCtx, + executionNss, + liteParsedPipeline, + executionNsRoutingInfo, + targetedCommand, + aggRequest, + ReadPreferenceSetting::get(opCtx), + shardQuery); + invariant(cursors.size() % shardIds.size() == 0, + str::stream() << "Number of cursors (" << cursors.size() + << ") is not a multiple of producers (" + << shardIds.size() + << ")"); + } + + // Convert remote cursors into a vector of "owned" cursors. + std::vector<OwnedRemoteCursor> ownedCursors; + for (auto&& cursor : cursors) { + ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); + } + + // Record the number of shards involved in the aggregation. If we are required to merge on + // the primary shard, but the primary shard was not in the set of targeted shards, then we + // must increment the number of involved shards. + CurOp::get(opCtx)->debug().nShards = + shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && + !shardIds.count(executionNsRoutingInfo->db().primaryId())); + + return DispatchShardPipelineResults{needsPrimaryShardMerge, + std::move(ownedCursors), + std::move(shardResults), + std::move(splitPipeline), + std::move(pipeline), + targetedCommand, + shardIds.size(), + exchangeSpec}; +} + +std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) { + // Explain is not supported for auxiliary lookups. + invariant(!expCtx->explain); + + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + if (pipelineOptions.optimize) { + pipeline->optimizePipeline(); + } + if (pipelineOptions.attachCursorSource) { + // 'attachCursorSourceToPipeline' handles any complexity related to sharding. + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); + } + + return pipeline; +} + + +std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + // Generate the command object for the targeted shards. 
+ std::vector<BSONObj> rawStages = [pipeline]() { + auto serialization = pipeline->serialize(); + std::vector<BSONObj> stages; + stages.reserve(serialization.size()); + + for (const auto& stageObj : serialization) { + invariant(stageObj.getType() == BSONType::Object); + stages.push_back(stageObj.getDocument().toBson()); + } + + return stages; + }(); + + AggregationRequest aggRequest(expCtx->ns, rawStages); + LiteParsedPipeline liteParsedPipeline(aggRequest); + auto shardDispatchResults = MongoSInterface::dispatchShardPipeline( + expCtx, + expCtx->ns, + aggRequest, + liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)), + expCtx->collation); + + std::vector<ShardId> targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor->getShardId().toString()); + } + + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; + boost::optional<BSONObj> shardCursorsSortSpec = boost::none; + if (shardDispatchResults.splitPipeline) { + mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); + shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; + } else { + mergePipeline = std::move(shardDispatchResults.pipelineForSingleShard); + } + + cluster_aggregation_planner::addMergeCursorsSource( + mergePipeline.get(), + liteParsedPipeline, + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardCursorsSortSpec, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor()); + + return mergePipeline; +} + boost::optional<Document> MongoSInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index f2806baf001..6111f3346d8 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -32,6 +32,10 @@ #include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/async_requests_sender.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/query/cluster_aggregation_planner.h" +#include "mongo/s/query/owned_remote_cursor.h" namespace mongo { @@ -41,6 +45,81 @@ namespace mongo { */ class MongoSInterface final : public MongoProcessCommon { public: + struct DispatchShardPipelineResults { + // True if this pipeline was split, and the second half of the pipeline needs to be run on + // the primary shard for the database. + bool needsPrimaryShardMerge; + + // Populated if this *is not* an explain, this vector represents the cursors on the remote + // shards. + std::vector<OwnedRemoteCursor> remoteCursors; + + // Populated if this *is* an explain, this vector represents the results from each shard. + std::vector<AsyncRequestsSender::Response> remoteExplainOutput; + + // The split version of the pipeline if more than one shard was targeted, otherwise + // boost::none. + boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; + + // If the pipeline targeted a single shard, this is the pipeline to run on that shard. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; + + // The command object to send to the targeted shards. 
+ BSONObj commandForTargetedShards; + + // How many exchange producers are running the shard part of splitPipeline. + size_t numProducers; + + // The exchange specification if the query can run with the exchange otherwise boost::none. + boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; + }; + + static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req); + + static BSONObj createPassthroughCommandForShard(OperationContext* opCtx, + const AggregationRequest& request, + const boost::optional<ShardId>& shardId, + Pipeline* pipeline, + BSONObj collationObj); + + /** + * Appends information to the command sent to the shards which should be appended both if this + * is a passthrough sent to a single shard and if this is a split pipeline. + */ + static BSONObj genericTransformForShards(MutableDocument&& cmdForShards, + OperationContext* opCtx, + const boost::optional<ShardId>& shardId, + const AggregationRequest& request, + BSONObj collationObj); + + static BSONObj createCommandForTargetedShards( + OperationContext* opCtx, + const AggregationRequest& request, + const cluster_aggregation_planner::SplitPipeline& splitPipeline, + const BSONObj collationObj, + const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + bool needsMerge); + + static std::set<ShardId> getTargetedShards( + OperationContext* opCtx, + bool mustRunOnAllShards, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj shardQuery, + const BSONObj collation); + + static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe); + + static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo( + OperationContext* opCtx, const NamespaceString& execNss); + + static DispatchShardPipelineResults dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObj collationObj); + MongoSInterface() = default; virtual ~MongoSInterface() = default; @@ -119,10 +198,8 @@ public: MONGO_UNREACHABLE; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - MONGO_UNREACHABLE; - } + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final { MONGO_UNREACHABLE; @@ -133,12 +210,10 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) final { - MONGO_UNREACHABLE; - } + const MakePipelineOptions pipelineOptions) final; /** * The following methods only make sense for data-bearing nodes and should never be called on diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index bf0987ea4ab..1eb3af3aa97 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -274,45 +274,35 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( _client.runCommand("admin", renameCommandObj, 
info)); } -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline( +std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } - Status cursorStatus = Status::OK(); - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } - return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; + return pipeline; } -Status MongoInterfaceStandalone::attachCursorSourceToPipeline( +unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { invariant(pipeline->getSources().empty() || !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); boost::optional<AutoGetCollectionForReadCommand> autoColl; if (expCtx->uuid) { - try { - autoColl.emplace(expCtx->opCtx, - NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}, - AutoGetCollection::ViewMode::kViewsForbidden, - Date_t::max(), - AutoStatsTracker::LogMode::kUpdateTop); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - // The UUID doesn't exist anymore - return ex.toStatus(); - } + autoColl.emplace(expCtx->opCtx, + NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}, + AutoGetCollection::ViewMode::kViewsForbidden, + Date_t::max(), + AutoStatsTracker::LogMode::kUpdateTop); } else { autoColl.emplace(expCtx->opCtx, expCtx->ns, @@ -337,7 +327,7 @@ Status MongoInterfaceStandalone::attachCursorSourceToPipeline( // the initial cursor stage. 
pipeline->optimizePipeline(); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const { @@ -381,7 +371,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument( nss, collectionUUID, _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); + pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return boost::none; } diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 12627655cd5..f1c7fbcc910 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -87,12 +87,12 @@ public: const NamespaceString& targetNs, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) final; - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final; + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( OperationContext* opCtx, const NamespaceString&, UUID) const override; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 08b9e073f35..b9d7befb857 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -116,15 +116,15 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) override { MONGO_UNREACHABLE; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) override { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 015ea1c66d1..d06c2e91373 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -26,10 +26,10 @@ env.Library( target='cluster_aggregate', source=[ 'cluster_aggregate.cpp', - 'cluster_aggregation_planner.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface', '$BUILD_DIR/mongo/db/views/views', @@ -37,6 +37,16 @@ env.Library( ] ) +env.Library( + target='cluster_aggregation_planner', + source=[ + 'cluster_aggregation_planner.cpp', + ], + LIBDEPS=[ + 
'cluster_query', + ] +) + env.CppUnitTest( target="cluster_aggregate_test", source=[ diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 9fc978b46ee..506842da29c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -82,7 +82,6 @@ namespace mongo { using SplitPipeline = cluster_aggregation_planner::SplitPipeline; -MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToEstablishMergingShardCursor); MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToDispatchExchangeConsumerPipeline); @@ -90,41 +89,6 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries; namespace { -Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) { - // The idempotent retry policy will retry even for writeConcern failures, so only set it if the - // pipeline does not support writeConcern. - if (req.getWriteConcern()) { - return Shard::RetryPolicy::kNotIdempotent; - } - return Shard::RetryPolicy::kIdempotent; -} - -// Given a document representing an aggregation command such as -// -// {aggregate: "myCollection", pipeline: [], ...}, -// -// produces the corresponding explain command: -// -// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} -Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { - MutableDocument explainCommandBuilder; - explainCommandBuilder["explain"] = Value(aggregateCommand); - // Downstream host targeting code expects queryOptions at the top level of the command object. - explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = - Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); - - // readConcern needs to be promoted to the top-level of the request. - explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = - Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); - - // Add explain command options. - for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { - explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); - } - - return explainCommandBuilder.freeze(); -} - Status appendCursorResponseToCommandResult(const ShardId& shardId, const BSONObj cursorResponse, BSONObjBuilder* result) { @@ -138,143 +102,6 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, return getStatusFromCommandResult(result->asTempObj()); } -bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) { - // The following aggregations must be routed to all shards: - // - Any collectionless aggregation, such as non-localOps $currentOp. - // - Any aggregation which begins with a $changeStream stage. - return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); -} - -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss) { - // First, verify that there are shards present in the cluster. If not, then we return the - // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because - // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on - // a collection before its enclosing database is created. However, if there are no shards - // present, then $changeStream should immediately return an empty cursor just as other - // aggregations do when the database does not exist. 
- std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - if (shardIds.size() == 0) { - return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; - } - - // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not - // exist. - return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); -} - -std::set<ShardId> getTargetedShards(OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj shardQuery, - const BSONObj collation) { - if (mustRunOnAllShards) { - // The pipeline begins with a stage which must be run on all shards. - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - return {shardIds.begin(), shardIds.end()}; - } - - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo); - - return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); -} - -/** - * Appends information to the command sent to the shards which should be appended both if this is a - * passthrough sent to a single shard and if this is a split pipeline. - */ -BSONObj genericTransformForShards(MutableDocument&& cmdForShards, - OperationContext* opCtx, - const boost::optional<ShardId>& shardId, - const AggregationRequest& request, - BSONObj collationObj) { - cmdForShards[AggregationRequest::kFromMongosName] = Value(true); - // If this is a request for an aggregation explain, then we must wrap the aggregate inside an - // explain command. - if (auto explainVerbosity = request.getExplain()) { - cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); - } - - if (!collationObj.isEmpty()) { - cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); - } - - if (opCtx->getTxnNumber()) { - invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(), - str::stream() << "Command for shards unexpectedly had the " - << OperationSessionInfo::kTxnNumberFieldName - << " field set: " - << cmdForShards.peek().toString()); - cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = - Value(static_cast<long long>(*opCtx->getTxnNumber())); - } - - auto aggCmd = cmdForShards.freeze().toBson(); - - if (shardId) { - if (auto txnRouter = TransactionRouter::get(opCtx)) { - aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd); - } - } - - // agg creates temp collection and should handle implicit create separately. - return appendAllowImplicitCreate(aggCmd, true); -} - -BSONObj createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional<ShardId>& shardId, - Pipeline* pipeline, - BSONObj collationObj) { - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - if (pipeline) { - targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); - } - - return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj); -} - -BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const SplitPipeline& splitPipeline, - const BSONObj collationObj, - const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, - bool needsMerge) { - - // Create the command for the shards. 
- MutableDocument targetedCmd(request.serializeToCommandObj()); - // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it - // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may - // have detected a logged in user and appended that user name to the $listSessions spec to - // send to the shards. - targetedCmd[AggregationRequest::kPipelineName] = - Value(splitPipeline.shardsPipeline->serialize()); - - // When running on many shards with the exchange we may not need merging. - if (needsMerge) { - targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); - - // For split pipelines which need merging, do *not* propagate the writeConcern to the shards - // part. Otherwise this is part of an exchange and in that case we should include the - // writeConcern. - targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); - } - - targetedCmd[AggregationRequest::kCursorName] = - Value(DOC(AggregationRequest::kBatchSizeName << 0)); - - targetedCmd[AggregationRequest::kExchangeName] = - exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); - - return genericTransformForShards( - std::move(targetedCmd), opCtx, boost::none, request, collationObj); -} - BSONObj createCommandForMergingShard(const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const ShardId& shardId, @@ -302,252 +129,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, return appendAllowImplicitCreate(aggCmd, true); } -std::vector<RemoteCursor> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj& cmdObj, - const AggregationRequest& request, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery) { - LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - - const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe); - std::set<ShardId> shardIds = - getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); - std::vector<std::pair<ShardId, BSONObj>> requests; - - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo || mustRunOnAll); - - if (mustRunOnAll) { - // The pipeline contains a stage which must be run on all shards. Skip versioning and - // enqueue the raw command objects. - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - } else if (routingInfo->cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation, and build versioned requests for them. - for (auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); - requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); - } - } else { - // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo->db().primaryId(), - !routingInfo->db().primary()->isConfig() - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj); - } - - if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. 
Blocking " - "until fail point is disabled."; - while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - sleepsecs(1); - } - } - - return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */, - getDesiredRetryPolicy(request)); -} - -struct DispatchShardPipelineResults { - // True if this pipeline was split, and the second half of the pipeline needs to be run on the - // primary shard for the database. - bool needsPrimaryShardMerge; - - // Populated if this *is not* an explain, this vector represents the cursors on the remote - // shards. - std::vector<OwnedRemoteCursor> remoteCursors; - - // Populated if this *is* an explain, this vector represents the results from each shard. - std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - - // The split version of the pipeline if more than one shard was targeted, otherwise boost::none. - boost::optional<SplitPipeline> splitPipeline; - - // If the pipeline targeted a single shard, this is the pipeline to run on that shard. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; - - // The command object to send to the targeted shards. - BSONObj commandForTargetedShards; - - // How many exchange producers are running the shard part of splitPipeline. - size_t numProducers; - - // The exchange specification if the query can run with the exchange otherwise boost::none. - boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; -}; - -/** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and - * the pipeline that will need to be executed to merge the results from the remotes. If a stale - * shard version is encountered, refreshes the routing table and tries again. - */ -DispatchShardPipelineResults dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - BSONObj collationObj) { - // The process is as follows: - // - First, determine whether we need to target more than one shard. If so, we split the - // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursors to dispatch the aggregation to the targeted shards. - // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the - // entire aggregation commmand. - auto cursors = std::vector<RemoteCursor>(); - auto shardResults = std::vector<AsyncRequestsSender::Response>(); - auto opCtx = expCtx->opCtx; - - const bool needsPrimaryShardMerge = - (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); - - const bool needsMongosMerge = pipeline->needsMongosMerger(); - - const auto shardQuery = pipeline->getInitialQuery(); - - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); - - // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. - // Otherwise, uassert on all exceptions here. - if (!(liteParsedPipeline.hasChangeStream() && - executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { - uassertStatusOK(executionNsRoutingInfoStatus); - } - - auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() - ? 
std::move(executionNsRoutingInfoStatus.getValue()) - : boost::optional<CachedCollectionRoutingInfo>{}; - - // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline); - std::set<ShardId> shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->computeAndSetAtClusterTime( - opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); - } - - // Don't need to split the pipeline if we are only targeting a single shard, unless: - // - There is a stage that needs to be run on the primary shard and the single target shard - // is not the primary. - // - The pipeline contains one or more stages which must always merge on mongoS. - const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && executionNsRoutingInfo && - *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); - - boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; - boost::optional<SplitPipeline> splitPipeline; - - if (needsSplit) { - splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); - - exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( - opCtx, splitPipeline->mergePipeline.get()); - } - - // Generate the command object for the targeted shards. - BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards( - opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) - : createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, pipeline.get(), collationObj); - - // Refresh the shard registry if we're targeting all shards. We need the shard registry - // to be at least as current as the logical time used when creating the command for - // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAll) { - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - shardRegistry->reload(opCtx); - } - } - - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - if (expCtx->explain) { - if (mustRunOnAll) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and - // should not participate in the shard version protocol. - shardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db(), - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target - // shards, and should participate in the shard version protocol. - invariant(executionNsRoutingInfo); - shardResults = - scatterGatherVersionedTargetByRoutingTable(opCtx, - executionNss.db(), - executionNss, - *executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - aggRequest.getCollation()); - } - } else { - cursors = establishShardCursors(opCtx, - executionNss, - liteParsedPipeline, - executionNsRoutingInfo, - targetedCommand, - aggRequest, - ReadPreferenceSetting::get(opCtx), - shardQuery); - invariant(cursors.size() % shardIds.size() == 0, - str::stream() << "Number of cursors (" << cursors.size() - << ") is not a multiple of producers (" - << shardIds.size() - << ")"); - } - - // Convert remote cursors into a vector of "owned" cursors. 
- std::vector<OwnedRemoteCursor> ownedCursors; - for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); - } - - // Record the number of shards involved in the aggregation. If we are required to merge on - // the primary shard, but the primary shard was not in the set of targeted shards, then we - // must increment the number of involved shards. - CurOp::get(opCtx)->debug().nShards = - shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && - !shardIds.count(executionNsRoutingInfo->db().primaryId())); - - return DispatchShardPipelineResults{needsPrimaryShardMerge, - std::move(ownedCursors), - std::move(shardResults), - std::move(splitPipeline), - std::move(pipeline), - targetedCommand, - shardIds.size(), - exchangeSpec}; -} - -DispatchShardPipelineResults dispatchExchangeConsumerPipeline( +MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, BSONObj collationObj, - DispatchShardPipelineResults* shardDispatchResults) { + MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) { invariant(!liteParsedPipeline.hasChangeStream()); auto opCtx = expCtx->opCtx; @@ -584,7 +172,7 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - auto consumerCmdObj = createCommandForTargetedShards( + auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards( opCtx, aggRequest, consumerPipelines.back(), collationObj, boost::none, false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], @@ -617,16 +205,16 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront()); mergeCursors->dismissCursorOwnership(); } - return DispatchShardPipelineResults{false, - std::move(ownedCursors), - {} /*TODO SERVER-36279*/, - std::move(splitPipeline), - nullptr, - BSONObj(), - numConsumers}; + return MongoSInterface::DispatchShardPipelineResults{false, + std::move(ownedCursors), + {} /*TODO SERVER-36279*/, + std::move(splitPipeline), + nullptr, + BSONObj(), + numConsumers}; } -Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, +Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, BSONObjBuilder* result) { if (dispatchResults.splitPipeline) { @@ -688,7 +276,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); - Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request); + Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request); return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy)); } @@ -914,36 +502,16 @@ ShardId pickMergingShard(OperationContext* opCtx, : targetedShards[prng.nextInt32(targetedShards.size())]; } -// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the -// pipeline. We won't try to execute anything on a mongos, but we still have to populate this map so -// that any $lookups, etc. 
will be able to have a resolved view definition. It's okay that this is -// incorrect, we will repopulate the real namespace map on the mongod. Note that this function must -// be called before forwarding an aggregation command on an unsharded collection, in order to verify -// that the involved namespaces are allowed to be sharded. -StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( - OperationContext* opCtx, const LiteParsedPipeline& litePipe) { - - StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - for (auto&& nss : litePipe.getInvolvedNamespaces()) { - const auto resolvedNsRoutingInfo = - uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - uassert(28769, - str::stream() << nss.ns() << " cannot be sharded", - !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss)); - resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); - } - return resolvedNamespaces; -} - -// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved -// namespaces are unsharded, instantiates an appropriate collator, creates a MongoProcessInterface -// for use by the pipeline's stages, and optionally extracts the UUID from the collection info if -// present. -boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - BSONObj collationObj, - boost::optional<UUID> uuid) { +// Build an appropriate ExpressionContext for the pipeline. This helper instantiates an appropriate +// collator, creates a MongoProcessInterface for use by the pipeline's stages, and optionally +// extracts the UUID from the collection info if present. +boost::intrusive_ptr<ExpressionContext> makeExpressionContext( + OperationContext* opCtx, + const AggregationRequest& request, + const LiteParsedPipeline& litePipe, + BSONObj collationObj, + boost::optional<UUID> uuid, + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) { std::unique_ptr<CollatorInterface> collation; if (!collationObj.isEmpty()) { @@ -958,7 +526,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* request, std::move(collation), std::make_shared<MongoSInterface>(), - resolveInvolvedNamespaces(opCtx, litePipe), + std::move(resolvedNamespaces), uuid); mergeCtx->inMongos = true; @@ -1002,7 +570,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const AggregationRequest& request, const LiteParsedPipeline& litePipe, const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - DispatchShardPipelineResults&& shardDispatchResults, + MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result) { // We should never be in a situation where we call this function on a non-merge pipeline. 
invariant(shardDispatchResults.splitPipeline); @@ -1089,7 +657,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, const AggregationRequest& request, BSONObjBuilder* result) { uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec()); - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); + auto executionNsRoutingInfoStatus = + MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); boost::optional<CachedCollectionRoutingInfo> routingInfo; LiteParsedPipeline litePipe(request); @@ -1109,18 +678,38 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // Determine whether this aggregation must be dispatched to all shards in the cluster. - const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, litePipe); + const bool mustRunOnAll = + MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe); // If we don't have a routing table, then this is a $changeStream which must run on all shards. invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream())); - // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does - // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(), - // then go ahead and pass it through to the owning shard unmodified. Note that we first call - // resolveInvolvedNamespaces to validate that none of the namespaces are sharded. + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + bool involvesShardedCollections = false; + for (auto&& nss : litePipe.getInvolvedNamespaces()) { + const auto resolvedNsRoutingInfo = + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + + uassert(28769, + str::stream() << nss.ns() << " cannot be sharded", + !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss)); + + resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); + if (resolvedNsRoutingInfo.cm()) { + involvesShardedCollections = true; + } + } + + // A pipeline is allowed to passthrough to the primary shard iff the following conditions are + // met: + // + // 1. The namespace of the aggregate and any other involved namespaces are unsharded. + // 2. Is allowed to be forwarded to shards. + // 3. Does not need to run on all shards. + // 4. Doesn't need transformation via DocumentSource::serialize(). if (routingInfo && !routingInfo->cm() && !mustRunOnAll && - litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) { - resolveInvolvedNamespaces(opCtx, litePipe); + litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos() && + !involvesShardedCollections) { const auto primaryShardId = routingInfo->db().primary()->getId(); return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result); } @@ -1133,7 +722,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the // pipeline's stages. - auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid); + auto expCtx = makeExpressionContext( + opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces)); // Parse and optimize the full pipeline. 
     auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
@@ -1154,7 +744,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     }
 
     // If not, split the pipeline as necessary and dispatch to the relevant shards.
-    auto shardDispatchResults = dispatchShardPipeline(
+    auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
         expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
 
     // If the operation is an explain, then we verify that it succeeded on all targeted shards,
@@ -1237,7 +827,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
     // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
     // explain if necessary, and rewrites the result into a format safe to forward to shards.
     BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
-        createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj()));
+        MongoSInterface::createPassthroughCommandForShard(
+            opCtx, aggRequest, shardId, nullptr, BSONObj()));
 
     auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
         opCtx,