diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-01-16 12:42:27 -0500 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-01-16 14:16:34 -0500 |
commit | 4eabf1ea6225f444b3b0b3b2fee785aaa306212f (patch) | |
tree | 53fc00f7e31089dcb3ffb4c16f770b0a5468c3b9 /src | |
parent | 2f788aa745ca1366704b821087225af49ce3285a (diff) | |
download | mongo-4eabf1ea6225f444b3b0b3b2fee785aaa306212f.tar.gz |
Revert "SERVER-32308: Add the ability for a $lookup stage to execute on mongos against a sharded foreign collection"
This reverts commit 7298d273c0497f2720ec1471ad0f4910bff07af4.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_interface.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stub_mongo_process_interface.h | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 512 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.cpp | 526 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.h | 73 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.h | 14 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_internal_cursor.h | 56 |
15 files changed, 545 insertions, 760 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index d975895dc4d..cf1267eeb03 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -218,7 +218,7 @@ public: MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { return false; } @@ -236,16 +236,16 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 6db48d43850..05a62606bb6 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -75,16 +75,16 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_results)); - return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } private: diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index ecc9d572bfe..e4fd70920f6 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -283,8 +283,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - pipeline = uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - _fromExpCtx, pipeline.release())); + uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + _fromExpCtx, pipeline.get())); } // If the cache has been abandoned, release it. @@ -614,39 +614,6 @@ void DocumentSourceLookUp::initializeIntrospectionPipeline() { sources.empty() || !sources.front()->constraints().isChangeStreamStage()); } -DocumentSource::StageConstraints DocumentSourceLookUp::constraints( - Pipeline::SplitState pipeState) const { - - const bool mayUseDisk = wasConstructedWithPipelineSyntax() && - std::any_of(_parsedIntrospectionPipeline->getSources().begin(), - _parsedIntrospectionPipeline->getSources().end(), - [](const auto& source) { - return source->constraints().diskRequirement == - DiskUseRequirement::kWritesTmpData; - }); - - // If executing on mongos and the foreign collection is sharded, then this stage can run on - // mongos. - HostTypeRequirement hostReq; - if (pExpCtx->inMongos) { - hostReq = pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs) - ? HostTypeRequirement::kMongoS - : HostTypeRequirement::kPrimaryShard; - } else { - hostReq = HostTypeRequirement::kPrimaryShard; - } - - StageConstraints constraints(StreamType::kStreaming, - PositionRequirement::kNone, - hostReq, - mayUseDisk ? DiskUseRequirement::kWritesTmpData - : DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); - - constraints.canSwapWithMatch = true; - return constraints; -} - void DocumentSourceLookUp::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Document doc; diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 7424b2ef97d..530c62f985c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -96,7 +96,25 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final; + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + const bool mayUseDisk = wasConstructedWithPipelineSyntax() && + std::any_of(_parsedIntrospectionPipeline->getSources().begin(), + _parsedIntrospectionPipeline->getSources().end(), + [](const auto& source) { + return source->constraints().diskRequirement == + DiskUseRequirement::kWritesTmpData; + }); + + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + mayUseDisk ? DiskUseRequirement::kWritesTmpData + : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + + constraints.canSwapWithMatch = true; + return constraints; + } GetDepsReturn getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 77feab2f825..175806d5f7e 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -101,16 +101,16 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } boost::optional<Document> lookupSingleDocument( diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index b77c549a855..dfe4ece48ac 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -511,14 +511,14 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { if (pipeline->popFrontWithCriteria("$match") || pipeline->popFrontWithCriteria("$sort") || @@ -529,7 +529,7 @@ public: } pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } private: diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 7d78c880d1b..d32e8276c63 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -156,16 +156,12 @@ public: const MakePipelineOptions opts = MakePipelineOptions{}) = 0; /** - * Accepts a pipeline and returns a new one which will draw input from the underlying - * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be + * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This + * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be * the only case where NamespaceNotFound is returned. - * - * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr. - * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the - * compiler expects to find an implementation of PipelineDeleter. */ - virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + virtual Status attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; /** diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 16d6d9e325c..7ce395f3716 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -585,8 +585,8 @@ DBClientBase* PipelineD::MongoDInterface::directClient() { } bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForRead autoColl(opCtx, nss); - // TODO SERVER-32198: Use CollectionShardingState::collectionIsSharded() to confirm sharding + AutoGetCollectionForReadCommand autoColl(opCtx, nss); + // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding // state. auto css = CollectionShardingState::get(opCtx, nss); return bool(css->getMetadata()); @@ -689,15 +689,16 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterfac pipeline.getValue()->optimizePipeline(); } + Status cursorStatus = Status::OK(); + if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); } - return pipeline; + return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; } -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> -PipelineD::MongoDInterface::attachCursorSourceToPipeline( +Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { invariant(pipeline->getSources().empty() || !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); @@ -728,7 +729,7 @@ PipelineD::MongoDInterface::attachCursorSourceToPipeline( PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps( diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index f626b0f904b..afbb6b8f73f 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -95,8 +95,8 @@ public: const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final; std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, CurrentOpConnectionsMode connMode, CurrentOpUserMode userMode, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 488cf97c58e..0ec37b15eb4 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -108,8 +108,8 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) override { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index f7e55ee4965..db8e9fed543 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -62,13 +62,43 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/establish_cursors.h" +#include "mongo/s/query/router_stage_update_on_add_shard.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" +#include "mongo/util/fail_point.h" #include "mongo/util/log.h" namespace mongo { +MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors); + namespace { +// Given a document representing an aggregation command such as +// +// {aggregate: "myCollection", pipeline: [], ...}, +// +// produces the corresponding explain command: +// +// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} +Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { + MutableDocument explainCommandBuilder; + explainCommandBuilder["explain"] = Value(aggregateCommand); + // Downstream host targeting code expects queryOptions at the top level of the command object. + explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = + Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); + + // readConcern needs to be promoted to the top-level of the request. + explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = + Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); + + // Add explain command options. + for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { + explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); + } + + return explainCommandBuilder.freeze(); +} + Status appendExplainResults( const std::vector<AsyncRequestsSender::Response>& shardResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, @@ -114,6 +144,15 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, return getStatusFromCommandResult(result->asTempObj()); } +bool mustRunOnAllShards(const NamespaceString& nss, + const CachedCollectionRoutingInfo& routingInfo, + const LiteParsedPipeline& litePipe) { + // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection + // must run on all shards. + const bool nsIsSharded = static_cast<bool>(routingInfo.cm()); + return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream()); +} + StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, const NamespaceString& execNss, CatalogCache* catalogCache) { @@ -135,6 +174,65 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte return swRoutingInfo; } +std::set<ShardId> getTargetedShards(OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + const CachedCollectionRoutingInfo& routingInfo, + const BSONObj shardQuery, + const BSONObj collation) { + if (mustRunOnAllShards(nss, routingInfo, litePipe)) { + // The pipeline begins with a stage which must be run on all shards. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + return {shardIds.begin(), shardIds.end()}; + } + + if (routingInfo.cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation. + std::set<ShardId> shardIds; + routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds); + return shardIds; + } + + // The collection is unsharded. Target only the primary shard for the database. + return {routingInfo.primaryId()}; +} + +BSONObj createCommandForTargetedShards( + const AggregationRequest& request, + const BSONObj originalCmdObj, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) { + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + targetedCmd[AggregationRequest::kFromMongosName] = Value(true); + + // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough. + if (pipelineForTargetedShards) { + targetedCmd[AggregationRequest::kPipelineName] = + Value(pipelineForTargetedShards->serialize()); + + if (pipelineForTargetedShards->isSplitForShards()) { + targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << 0)); + } + } + + // If this pipeline is not split, ensure that the write concern is propagated if present. + if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) { + targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); + } + + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an + // explain command. + if (auto explainVerbosity = request.getExplain()) { + targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity)); + } + + return targetedCmd.freeze().toBson(); +} + BSONObj createCommandForMergingShard( const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, @@ -157,6 +255,247 @@ BSONObj createCommandForMergingShard( return mergeCmd.freeze().toBson(); } +StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { + LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; + + std::set<ShardId> shardIds = + getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation); + std::vector<std::pair<ShardId, BSONObj>> requests; + + if (mustRunOnAllShards(nss, *routingInfo, litePipe)) { + // The pipeline contains a stage which must be run on all shards. Skip versioning and + // enqueue the raw command objects. + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + } else if (routingInfo->cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. + for (auto& shardId : shardIds) { + auto versionedCmdObj = + appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo->primaryId(), + !routingInfo->primary()->isConfig() + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj); + } + + if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " + "until fail point is disabled."; + while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + sleepsecs(1); + } + } + + // If we reach this point, we're either trying to establish cursors on a sharded execution + // namespace, or handling the case where a sharded collection was dropped and recreated as + // unsharded. Since views cannot be sharded, and because we will return an error rather than + // attempting to continue in the event that a recreated namespace is a view, we set + // viewDefinitionOut to nullptr. + BSONObj* viewDefinitionOut = nullptr; + auto swCursors = establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */, + viewDefinitionOut /* can't receive view definition */); + + // If any shard returned a stale shardVersion error, invalidate the routing table cache. + // This will cause the cache to be refreshed the next time it is accessed. + if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo)); + } + + return swCursors; +} + +struct DispatchShardPipelineResults { + // True if this pipeline was split, and the second half of the pipeline needs to be run on the + // primary shard for the database. + bool needsPrimaryShardMerge; + + // Populated if this *is not* an explain, this vector represents the cursors on the remote + // shards. + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + + // Populated if this *is* an explain, this vector represents the results from each shard. + std::vector<AsyncRequestsSender::Response> remoteExplainOutput; + + // The half of the pipeline that was sent to each shard, or the entire pipeline if there was + // only one shard targeted. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; + + // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + + // The command object to send to the targeted shards. + BSONObj commandForTargetedShards; +}; + +/** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. + */ +StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + BSONObj originalCmdObj, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { + // The process is as follows: + // - First, determine whether we need to target more than one shard. If so, we split the + // pipeline; if not, we retain the existing pipeline. + // - Call establishShardCursors to dispatch the aggregation to the targeted shards. + // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with + // the refreshed routing table data. + // - If the pipeline is not split and we now need to target multiple shards, split it. If the + // pipeline is already split and we now only need to target a single shard, reassemble the + // original pipeline. + // - After exhausting 10 attempts to establish the cursors, we give up and throw. + auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); + auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>(); + auto opCtx = expCtx->opCtx; + + const bool needsPrimaryShardMerge = + (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); + + const bool needsMongosMerge = pipeline->needsMongosMerger(); + + const auto shardQuery = pipeline->getInitialQuery(); + + auto pipelineForTargetedShards = std::move(pipeline); + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + BSONObj targetedCommand; + + int numAttempts = 0; + + do { + // We need to grab a new routing table at the start of each iteration, since a stale config + // exception will invalidate the previous one. + auto executionNsRoutingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss)); + + // Determine whether we can run the entire aggregation on a single shard. + std::set<ShardId> shardIds = getTargetedShards(opCtx, + executionNss, + liteParsedPipeline, + executionNsRoutingInfo, + shardQuery, + aggRequest.getCollation()); + + uassert(ErrorCodes::ShardNotFound, + "No targets were found for this aggregation. All shards were removed from the " + "cluster mid-operation", + shardIds.size() > 0); + + // Don't need to split the pipeline if we are only targeting a single shard, unless: + // - There is a stage that needs to be run on the primary shard and the single target shard + // is not the primary. + // - The pipeline contains one or more stages which must always merge on mongoS. + const bool needsSplit = + (shardIds.size() > 1u || needsMongosMerge || + (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId())); + + const bool isSplit = pipelineForTargetedShards->isSplitForShards(); + + // If we have to run on multiple shards and the pipeline is not yet split, split it. If we + // can run on a single shard and the pipeline is already split, reassemble it. + if (needsSplit && !isSplit) { + pipelineForMerging = std::move(pipelineForTargetedShards); + pipelineForTargetedShards = pipelineForMerging->splitForSharded(); + } else if (!needsSplit && isSplit) { + pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); + } + + // Generate the command object for the targeted shards. + targetedCommand = + createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards); + + // Refresh the shard registry if we're targeting all shards. We need the shard registry + // to be at least as current as the logical time used when creating the command for + // $changeStream to work reliably, so we do a "hard" reload. + if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + shardRegistry->reload(opCtx); + } + } + + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and should + // not participate in the shard version protocol. + swShardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db().toString(), + executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target shards, + // and should participate in the shard version protocol. + swShardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db().toString(), + executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation(), + nullptr /* viewDefinition */); + } + } else { + swCursors = establishShardCursors(opCtx, + executionNss, + liteParsedPipeline, + &executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + shardQuery, + aggRequest.getCollation()); + + if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { + LOG(1) << "got stale shardVersion error " << swCursors.getStatus() + << " while dispatching " << redact(targetedCommand) << " after " + << (numAttempts + 1) << " dispatch attempts"; + } + } + } while (++numAttempts < kMaxNumStaleVersionRetries && + (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK())); + + if (!swShardResults.isOK()) { + return swShardResults.getStatus(); + } + if (!swCursors.isOK()) { + return swCursors.getStatus(); + } + return DispatchShardPipelineResults{needsPrimaryShardMerge, + std::move(swCursors.getValue()), + std::move(swShardResults.getValue()), + std::move(pipelineForTargetedShards), + std::move(pipelineForMerging), + targetedCommand}; +} StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor( OperationContext* opCtx, @@ -181,6 +520,104 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs return {{std::move(mergingShardId), std::move(shardCmdResponse)}}; } +BSONObj establishMergingMongosCursor(OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, + std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { + + ClusterClientCursorParams params( + requestedNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + ReadPreferenceSetting::get(opCtx)); + + params.tailableMode = pipelineForMerging->getContext()->tailableMode; + params.mergePipeline = std::move(pipelineForMerging); + params.remotes = std::move(cursors); + + // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch + // size we pass here is used for getMores, so do not specify a batch size if the initial request + // had a batch size of 0. + params.batchSize = request.getBatchSize() == 0 + ? boost::none + : boost::optional<long long>(request.getBatchSize()); + + if (liteParsedPipeline.hasChangeStream()) { + // For change streams, we need to set up a custom stage to establish cursors on new shards + // when they are added. + params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { + return stdx::make_unique<RouterStageUpdateOnAddShard>( + opCtx, executor, params, cmdToRunOnNewShards); + }; + } + auto ccc = ClusterClientCursorImpl::make( + opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + + auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + BSONObjBuilder cursorResponse; + + CursorResponseBuilder responseBuilder(true, &cursorResponse); + + for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { + ClusterQueryResult next; + try { + next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); + } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { + // This exception is thrown when a $changeStream stage encounters an event + // that invalidates the cursor. We should close the cursor and return without + // error. + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; + } + + // Check whether we have exhausted the pipeline's results. + if (next.isEOF()) { + // We reached end-of-stream. If the cursor is not tailable, then we mark it as + // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when + // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no + // hope of returning data and thus we need to close the mongos cursor as well. + if (!ccc->isTailable() || ccc->remotesExhausted()) { + cursorState = ClusterCursorManager::CursorState::Exhausted; + } + break; + } + + // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor + // to be returned on the next getMore. + auto nextObj = *next.getResult(); + + if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { + ccc->queueResult(nextObj); + break; + } + + responseBuilder.append(nextObj); + } + + ccc->detachFromOperationContext(); + + CursorId clusterCursorId = 0; + + if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { + clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( + opCtx, + ccc.releaseCursor(), + requestedNss, + ClusterCursorManager::CursorType::MultiTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + } + + responseBuilder.done(clusterCursorId, requestedNss.ns()); + + CommandHelpers::appendCommandStatus(cursorResponse, Status::OK()); + + return cursorResponse.obj(); +} + BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard, const NamespaceString& nss) { ScopedDbConnection conn(primaryShard->getConnString()); @@ -216,10 +653,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, BSONObj cmdObj, BSONObjBuilder* result) { const auto catalogCache = Grid::get(opCtx)->catalogCache(); - auto executionNss = namespaces.executionNss; auto executionNsRoutingInfoStatus = - getExecutionNsRoutingInfo(opCtx, executionNss, catalogCache); + getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache); LiteParsedPipeline liteParsedPipeline(request); @@ -241,33 +677,29 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Determine the appropriate collation and 'resolve' involved namespaces to make the // ExpressionContext. - // We may not try to execute anything on mongos, but we still have to populate this map so that - // any $lookups, etc. will be able to have a resolved view definition when they are parsed. - // TODO: SERVER-32548 will add support for lookup against a sharded view, so this map needs to - // be correct to determine whether the aggregate should be passthrough or sent to all shards. + // We won't try to execute anything on a mongos, but we still have to populate this map so that + // any $lookups, etc. will be able to have a resolved view definition. It's okay that this is + // incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we + // need to check if any involved collections are sharded before forwarding an aggregation + // command on an unsharded collection. StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - bool involvesShardedCollections = false; for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassert( + 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm()); resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); - if (resolvedNsRoutingInfo.cm()) { - involvesShardedCollections = true; - } } - // A pipeline is allowed to passthrough to the primary shard iff the following conditions are - // met: - // - // 1. The namespace of the aggregate and any other involved namespaces are unsharded. - // 2. Is allowed to be forwarded to shards. - // 3. Does not need to run on all shards. - // 4. Doesn't need transformation via DocumentSource::serialize(). + // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does + // not need to run on all shards, and doesn't need transformation via + // DocumentSource::serialize(), then go ahead and pass it through to the owning shard + // unmodified. if (!executionNsRoutingInfo.cm() && - !PipelineS::mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline) && + !mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) && liteParsedPipeline.allowedToForwardFromMongos() && - liteParsedPipeline.allowedToPassthroughFromMongos() && !involvesShardedCollections) { + liteParsedPipeline.allowedToPassthroughFromMongos()) { return aggPassthrough(opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), @@ -288,7 +720,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } else { // Unsharded collection. Get collection metadata from primary chunk. auto collationObj = getDefaultCollationForUnshardedCollection( - executionNsRoutingInfo.primary().get(), executionNss); + executionNsRoutingInfo.primary().get(), namespaces.executionNss); if (!collationObj.isEmpty()) { collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(collationObj)); @@ -315,19 +747,23 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, << " is not capable of producing input", !pipeline->getSources().front()->constraints().requiresInputDocSource); - auto cursorResponse = PipelineS::establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - cmdObj, - liteParsedPipeline, - std::move(pipeline), - {}); + auto cursorResponse = establishMergingMongosCursor(opCtx, + request, + namespaces.requestedNss, + cmdObj, + liteParsedPipeline, + std::move(pipeline), + {}); CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result); return getStatusFromCommandResult(result->asTempObj()); } - auto dispatchResults = uassertStatusOK(PipelineS::dispatchShardPipeline( - mergeCtx, executionNss, cmdObj, request, liteParsedPipeline, std::move(pipeline))); + auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx, + namespaces.executionNss, + cmdObj, + request, + liteParsedPipeline, + std::move(pipeline))); if (mergeCtx->explain) { // If we reach here, we've either succeeded in running the explain or exhausted all @@ -372,13 +808,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) { // Register the new mongoS cursor, and retrieve the initial batch of results. auto cursorResponse = - PipelineS::establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - dispatchResults.commandForTargetedShards, - liteParsedPipeline, - std::move(mergingPipeline), - std::move(dispatchResults.remoteCursors)); + establishMergingMongosCursor(opCtx, + request, + namespaces.requestedNss, + dispatchResults.commandForTargetedShards, + liteParsedPipeline, + std::move(mergingPipeline), + std::move(dispatchResults.remoteCursors)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -393,7 +829,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto mergeResponse = uassertStatusOK( establishMergingShardCursor(opCtx, - executionNss, + namespaces.executionNss, dispatchResults.remoteCursors, mergeCmdObj, boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge, @@ -465,7 +901,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - PipelineS::createCommandForTargetedShards(aggRequest, cmdObj, nullptr)); + createCommandForTargetedShards(aggRequest, cmdObj, nullptr)); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index 45662b6aa55..26f8dad4ec2 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -26,38 +26,20 @@ * then also delete it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand - #include "mongo/platform/basic.h" #include "mongo/s/commands/pipeline_s.h" -#include "mongo/db/auth/authorization_session.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/query/collation/collation_spec.h" -#include "mongo/db/query/find_common.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/s/client/shard_registry.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/grid.h" -#include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/s/query/cluster_client_cursor_params.h" -#include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_cursor_manager.h" -#include "mongo/s/query/cluster_query_knobs.h" -#include "mongo/s/query/document_source_router_adapter.h" -#include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" -#include "mongo/s/query/router_stage_internal_cursor.h" -#include "mongo/s/query/router_stage_merge.h" -#include "mongo/s/query/router_stage_update_on_add_shard.h" -#include "mongo/s/query/store_possible_cursor.h" -#include "mongo/s/stale_exception.h" -#include "mongo/util/fail_point.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" namespace mongo { @@ -66,8 +48,6 @@ using std::shared_ptr; using std::string; using std::unique_ptr; -MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors); - namespace { /** * Determines the single shard to which the given query will be targeted, and its associated @@ -107,500 +87,8 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( return swRoutingInfo; } -/** - * Given a document representing an aggregation command such as - * - * {aggregate: "myCollection", pipeline: [], ...}, - * - * produces the corresponding explain command: - * - * {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} - */ -Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { - MutableDocument explainCommandBuilder; - explainCommandBuilder["explain"] = Value(aggregateCommand); - // Downstream host targeting code expects queryOptions at the top level of the command object. - explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = - Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); - - // readConcern needs to be promoted to the top-level of the request. - explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = - Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); - - // Add explain command options. - for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { - explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); - } - - return explainCommandBuilder.freeze(); -} - -std::set<ShardId> getTargetedShards(OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - const CachedCollectionRoutingInfo& routingInfo, - const BSONObj shardQuery, - const BSONObj collation) { - if (PipelineS::mustRunOnAllShards(nss, routingInfo, litePipe)) { - // The pipeline begins with a stage which must be run on all shards. - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - return {shardIds.begin(), shardIds.end()}; - } - - if (routingInfo.cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation. - std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds); - return shardIds; - } - - // The collection is unsharded. Target only the primary shard for the database. - return {routingInfo.primaryId()}; -} - -StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { - LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - - std::set<ShardId> shardIds = - getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation); - std::vector<std::pair<ShardId, BSONObj>> requests; - - if (PipelineS::mustRunOnAllShards(nss, *routingInfo, litePipe)) { - // The pipeline contains a stage which must be run on all shards. Skip versioning and - // enqueue the raw command objects. - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - } else if (routingInfo->cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation, and build versioned requests for them. - for (auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); - requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); - } - } else { - // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo->primaryId(), - !routingInfo->primary()->isConfig() - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj); - } - - if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " - "until fail point is disabled."; - while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - sleepsecs(1); - } - } - - // If we reach this point, we're either trying to establish cursors on a sharded execution - // namespace, or handling the case where a sharded collection was dropped and recreated as - // unsharded. Since views cannot be sharded, and because we will return an error rather than - // attempting to continue in the event that a recreated namespace is a view, we set - // viewDefinitionOut to nullptr. - BSONObj* viewDefinitionOut = nullptr; - auto swCursors = establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */, - viewDefinitionOut /* can't receive view definition */); - - // If any shard returned a stale shardVersion error, invalidate the routing table cache. - // This will cause the cache to be refreshed the next time it is accessed. - if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo)); - } - - return swCursors; -} - } // namespace -bool PipelineS::mustRunOnAllShards(const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, - const LiteParsedPipeline& litePipe) { - // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection - // must run on all shards. - const bool nsIsSharded = static_cast<bool>(routingInfo.cm()); - return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream()); -} - -BSONObj PipelineS::createCommandForTargetedShards( - const AggregationRequest& request, - BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) { - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - targetedCmd[AggregationRequest::kFromMongosName] = Value(true); - - // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough. - if (pipelineForTargetedShards) { - targetedCmd[AggregationRequest::kPipelineName] = - Value(pipelineForTargetedShards->serialize()); - - if (pipelineForTargetedShards->isSplitForShards()) { - targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); - targetedCmd[AggregationRequest::kCursorName] = - Value(DOC(AggregationRequest::kBatchSizeName << 0)); - } - } - - // If this pipeline is not split, ensure that the write concern is propagated if present. - if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) { - targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); - } - - // If this is a request for an aggregation explain, then we must wrap the aggregate inside an - // explain command. - if (auto explainVerbosity = request.getExplain()) { - targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity)); - } - - return targetedCmd.freeze().toBson(); -} - -BSONObj PipelineS::establishMergingMongosCursor( - OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { - - ClusterClientCursorParams params( - requestedNss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - ReadPreferenceSetting::get(opCtx)); - - params.tailableMode = pipelineForMerging->getContext()->tailableMode; - params.mergePipeline = std::move(pipelineForMerging); - params.remotes = std::move(cursors); - - // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch - // size we pass here is used for getMores, so do not specify a batch size if the initial request - // had a batch size of 0. - params.batchSize = request.getBatchSize() == 0 - ? boost::none - : boost::optional<long long>(request.getBatchSize()); - - if (liteParsedPipeline.hasChangeStream()) { - // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. - params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, cmdToRunOnNewShards); - }; - } - auto ccc = ClusterClientCursorImpl::make( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); - - auto cursorState = ClusterCursorManager::CursorState::NotExhausted; - BSONObjBuilder cursorResponse; - - CursorResponseBuilder responseBuilder(true, &cursorResponse); - - for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { - ClusterQueryResult next; - try { - next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); - } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { - // This exception is thrown when a $changeStream stage encounters an event - // that invalidates the cursor. We should close the cursor and return without - // error. - cursorState = ClusterCursorManager::CursorState::Exhausted; - break; - } - - // Check whether we have exhausted the pipeline's results. - if (next.isEOF()) { - // We reached end-of-stream. If the cursor is not tailable, then we mark it as - // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when - // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no - // hope of returning data and thus we need to close the mongos cursor as well. - if (!ccc->isTailable() || ccc->remotesExhausted()) { - cursorState = ClusterCursorManager::CursorState::Exhausted; - } - break; - } - - // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor - // to be returned on the next getMore. - auto nextObj = *next.getResult(); - - if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { - ccc->queueResult(nextObj); - break; - } - - responseBuilder.append(nextObj); - } - - ccc->detachFromOperationContext(); - - CursorId clusterCursorId = 0; - - if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { - clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( - opCtx, - ccc.releaseCursor(), - requestedNss, - ClusterCursorManager::CursorType::MultiTarget, - ClusterCursorManager::CursorLifetime::Mortal)); - } - - responseBuilder.done(clusterCursorId, requestedNss.ns()); - - CommandHelpers::appendCommandStatus(cursorResponse, Status::OK()); - - return cursorResponse.obj(); -} - -/** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and - * the pipeline that will need to be executed to merge the results from the remotes. If a stale - * shard version is encountered, refreshes the routing table and tries again. - */ -StatusWith<PipelineS::DispatchShardPipelineResults> PipelineS::dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - BSONObj originalCmdObj, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { - // The process is as follows: - // - First, determine whether we need to target more than one shard. If so, we split the - // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursors to dispatch the aggregation to the targeted shards. - // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with - // the refreshed routing table data. - // - If the pipeline is not split and we now need to target multiple shards, split it. If the - // pipeline is already split and we now only need to target a single shard, reassemble the - // original pipeline. - // - After exhausting 10 attempts to establish the cursors, we give up and throw. - auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); - auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>(); - auto opCtx = expCtx->opCtx; - - const bool needsPrimaryShardMerge = - (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); - - const bool needsMongosMerge = pipeline->needsMongosMerger(); - - const auto shardQuery = pipeline->getInitialQuery(); - - auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; - BSONObj targetedCommand; - - int numAttempts = 0; - - do { - // We need to grab a new routing table at the start of each iteration, since a stale config - // exception will invalidate the previous one. - auto executionNsRoutingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss)); - - // Determine whether we can run the entire aggregation on a single shard. - std::set<ShardId> shardIds = getTargetedShards(opCtx, - executionNss, - liteParsedPipeline, - executionNsRoutingInfo, - shardQuery, - aggRequest.getCollation()); - - uassert(ErrorCodes::ShardNotFound, - "No targets were found for this aggregation. All shards were removed from the " - "cluster mid-operation", - shardIds.size() > 0); - - // Don't need to split the pipeline if we are only targeting a single shard, unless: - // - There is a stage that needs to be run on the primary shard and the single target shard - // is not the primary. - // - The pipeline contains one or more stages which must always merge on mongoS. - const bool needsSplit = - (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId())); - - const bool isSplit = pipelineForTargetedShards->isSplitForShards(); - - // If we have to run on multiple shards and the pipeline is not yet split, split it. If we - // can run on a single shard and the pipeline is already split, reassemble it. - if (needsSplit && !isSplit) { - pipelineForMerging = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMerging->splitForSharded(); - } else if (!needsSplit && isSplit) { - pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); - } - - // Generate the command object for the targeted shards. - targetedCommand = PipelineS::createCommandForTargetedShards( - aggRequest, originalCmdObj, pipelineForTargetedShards); - - // Refresh the shard registry if we're targeting all shards. We need the shard registry - // to be at least as current as the logical time used when creating the command for - // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - shardRegistry->reload(opCtx); - } - } - - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - if (expCtx->explain) { - if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and should - // not participate in the shard version protocol. - swShardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db().toString(), - executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target shards, - // and should participate in the shard version protocol. - swShardResults = - scatterGatherVersionedTargetByRoutingTable(opCtx, - executionNss.db().toString(), - executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - aggRequest.getCollation(), - nullptr /* viewDefinition */); - } - } else { - swCursors = establishShardCursors(opCtx, - executionNss, - liteParsedPipeline, - &executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - shardQuery, - aggRequest.getCollation()); - - if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { - LOG(1) << "got stale shardVersion error " << swCursors.getStatus() - << " while dispatching " << redact(targetedCommand) << " after " - << (numAttempts + 1) << " dispatch attempts"; - } - } - } while (++numAttempts < kMaxNumStaleVersionRetries && - (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK())); - - if (!swShardResults.isOK()) { - return swShardResults.getStatus(); - } - if (!swCursors.isOK()) { - return swCursors.getStatus(); - } - return DispatchShardPipelineResults{needsPrimaryShardMerge, - std::move(swCursors.getValue()), - std::move(swShardResults.getValue()), - std::move(pipelineForTargetedShards), - std::move(pipelineForMerging), - targetedCommand}; -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineS::MongoSInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) { - // Explain is not supported for auxiliary lookups. - invariant(!expCtx->explain); - - // Temporarily remove any deadline from this operation, we don't want to timeout while doing - // this lookup. - OperationContext::DeadlineStash deadlineStash(expCtx->opCtx); - - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - if (pipelineOptions.optimize) { - pipeline.getValue()->optimizePipeline(); - } - if (pipelineOptions.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); - } - - return pipeline; -} - -StatusWith<unique_ptr<Pipeline, PipelineDeleter>> -PipelineS::MongoSInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceRouterAdapter*>(pipeline->getSources().front().get())); - - // Generate the command object for the targeted shards. - auto serialization = pipeline->serialize(); - std::vector<BSONObj> rawStages; - rawStages.reserve(serialization.size()); - std::transform(serialization.begin(), - serialization.end(), - std::back_inserter(rawStages), - [](const Value& stageObj) { - invariant(stageObj.getType() == BSONType::Object); - return stageObj.getDocument().toBson(); - }); - AggregationRequest aggRequest(expCtx->ns, rawStages); - LiteParsedPipeline liteParsedPipeline(aggRequest); - auto dispatchStatus = PipelineS::dispatchShardPipeline( - expCtx, - expCtx->ns, - aggRequest.serializeToCommandObj().toBson(), - aggRequest, - liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx))); - - if (!dispatchStatus.isOK()) { - return dispatchStatus.getStatus(); - } - auto targetingResults = std::move(dispatchStatus.getValue()); - - auto params = stdx::make_unique<ClusterClientCursorParams>( - expCtx->ns, - AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(), - ReadPreferenceSetting::get(expCtx->opCtx)); - params->remotes = std::move(targetingResults.remoteCursors); - params->mergePipeline = std::move(targetingResults.pipelineForMerging); - - // We will transfer ownership of the params to the RouterStageInternalCursor, but need a - // reference to them to construct the RouterStageMerge. - auto* unownedParams = params.get(); - auto root = ClusterClientCursorImpl::buildMergerPlan( - expCtx->opCtx, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), - unownedParams); - auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>( - expCtx->opCtx, std::move(params), std::move(root)); - - return Pipeline::create( - {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx); -} - boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -704,16 +192,4 @@ std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors( return cursorManager->getAllCursors(); } -bool PipelineS::MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - const auto catalogCache = Grid::get(opCtx)->catalogCache(); - - auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss); - - if (!routingInfoStatus.isOK()) { - // db doesn't exist. - return false; - } - return static_cast<bool>(routingInfoStatus.getValue().cm()); -} - } // namespace mongo diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h index 968e28f36aa..cdf72158e31 100644 --- a/src/mongo/s/commands/pipeline_s.h +++ b/src/mongo/s/commands/pipeline_s.h @@ -28,12 +28,8 @@ #pragma once -#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/s/async_requests_sender.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/query/cluster_client_cursor_params.h" namespace mongo { /** @@ -59,7 +55,9 @@ public: MONGO_UNREACHABLE; } - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, @@ -105,8 +103,10 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + MONGO_UNREACHABLE; + } std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, CurrentOpConnectionsMode connMode, @@ -128,7 +128,9 @@ public: StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) final; + const MakePipelineOptions pipelineOptions) final { + MONGO_UNREACHABLE; + } boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -141,61 +143,6 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; }; - struct DispatchShardPipelineResults { - // True if this pipeline was split, and the second half of the pipeline needs to be run on - // the - // primary shard for the database. - bool needsPrimaryShardMerge; - - // Populated if this *is not* an explain, this vector represents the cursors on the remote - // shards. - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; - - // Populated if this *is* an explain, this vector represents the results from each shard. - std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - - // The half of the pipeline that was sent to each shard, or the entire pipeline if there was - // only one shard targeted. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; - - // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; - - // The command object to send to the targeted shards. - BSONObj commandForTargetedShards; - }; - - static BSONObj createCommandForTargetedShards( - const AggregationRequest&, - const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards); - - static BSONObj establishMergingMongosCursor( - OperationContext*, - const AggregationRequest&, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline&, - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor>); - - /** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and - * the pipeline that will need to be executed to merge the results from the remotes. If a stale - * shard version is encountered, refreshes the routing table and tries again. - */ - static StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>&, - const NamespaceString& executionNss, - const BSONObj originalCmdObj, - const AggregationRequest&, - const LiteParsedPipeline&, - std::unique_ptr<Pipeline, PipelineDeleter>); - - static bool mustRunOnAllShards(const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, - const LiteParsedPipeline& litePipe); - private: PipelineS() = delete; // Should never be instantiated. }; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index e11174c062b..67e334a7f47 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -115,13 +115,6 @@ public: boost::optional<ReadPreferenceSetting> getReadPreference() const final; - /** - * Constructs the pipeline of MergerPlanStages which will be used to answer the query. - */ - static std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params); - public: /** private for tests */ /** @@ -140,6 +133,13 @@ public: boost::optional<LogicalSessionId> lsid); private: + /** + * Constructs the pipeline of MergerPlanStages which will be used to answer the query. + */ + std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params); + ClusterClientCursorParams _params; // Number of documents already returned by next(). diff --git a/src/mongo/s/query/router_stage_internal_cursor.h b/src/mongo/s/query/router_stage_internal_cursor.h deleted file mode 100644 index 95ca8831648..00000000000 --- a/src/mongo/s/query/router_stage_internal_cursor.h +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/s/query/router_exec_stage.h" - -namespace mongo { - -/** - * This is a special type of RouterExecStage that is used to iterate remote cursors that were - * created internally and do not represent a client cursor, such as those used in a $lookup. - * - * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without - * creating a ClusterClientCursor, which would show up in the server stats for this mongos. - */ -class RouterStageInternalCursor final : public RouterExecStage { -public: - RouterStageInternalCursor(OperationContext* opCtx, - std::unique_ptr<ClusterClientCursorParams>&& params, - std::unique_ptr<RouterExecStage> child) - : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {} - - StatusWith<ClusterQueryResult> next(ExecContext execContext) { - return getChildStage()->next(execContext); - } - -private: - std::unique_ptr<ClusterClientCursorParams> _params; -}; -} // namespace mongo |