author | James Wahlin <james@mongodb.com> | 2018-11-27 13:28:27 -0500
---|---|---
committer | James Wahlin <james@mongodb.com> | 2018-12-12 14:41:24 -0500
commit | 056d61676f91f6da0a030347ae4b92255d752d8f (patch) |
tree | 92f5b2d319ce1cd5701be912e6b96cf9a6fdaa4b | /src/mongo/db
parent | d2573d47786b035d5bcdeaf30207bbfcd58bf14e (diff) |
download | mongo-056d61676f91f6da0a030347ae4b92255d752d8f.tar.gz |
SERVER-32308 Support for $lookup to execute on mongos against a sharded foreign collection
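Before this change, the foreign collection named in a $lookup had to be unsharded, and the merging half of the pipeline was pinned to the database's primary shard. With this commit, a $lookup whose `from` collection is sharded can execute on mongos itself, which dispatches the inner pipeline to the shards owning the foreign data and merges the results. A minimal illustration of the kind of stage this enables, built with the server's BSON-from-JSON helper (collection and field names below are hypothetical, not from this diff):

```cpp
#include "mongo/bson/json.h"  // mongo::fromjson

// A $lookup whose "from" collection may now be sharded. When the surrounding
// aggregate runs on mongos, DocumentSourceLookUp::constraints() reports
// HostTypeRequirement::kMongoS and the join executes there rather than being
// forced onto the primary shard. (Names here are illustrative only.)
const auto lookupStage = mongo::fromjson(R"({
    "$lookup": {
        "from": "shardedOrders",
        "localField": "_id",
        "foreignField": "customerId",
        "as": "orders"
    }
})");
```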
Diffstat (limited to 'src/mongo/db')
15 files changed, 633 insertions, 106 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 2f1c2e27cca..71270f0e72f 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -320,7 +320,10 @@ env.Library(
         'mongos_process_interface.cpp',
     ],
     LIBDEPS=[
+        '$BUILD_DIR/mongo/db/pipeline/pipeline',
         '$BUILD_DIR/mongo/s/query/async_results_merger',
+        '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
+        '$BUILD_DIR/mongo/s/query/cluster_query',
         'mongo_process_common',
     ]
 )
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 17622131061..737c316c9bf 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -246,8 +246,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
     // with the resume token.
     auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
     auto matchSpec = BSON("$match" << BSONObj());
-    auto pipeline = uassertStatusOK(
-        pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
+    auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
     if (auto first = pipeline->getNext()) {
         auto firstOplogEntry = Value(*first);
         uassert(40576,
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index a7b0567b66b..a94179c1431 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -458,34 +458,31 @@ public:
     MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
         : _mockResults(std::move(mockResults)) {}

-    bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+    bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
         return false;
     }

-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts) final {
-        auto pipeline = Pipeline::parse(rawPipeline, expCtx);
-        if (!pipeline.isOK()) {
-            return pipeline.getStatus();
-        }
+        auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));

         if (opts.optimize) {
-            pipeline.getValue()->optimizePipeline();
+            pipeline->optimizePipeline();
         }

         if (opts.attachCursorSource) {
-            uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+            pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
         }

         return pipeline;
     }

-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) final {
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
         pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
-        return Status::OK();
+        return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
     }

 private:
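The mock above shows the new calling convention in miniature: `attachCursorSourceToPipeline()` now consumes the raw `Pipeline*` as if it were a `unique_ptr` and hands back a new owning pointer (the header change further down explains that an actual `unique_ptr` parameter would create a circular dependency on `PipelineDeleter` on some platforms). A minimal sketch of the hand-off idiom, assuming only the interfaces shown in this diff:

```cpp
// The caller releases ownership into the call and re-owns the result.
// PipelineDeleter carries the OperationContext so the pipeline is disposed of
// correctly when the returned unique_ptr goes out of scope.
std::unique_ptr<Pipeline, PipelineDeleter> pipeline =
    uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline(
    expCtx, pipeline.release());  // takes ownership of the raw pointer
```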
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index d93535dc030..1b6583bb655 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -207,8 +207,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
         // We've already allocated space for the trailing $match stage in '_fromPipeline'.
         _fromPipeline.back() = *matchStage;
-        auto pipeline = uassertStatusOK(
-            pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx));
+        auto pipeline =
+            pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx);
         while (auto next = pipeline->getNext()) {
             uassert(40271,
                     str::stream()
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 6309f89597a..47f37939f73 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -63,30 +63,27 @@ public:
     MockMongoInterface(std::deque<DocumentSource::GetNextResult> results)
         : _results(std::move(results)) {}

-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts) final {
-        auto pipeline = Pipeline::parse(rawPipeline, expCtx);
-        if (!pipeline.isOK()) {
-            return pipeline.getStatus();
-        }
+        auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));

         if (opts.optimize) {
-            pipeline.getValue()->optimizePipeline();
+            pipeline->optimizePipeline();
         }

         if (opts.attachCursorSource) {
-            uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+            pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
         }

         return pipeline;
     }

-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) final {
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
         pipeline->addInitialSource(DocumentSourceMock::create(_results));
-        return Status::OK();
+        return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
     }

 private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 04094a1d45b..7223f854521 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -196,9 +196,16 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
         txnRequirement = resolvedRequirements.second;
     }

+    // If executing on mongos and the foreign collection is sharded, then this stage can run on
+    // mongos.
+    HostTypeRequirement hostRequirement =
+        (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
+        ? HostTypeRequirement::kMongoS
+        : HostTypeRequirement::kPrimaryShard;
+
     StageConstraints constraints(StreamType::kStreaming,
                                  PositionRequirement::kNone,
-                                 HostTypeRequirement::kPrimaryShard,
+                                 hostRequirement,
                                  diskRequirement,
                                  FacetRequirement::kAllowed,
                                  txnRequirement);
@@ -289,8 +296,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(

     // If we don't have a cache, build and return the pipeline immediately.
     if (!_cache || _cache->isAbandoned()) {
-        return uassertStatusOK(
-            pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
+        return pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx);
     }

     // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
@@ -300,8 +306,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
     pipelineOpts.attachCursorSource = false;

     // Construct the basic pipeline without a cache stage.
-    auto pipeline = uassertStatusOK(
-        pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+    auto pipeline =
+        pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);

     // Add the cache stage at the end and optimize. During the optimization process, the cache will
     // either move itself to the correct position in the pipeline, or will abandon itself if no
@@ -313,8 +319,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(

     if (!_cache->isServing()) {
         // The cache has either been abandoned or has not yet been built. Attach a cursor.
-        uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
-            _fromExpCtx, pipeline.get()));
+        pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx,
+                                                                                pipeline.release());
     }

     // If the cache has been abandoned, release it.
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d17b1a73c9c..2a2b3338789 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -81,13 +81,6 @@ public:
         return requiredPrivileges;
     }

-    /**
-     * Lookup from a sharded collection is not allowed.
-     */
-    bool allowShardedForeignCollection(NamespaceString nss) const final {
-        return (_foreignNssSet.find(nss) == _foreignNssSet.end());
-    }
-
 private:
     const NamespaceString _fromNss;
     const stdx::unordered_set<NamespaceString> _foreignNssSet;
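Two pieces above do the user-visible work: constraints() now chooses the host type dynamically, and the LiteParsed override that rejected sharded foreign namespaces is deleted outright. The decision itself is small enough to show in isolation (same names as the diff; repeated here only for emphasis):

```cpp
// The host-type decision added to DocumentSourceLookUp::constraints(). On a
// shard (inMongos == false) the stage still requires the primary shard; on
// mongos it stays on mongos iff the foreign collection is sharded, while an
// unsharded foreign collection keeps the old primary-shard behavior.
HostTypeRequirement hostRequirement =
    (pExpCtx->inMongos &&
     pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
    ? HostTypeRequirement::kMongoS
    : HostTypeRequirement::kPrimaryShard;
```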
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 148197d4d5b..9c3dbf5777b 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -85,34 +85,31 @@ public:
     MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
         : _mockResults(std::move(mockResults)) {}

-    bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+    bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
         return false;
     }

-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts = MakePipelineOptions{}) final {
-        auto pipeline = Pipeline::parse(rawPipeline, expCtx);
-        if (!pipeline.isOK()) {
-            return pipeline.getStatus();
-        }
+        auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));

         if (opts.optimize) {
-            pipeline.getValue()->optimizePipeline();
+            pipeline->optimizePipeline();
         }

         if (opts.attachCursorSource) {
-            uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+            pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
         }

         return pipeline;
     }

-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) final {
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
         pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
-        return Status::OK();
+        return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
     }

     boost::optional<Document> lookupSingleDocument(
@@ -125,11 +122,12 @@ public:
         // case of a change stream on a whole database so we need to make a copy of the
         // ExpressionContext with the new namespace.
         auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none);
-        auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
-        if (swPipeline == ErrorCodes::NamespaceNotFound) {
+        std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+        try {
+            pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+        } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
             return boost::none;
         }
-        auto pipeline = uassertStatusOK(std::move(swPipeline));

         auto lookedUpDocument = pipeline->getNext();
         if (auto next = pipeline->getNext()) {
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 4f461d6705c..bacfc5c6cda 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -544,28 +544,25 @@ public:
         return false;
     }

-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts) final {
-        auto pipeline = Pipeline::parse(rawPipeline, expCtx);
-        if (!pipeline.isOK()) {
-            return pipeline.getStatus();
-        }
+        auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));

         if (opts.optimize) {
-            pipeline.getValue()->optimizePipeline();
+            pipeline->optimizePipeline();
         }

         if (opts.attachCursorSource) {
-            uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+            pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
         }

         return pipeline;
     }

-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) final {
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
         while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
             if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") ||
                 pipeline->popFrontWithName("$project")) {
@@ -575,7 +572,7 @@ public:
         }

         pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
-        return Status::OK();
+        return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
     }

 private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 5f092107c1a..be934e88707 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -184,20 +184,24 @@ public:
      * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
      *   add an initial cursor source.
      *
-     * This function returns a non-OK status if parsing the pipeline failed.
+     * This function throws if parsing the pipeline failed.
      */
-    virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts = MakePipelineOptions{}) = 0;

     /**
-     * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
-     * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
-     * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
+     * Accepts a pipeline and returns a new one which will draw input from the underlying
+     * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
+     * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
      * the only case where NamespaceNotFound is returned.
+     *
+     * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
+     * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
+     * compiler expects to find an implementation of PipelineDeleter.
      */
-    virtual Status attachCursorSourceToPipeline(
+    virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
         const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;

     /**
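The interface change ripples through every caller: where call sites used to unwrap a `StatusWith`, they now rely on the function throwing via uassert. A before/after sketch of a typical call site, condensed from the call-site changes in this diff:

```cpp
// Before: the result had to be unwrapped at every call site.
// auto pipeline = uassertStatusOK(
//     pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));

// After: makePipeline() uasserts internally and returns the owning pointer
// directly, so the error path is an exception rather than a Status.
auto pipeline = pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx);
```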
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 85f141c9277..6336910029c 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,10 +28,13 @@
  * it in the license file.
  */

+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
 #include "mongo/platform/basic.h"

 #include "mongo/db/pipeline/mongos_process_interface.h"

+#include "mongo/db/auth/authorization_session.h"
 #include "mongo/db/catalog/uuid_catalog.h"
 #include "mongo/db/curop.h"
 #include "mongo/db/index/index_descriptor.h"
@@ -45,17 +48,109 @@
 #include "mongo/s/cluster_commands_helpers.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
 #include "mongo/s/query/establish_cursors.h"
 #include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"

 namespace mongo {

+MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
+
 using boost::intrusive_ptr;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;

 namespace {
+// Given a document representing an aggregation command such as
+//
+//   {aggregate: "myCollection", pipeline: [], ...},
+//
+// produces the corresponding explain command:
+//
+//   {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...}
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+    MutableDocument explainCommandBuilder;
+    explainCommandBuilder["explain"] = Value(aggregateCommand);
+    // Downstream host targeting code expects queryOptions at the top level of the command object.
+    explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+        Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+    // readConcern needs to be promoted to the top-level of the request.
+    explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+        Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+    // Add explain command options.
+    for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+        explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+    }
+
+    return explainCommandBuilder.freeze();
+}
+
+std::vector<RemoteCursor> establishShardCursors(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const LiteParsedPipeline& litePipe,
+    boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+    const BSONObj& cmdObj,
+    const AggregationRequest& request,
+    const ReadPreferenceSetting& readPref,
+    const BSONObj& shardQuery) {
+    LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+    const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe);
+    std::set<ShardId> shardIds = MongoSInterface::getTargetedShards(
+        opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
+    std::vector<std::pair<ShardId, BSONObj>> requests;
+
+    // If we don't need to run on all shards, then we should always have a valid routing table.
+    invariant(routingInfo || mustRunOnAll);
+
+    if (mustRunOnAll) {
+        // The pipeline contains a stage which must be run on all shards. Skip versioning and
+        // enqueue the raw command objects.
+        for (auto&& shardId : shardIds) {
+            requests.emplace_back(std::move(shardId), cmdObj);
+        }
+    } else if (routingInfo->cm()) {
+        // The collection is sharded. Use the routing table to decide which shards to target
+        // based on the query and collation, and build versioned requests for them.
+        for (auto& shardId : shardIds) {
+            auto versionedCmdObj =
+                appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+            requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+        }
+    } else {
+        // The collection is unsharded. Target only the primary shard for the database.
+        // Don't append shard version info when contacting the config servers.
+        requests.emplace_back(routingInfo->db().primaryId(),
+                              !routingInfo->db().primary()->isConfig()
+                                  ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+                                  : cmdObj);
+    }
+
+    if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+        log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+                 "until fail point is disabled.";
+        while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+            sleepsecs(1);
+        }
+    }
+
+    return establishCursors(opCtx,
+                            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+                            nss,
+                            readPref,
+                            requests,
+                            false /* do not allow partial results */,
+                            MongoSInterface::getDesiredRetryPolicy(request));
+}
+
 /**
  * Determines the single shard to which the given query will be targeted, and its associated
  * shardVersion. Throws if the query targets more than one shard.
@@ -115,6 +210,379 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,

 }  // namespace

+Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) {
+    // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+    // pipeline does not support writeConcern.
+    if (req.getWriteConcern()) {
+        return Shard::RetryPolicy::kNotIdempotent;
+    }
+    return Shard::RetryPolicy::kIdempotent;
+}
+
+BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx,
+                                                          const AggregationRequest& request,
+                                                          const boost::optional<ShardId>& shardId,
+                                                          Pipeline* pipeline,
+                                                          BSONObj collationObj) {
+    // Create the command for the shards.
+    MutableDocument targetedCmd(request.serializeToCommandObj());
+    if (pipeline) {
+        targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+    }
+
+    return MongoSInterface::genericTransformForShards(
+        std::move(targetedCmd), opCtx, shardId, request, collationObj);
+}
+
+BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards,
+                                                   OperationContext* opCtx,
+                                                   const boost::optional<ShardId>& shardId,
+                                                   const AggregationRequest& request,
+                                                   BSONObj collationObj) {
+    cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+    // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+    // explain command.
+    if (auto explainVerbosity = request.getExplain()) {
+        cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
+    }
+
+    if (!collationObj.isEmpty()) {
+        cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
+    }
+
+    if (opCtx->getTxnNumber()) {
+        invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
+                  str::stream() << "Command for shards unexpectedly had the "
+                                << OperationSessionInfo::kTxnNumberFieldName
+                                << " field set: "
+                                << cmdForShards.peek().toString());
+        cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
+            Value(static_cast<long long>(*opCtx->getTxnNumber()));
+    }
+
+    auto aggCmd = cmdForShards.freeze().toBson();
+
+    if (shardId) {
+        if (auto txnRouter = TransactionRouter::get(opCtx)) {
+            aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
+        }
+    }
+
+    // agg creates temp collection and should handle implicit create separately.
+    return appendAllowImplicitCreate(aggCmd, true);
+}
+
+BSONObj MongoSInterface::createCommandForTargetedShards(
+    OperationContext* opCtx,
+    const AggregationRequest& request,
+    const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+    const BSONObj collationObj,
+    const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+    bool needsMerge) {
+
+    // Create the command for the shards.
+    MutableDocument targetedCmd(request.serializeToCommandObj());
+    // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
+    // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
+    // have detected a logged in user and appended that user name to the $listSessions spec to
+    // send to the shards.
+    targetedCmd[AggregationRequest::kPipelineName] =
+        Value(splitPipeline.shardsPipeline->serialize());
+
+    // When running on many shards with the exchange we may not need merging.
+    if (needsMerge) {
+        targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+        // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+        // part. Otherwise this is part of an exchange and in that case we should include the
+        // writeConcern.
+        targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+    }
+
+    targetedCmd[AggregationRequest::kCursorName] =
+        Value(DOC(AggregationRequest::kBatchSizeName << 0));
+
+    targetedCmd[AggregationRequest::kExchangeName] =
+        exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
+
+    return genericTransformForShards(
+        std::move(targetedCmd), opCtx, boost::none, request, collationObj);
+}
+
+std::set<ShardId> MongoSInterface::getTargetedShards(
+    OperationContext* opCtx,
+    bool mustRunOnAllShards,
+    const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+    const BSONObj shardQuery,
+    const BSONObj collation) {
+    if (mustRunOnAllShards) {
+        // The pipeline begins with a stage which must be run on all shards.
+        std::vector<ShardId> shardIds;
+        Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+        return {shardIds.begin(), shardIds.end()};
+    }
+
+    // If we don't need to run on all shards, then we should always have a valid routing table.
+    invariant(routingInfo);
+
+    return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+}
+
+bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss,
+                                         const LiteParsedPipeline& litePipe) {
+    // The following aggregations must be routed to all shards:
+    // - Any collectionless aggregation, such as non-localOps $currentOp.
+    // - Any aggregation which begins with a $changeStream stage.
+    return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
+}
+
+StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo(
+    OperationContext* opCtx, const NamespaceString& execNss) {
+    // First, verify that there are shards present in the cluster. If not, then we return the
+    // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
+    // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
+    // a collection before its enclosing database is created. However, if there are no shards
+    // present, then $changeStream should immediately return an empty cursor just as other
+    // aggregations do when the database does not exist.
+    std::vector<ShardId> shardIds;
+    Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+    if (shardIds.size() == 0) {
+        return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
+    }
+
+    // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
+    // exist.
+    return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
+}
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline(
+    const boost::intrusive_ptr<ExpressionContext>& expCtx,
+    const NamespaceString& executionNss,
+    const AggregationRequest& aggRequest,
+    const LiteParsedPipeline& liteParsedPipeline,
+    std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+    BSONObj collationObj) {
+    // The process is as follows:
+    // - First, determine whether we need to target more than one shard. If so, we split the
+    //   pipeline; if not, we retain the existing pipeline.
+    // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+    // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+    //   entire aggregation command.
+    auto cursors = std::vector<RemoteCursor>();
+    auto shardResults = std::vector<AsyncRequestsSender::Response>();
+    auto opCtx = expCtx->opCtx;
+
+    const bool needsPrimaryShardMerge =
+        (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+    const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+    const auto shardQuery = pipeline->getInitialQuery();
+
+    auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+    // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+    // Otherwise, uassert on all exceptions here.
+    if (!(liteParsedPipeline.hasChangeStream() &&
+          executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+        uassertStatusOK(executionNsRoutingInfoStatus);
+    }
+
+    auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+        ? std::move(executionNsRoutingInfoStatus.getValue())
+        : boost::optional<CachedCollectionRoutingInfo>{};
+
+    // Determine whether we can run the entire aggregation on a single shard.
+    const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
+    std::set<ShardId> shardIds = getTargetedShards(
+        opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+
+    if (auto txnRouter = TransactionRouter::get(opCtx)) {
+        txnRouter->computeAndSetAtClusterTime(
+            opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
+    }
+
+    // Don't need to split the pipeline if we are only targeting a single shard, unless:
+    // - There is a stage that needs to be run on the primary shard and the single target shard
+    //   is not the primary.
+    // - The pipeline contains one or more stages which must always merge on mongoS.
+    const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+                             (needsPrimaryShardMerge && executionNsRoutingInfo &&
+                              *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+    boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+    boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+    if (needsSplit) {
+        splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
+
+        exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+            opCtx, splitPipeline->mergePipeline.get());
+    }
+
+    // Generate the command object for the targeted shards.
+    BSONObj targetedCommand = splitPipeline
+        ? createCommandForTargetedShards(
+              opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
+        : createPassthroughCommandForShard(
+              opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
+
+    // Refresh the shard registry if we're targeting all shards. We need the shard registry
+    // to be at least as current as the logical time used when creating the command for
+    // $changeStream to work reliably, so we do a "hard" reload.
+    if (mustRunOnAll) {
+        auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+        if (!shardRegistry->reload(opCtx)) {
+            shardRegistry->reload(opCtx);
+        }
+    }
+
+    // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+    if (expCtx->explain) {
+        if (mustRunOnAll) {
+            // Some stages (such as $currentOp) need to be broadcast to all shards, and
+            // should not participate in the shard version protocol.
+            shardResults =
+                scatterGatherUnversionedTargetAllShards(opCtx,
+                                                        executionNss.db(),
+                                                        targetedCommand,
+                                                        ReadPreferenceSetting::get(opCtx),
+                                                        Shard::RetryPolicy::kIdempotent);
+        } else {
+            // Aggregations on a real namespace should use the routing table to target
+            // shards, and should participate in the shard version protocol.
+            invariant(executionNsRoutingInfo);
+            shardResults =
+                scatterGatherVersionedTargetByRoutingTable(opCtx,
+                                                           executionNss.db(),
+                                                           executionNss,
+                                                           *executionNsRoutingInfo,
+                                                           targetedCommand,
+                                                           ReadPreferenceSetting::get(opCtx),
+                                                           Shard::RetryPolicy::kIdempotent,
+                                                           shardQuery,
+                                                           aggRequest.getCollation());
+        }
+    } else {
+        cursors = establishShardCursors(opCtx,
+                                        executionNss,
+                                        liteParsedPipeline,
+                                        executionNsRoutingInfo,
+                                        targetedCommand,
+                                        aggRequest,
+                                        ReadPreferenceSetting::get(opCtx),
+                                        shardQuery);
+        invariant(cursors.size() % shardIds.size() == 0,
+                  str::stream() << "Number of cursors (" << cursors.size()
+                                << ") is not a multiple of producers ("
+                                << shardIds.size()
+                                << ")");
+    }
+
+    // Convert remote cursors into a vector of "owned" cursors.
+    std::vector<OwnedRemoteCursor> ownedCursors;
+    for (auto&& cursor : cursors) {
+        ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+    }
+
+    // Record the number of shards involved in the aggregation. If we are required to merge on
+    // the primary shard, but the primary shard was not in the set of targeted shards, then we
+    // must increment the number of involved shards.
+    CurOp::get(opCtx)->debug().nShards =
+        shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+                           !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
+    return DispatchShardPipelineResults{needsPrimaryShardMerge,
+                                        std::move(ownedCursors),
+                                        std::move(shardResults),
+                                        std::move(splitPipeline),
+                                        std::move(pipeline),
+                                        targetedCommand,
+                                        shardIds.size(),
+                                        exchangeSpec};
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline(
+    const std::vector<BSONObj>& rawPipeline,
+    const boost::intrusive_ptr<ExpressionContext>& expCtx,
+    const MakePipelineOptions pipelineOptions) {
+    // Explain is not supported for auxiliary lookups.
+    invariant(!expCtx->explain);
+
+    auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+    if (pipelineOptions.optimize) {
+        pipeline->optimizePipeline();
+    }
+    if (pipelineOptions.attachCursorSource) {
+        // 'attachCursorSourceToPipeline' handles any complexity related to sharding.
+        pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
+    }
+
+    return pipeline;
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline(
+    const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+    invariant(pipeline->getSources().empty() ||
+              !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+    // Generate the command object for the targeted shards.
+    std::vector<BSONObj> rawStages = [pipeline]() {
+        auto serialization = pipeline->serialize();
+        std::vector<BSONObj> stages;
+        stages.reserve(serialization.size());
+
+        for (const auto& stageObj : serialization) {
+            invariant(stageObj.getType() == BSONType::Object);
+            stages.push_back(stageObj.getDocument().toBson());
+        }
+
+        return stages;
+    }();
+
+    AggregationRequest aggRequest(expCtx->ns, rawStages);
+    LiteParsedPipeline liteParsedPipeline(aggRequest);
+    auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+        expCtx,
+        expCtx->ns,
+        aggRequest,
+        liteParsedPipeline,
+        std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)),
+        expCtx->collation);
+
+    std::vector<ShardId> targetedShards;
+    targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+    for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+        targetedShards.emplace_back(remoteCursor->getShardId().toString());
+    }
+
+    std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+    boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
+    if (shardDispatchResults.splitPipeline) {
+        mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
+        shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+    } else {
+        mergePipeline = std::move(shardDispatchResults.pipelineForSingleShard);
+    }
+
+    cluster_aggregation_planner::addMergeCursorsSource(
+        mergePipeline.get(),
+        liteParsedPipeline,
+        shardDispatchResults.commandForTargetedShards,
+        std::move(shardDispatchResults.remoteCursors),
+        targetedShards,
+        shardCursorsSortSpec,
+        Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
+
+    return mergePipeline;
+}
+
 boost::optional<Document> MongoSInterface::lookupSingleDocument(
     const boost::intrusive_ptr<ExpressionContext>& expCtx,
     const NamespaceString& nss,
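A consumer of dispatchShardPipeline() ends up with either remote cursors plus a split pipeline, or an untouched single-shard passthrough pipeline. The sketch below (a hypothetical caller; `results` and `mergePipeline` are illustrative names) condenses the hand-off that MongoSInterface::attachCursorSourceToPipeline() above performs for the $lookup sub-pipeline case:

```cpp
// Pick the merging half if the pipeline was split; otherwise the lone
// single-shard pipeline becomes the merging pipeline. A $mergeCursors source
// is then prepended (via addMergeCursorsSource) so it reads from the remote
// shard cursors established by the dispatch.
std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
if (results.splitPipeline) {
    mergePipeline = std::move(results.splitPipeline->mergePipeline);
} else {
    mergePipeline = std::move(results.pipelineForSingleShard);
}
```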
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index f2806baf001..6111f3346d8 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -32,6 +32,10 @@

 #include "mongo/db/pipeline/mongo_process_common.h"
 #include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/s/query/owned_remote_cursor.h"

 namespace mongo {

@@ -41,6 +45,81 @@ namespace mongo {
  */
 class MongoSInterface final : public MongoProcessCommon {
 public:
+    struct DispatchShardPipelineResults {
+        // True if this pipeline was split, and the second half of the pipeline needs to be run on
+        // the primary shard for the database.
+        bool needsPrimaryShardMerge;
+
+        // Populated if this *is not* an explain, this vector represents the cursors on the remote
+        // shards.
+        std::vector<OwnedRemoteCursor> remoteCursors;
+
+        // Populated if this *is* an explain, this vector represents the results from each shard.
+        std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+        // The split version of the pipeline if more than one shard was targeted, otherwise
+        // boost::none.
+        boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+        // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+        std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
+
+        // The command object to send to the targeted shards.
+        BSONObj commandForTargetedShards;
+
+        // How many exchange producers are running the shard part of splitPipeline.
+        size_t numProducers;
+
+        // The exchange specification if the query can run with the exchange otherwise boost::none.
+        boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+    };
+
+    static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+
+    static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+                                                    const AggregationRequest& request,
+                                                    const boost::optional<ShardId>& shardId,
+                                                    Pipeline* pipeline,
+                                                    BSONObj collationObj);
+
+    /**
+     * Appends information to the command sent to the shards which should be appended both if this
+     * is a passthrough sent to a single shard and if this is a split pipeline.
+     */
+    static BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+                                             OperationContext* opCtx,
+                                             const boost::optional<ShardId>& shardId,
+                                             const AggregationRequest& request,
+                                             BSONObj collationObj);
+
+    static BSONObj createCommandForTargetedShards(
+        OperationContext* opCtx,
+        const AggregationRequest& request,
+        const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+        const BSONObj collationObj,
+        const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+        bool needsMerge);
+
+    static std::set<ShardId> getTargetedShards(
+        OperationContext* opCtx,
+        bool mustRunOnAllShards,
+        const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+        const BSONObj shardQuery,
+        const BSONObj collation);
+
+    static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+
+    static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(
+        OperationContext* opCtx, const NamespaceString& execNss);
+
+    static DispatchShardPipelineResults dispatchShardPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx,
+        const NamespaceString& executionNss,
+        const AggregationRequest& aggRequest,
+        const LiteParsedPipeline& liteParsedPipeline,
+        std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+        BSONObj collationObj);
+
     MongoSInterface() = default;

     virtual ~MongoSInterface() = default;
@@ -119,10 +198,8 @@ public:
         MONGO_UNREACHABLE;
     }

-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) final {
-        MONGO_UNREACHABLE;
-    }
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;

     std::string getShardName(OperationContext* opCtx) const final {
         MONGO_UNREACHABLE;
@@ -133,12 +210,10 @@ public:
         MONGO_UNREACHABLE;
     }

-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
-        const MakePipelineOptions pipelineOptions) final {
-        MONGO_UNREACHABLE;
-    }
+        const MakePipelineOptions pipelineOptions) final;

     /**
      * The following methods only make sense for data-bearing nodes and should never be called on
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index bf0987ea4ab..1eb3af3aa97 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -274,45 +274,35 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
                                           _client.runCommand("admin", renameCommandObj, info));
 }

-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline(
     const std::vector<BSONObj>& rawPipeline,
     const boost::intrusive_ptr<ExpressionContext>& expCtx,
     const MakePipelineOptions opts) {
-    auto pipeline = Pipeline::parse(rawPipeline, expCtx);
-    if (!pipeline.isOK()) {
-        return pipeline.getStatus();
-    }
+    auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));

     if (opts.optimize) {
-        pipeline.getValue()->optimizePipeline();
+        pipeline->optimizePipeline();
    }

-    Status cursorStatus = Status::OK();
-
     if (opts.attachCursorSource) {
-        cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+        pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
     }

-    return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+    return pipeline;
 }

-Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
+unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline(
     const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
     invariant(pipeline->getSources().empty() ||
               !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));

     boost::optional<AutoGetCollectionForReadCommand> autoColl;
     if (expCtx->uuid) {
-        try {
-            autoColl.emplace(expCtx->opCtx,
-                             NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
-                             AutoGetCollection::ViewMode::kViewsForbidden,
-                             Date_t::max(),
-                             AutoStatsTracker::LogMode::kUpdateTop);
-        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
-            // The UUID doesn't exist anymore
-            return ex.toStatus();
-        }
+        autoColl.emplace(expCtx->opCtx,
+                         NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
+                         AutoGetCollection::ViewMode::kViewsForbidden,
+                         Date_t::max(),
+                         AutoStatsTracker::LogMode::kUpdateTop);
     } else {
         autoColl.emplace(expCtx->opCtx,
                          expCtx->ns,
@@ -337,7 +327,7 @@ Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
     // the initial cursor stage.
     pipeline->optimizePipeline();

-    return Status::OK();
+    return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
 }

 std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const {
@@ -381,7 +371,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument(
             nss,
             collectionUUID,
             _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
-        pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
+        pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
     } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
         return boost::none;
     }
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 12627655cd5..f1c7fbcc910 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -87,12 +87,12 @@ public:
                                                  const NamespaceString& targetNs,
                                                  const BSONObj& originalCollectionOptions,
                                                  const std::list<BSONObj>& originalIndexes) final;
-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts = MakePipelineOptions{}) final;
-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) final;
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
     std::string getShardName(OperationContext* opCtx) const final;
     std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
         OperationContext* opCtx, const NamespaceString&, UUID) const override;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 08b9e073f35..b9d7befb857 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -116,15 +116,15 @@ public:
         MONGO_UNREACHABLE;
     }

-    StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+    std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
         const std::vector<BSONObj>& rawPipeline,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const MakePipelineOptions opts) override {
         MONGO_UNREACHABLE;
     }

-    Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                        Pipeline* pipeline) override {
+    std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
         MONGO_UNREACHABLE;
     }
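Taken together, the new MongoSInterface methods give $lookup a complete path on mongos: parse the foreign sub-pipeline, dispatch it to the shards owning the foreign collection, and merge through a $mergeCursors source. A hedged end-to-end sketch (error handling elided; `rawStages` and `foreignExpCtx` are illustrative caller-side names, not symbols added by this commit):

```cpp
// On mongos, a $lookup inner pipeline now flows through makePipeline(), which
// parses and optimizes the stages, then swaps in a merging pipeline whose
// initial source is a $mergeCursors stage reading from the targeted shards.
auto subPipeline = expCtx->mongoProcessInterface->makePipeline(
    rawStages, foreignExpCtx);  // throws (e.g. NamespaceNotFound) instead of
                                // returning a Status, per the new contract
while (auto result = subPipeline->getNext()) {
    // Each document was merged on mongos from the sharded foreign collection.
}
```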