summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJames Wahlin <james@mongodb.com>2018-11-27 13:28:27 -0500
committerJames Wahlin <james@mongodb.com>2018-12-12 14:41:24 -0500
commit056d61676f91f6da0a030347ae4b92255d752d8f (patch)
tree92f5b2d319ce1cd5701be912e6b96cf9a6fdaa4b /src/mongo/db
parentd2573d47786b035d5bcdeaf30207bbfcd58bf14e (diff)
downloadmongo-056d61676f91f6da0a030347ae4b92255d752d8f.tar.gz
SERVER-32308 Support for $lookup to execute on mongos against a sharded foreign collection
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/SConscript3
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h7
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp17
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h16
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.cpp468
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h91
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp36
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h6
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h6
15 files changed, 633 insertions, 106 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 2f1c2e27cca..71270f0e72f 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -320,7 +320,10 @@ env.Library(
'mongos_process_interface.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/s/query/async_results_merger',
+ '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
+ '$BUILD_DIR/mongo/s/query/cluster_query',
'mongo_process_common',
]
)
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 17622131061..737c316c9bf 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -246,8 +246,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
// with the resume token.
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
+ auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
uassert(40576,
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index a7b0567b66b..a94179c1431 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -458,34 +458,31 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index d93535dc030..1b6583bb655 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -207,8 +207,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// We've already allocated space for the trailing $match stage in '_fromPipeline'.
_fromPipeline.back() = *matchStage;
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx));
+ auto pipeline =
+ pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx);
while (auto next = pipeline->getNext()) {
uassert(40271,
str::stream()
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 6309f89597a..47f37939f73 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -63,30 +63,27 @@ public:
MockMongoInterface(std::deque<DocumentSource::GetNextResult> results)
: _results(std::move(results)) {}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 04094a1d45b..7223f854521 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -196,9 +196,16 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
txnRequirement = resolvedRequirements.second;
}
+ // If executing on mongos and the foreign collection is sharded, then this stage can run on
+ // mongos.
+ HostTypeRequirement hostRequirement =
+ (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
+ ? HostTypeRequirement::kMongoS
+ : HostTypeRequirement::kPrimaryShard;
+
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- HostTypeRequirement::kPrimaryShard,
+ hostRequirement,
diskRequirement,
FacetRequirement::kAllowed,
txnRequirement);
@@ -289,8 +296,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
// If we don't have a cache, build and return the pipeline immediately.
if (!_cache || _cache->isAbandoned()) {
- return uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
+ return pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx);
}
// Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
@@ -300,8 +306,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
pipelineOpts.attachCursorSource = false;
// Construct the basic pipeline without a cache stage.
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+ auto pipeline =
+ pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
// Add the cache stage at the end and optimize. During the optimization process, the cache will
// either move itself to the correct position in the pipeline, or will abandon itself if no
@@ -313,8 +319,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- _fromExpCtx, pipeline.get()));
+ pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx,
+ pipeline.release());
}
// If the cache has been abandoned, release it.
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d17b1a73c9c..2a2b3338789 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -81,13 +81,6 @@ public:
return requiredPrivileges;
}
- /**
- * Lookup from a sharded collection is not allowed.
- */
- bool allowShardedForeignCollection(NamespaceString nss) const final {
- return (_foreignNssSet.find(nss) == _foreignNssSet.end());
- }
-
private:
const NamespaceString _fromNss;
const stdx::unordered_set<NamespaceString> _foreignNssSet;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 148197d4d5b..9c3dbf5777b 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -85,34 +85,31 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
boost::optional<Document> lookupSingleDocument(
@@ -125,11 +122,12 @@ public:
// case of a change stream on a whole database so we need to make a copy of the
// ExpressionContext with the new namespace.
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none);
- auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
- if (swPipeline == ErrorCodes::NamespaceNotFound) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+ try {
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+ } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
return boost::none;
}
- auto pipeline = uassertStatusOK(std::move(swPipeline));
auto lookedUpDocument = pipeline->getNext();
if (auto next = pipeline->getNext()) {
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 4f461d6705c..bacfc5c6cda 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -544,28 +544,25 @@ public:
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") ||
pipeline->popFrontWithName("$project")) {
@@ -575,7 +572,7 @@ public:
}
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 5f092107c1a..be934e88707 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -184,20 +184,24 @@ public:
* - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
* add an initial cursor source.
*
- * This function returns a non-OK status if parsing the pipeline failed.
+ * This function throws if parsing the pipeline failed.
*/
- virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
/**
- * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
- * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
- * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
+ * Accepts a pipeline and returns a new one which will draw input from the underlying
+ * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
+ * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
* the only case where NamespaceNotFound is returned.
+ *
+ * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
+ * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
+ * compiler expects to find an implementation of PipelineDeleter.
*/
- virtual Status attachCursorSourceToPipeline(
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 85f141c9277..6336910029c 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,10 +28,13 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/index_descriptor.h"
@@ -45,17 +48,109 @@
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
+
using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
+// Given a document representing an aggregation command such as
+//
+// {aggregate: "myCollection", pipeline: [], ...},
+//
+// produces the corresponding explain command:
+//
+// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+ MutableDocument explainCommandBuilder;
+ explainCommandBuilder["explain"] = Value(aggregateCommand);
+ // Downstream host targeting code expects queryOptions at the top level of the command object.
+ explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+ Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+ // readConcern needs to be promoted to the top-level of the request.
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+ Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+ // Add explain command options.
+ for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+ explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+ }
+
+ return explainCommandBuilder.freeze();
+}
+
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const AggregationRequest& request,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe);
+ std::set<ShardId> shardIds = MongoSInterface::getTargetedShards(
+ opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo || mustRunOnAll);
+
+ if (mustRunOnAll) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo->db().primaryId(),
+ !routingInfo->db().primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj);
+ }
+
+ if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ sleepsecs(1);
+ }
+ }
+
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ MongoSInterface::getDesiredRetryPolicy(request));
+}
+
/**
* Determines the single shard to which the given query will be targeted, and its associated
* shardVersion. Throws if the query targets more than one shard.
@@ -115,6 +210,379 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
+Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (req.getWriteConcern()) {
+ return Shard::RetryPolicy::kNotIdempotent;
+ }
+ return Shard::RetryPolicy::kIdempotent;
+}
+
+BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ if (pipeline) {
+ targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ }
+
+ return MongoSInterface::genericTransformForShards(
+ std::move(targetedCmd), opCtx, shardId, request, collationObj);
+}
+
+BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj) {
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+ // explain command.
+ if (auto explainVerbosity = request.getExplain()) {
+ cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
+ }
+
+ if (!collationObj.isEmpty()) {
+ cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
+ }
+
+ if (opCtx->getTxnNumber()) {
+ invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
+ str::stream() << "Command for shards unexpectedly had the "
+ << OperationSessionInfo::kTxnNumberFieldName
+ << " field set: "
+ << cmdForShards.peek().toString());
+ cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
+ Value(static_cast<long long>(*opCtx->getTxnNumber()));
+ }
+
+ auto aggCmd = cmdForShards.freeze().toBson();
+
+ if (shardId) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
+ }
+ }
+
+ // agg creates temp collection and should handle implicit create separately.
+ return appendAllowImplicitCreate(aggCmd, true);
+}
+
+BSONObj MongoSInterface::createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge) {
+
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
+ // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
+ // have detected a logged in user and appended that user name to the $listSessions spec to
+ // send to the shards.
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(splitPipeline.shardsPipeline->serialize());
+
+ // When running on many shards with the exchange we may not need merging.
+ if (needsMerge) {
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ }
+
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));
+
+ targetedCmd[AggregationRequest::kExchangeName] =
+ exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
+
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, boost::none, request, collationObj);
+}
+
+std::set<ShardId> MongoSInterface::getTargetedShards(
+ OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (mustRunOnAllShards) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ return {shardIds.begin(), shardIds.end()};
+ }
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo);
+
+ return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+}
+
+bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe) {
+ // The following aggregations must be routed to all shards:
+ // - Any collectionless aggregation, such as non-localOps $currentOp.
+ // - Any aggregation which begins with a $changeStream stage.
+ return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
+}
+
+StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& execNss) {
+ // First, verify that there are shards present in the cluster. If not, then we return the
+ // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
+ // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
+ // a collection before its enclosing database is created. However, if there are no shards
+ // present, then $changeStream should immediately return an empty cursor just as other
+ // aggregations do when the database does not exist.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ if (shardIds.size() == 0) {
+ return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
+ }
+
+ // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
+ // exist.
+ return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
+}
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+ // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+ // entire aggregation commmand.
+ auto cursors = std::vector<RemoteCursor>();
+ auto shardResults = std::vector<AsyncRequestsSender::Response>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(liteParsedPipeline.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
+
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
+ std::set<ShardId> shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->computeAndSetAtClusterTime(
+ opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
+ }
+
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ if (needsSplit) {
+ splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
+
+ exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ opCtx, splitPipeline->mergePipeline.get());
+ }
+
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand = splitPipeline
+ ? createCommandForTargetedShards(
+ opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
+ : createPassthroughCommandForShard(
+ opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
+
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAll) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
+ }
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAll) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and
+ // should not participate in the shard version protocol.
+ shardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db(),
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target
+ // shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
+ shardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db(),
+ executionNss,
+ *executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation());
+ }
+ } else {
+ cursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ targetedCommand,
+ aggRequest,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery);
+ invariant(cursors.size() % shardIds.size() == 0,
+ str::stream() << "Number of cursors (" << cursors.size()
+ << ") is not a multiple of producers ("
+ << shardIds.size()
+ << ")");
+ }
+
+ // Convert remote cursors into a vector of "owned" cursors.
+ std::vector<OwnedRemoteCursor> ownedCursors;
+ for (auto&& cursor : cursors) {
+ ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+ }
+
+ // Record the number of shards involved in the aggregation. If we are required to merge on
+ // the primary shard, but the primary shard was not in the set of targeted shards, then we
+ // must increment the number of involved shards.
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
+ std::move(ownedCursors),
+ std::move(shardResults),
+ std::move(splitPipeline),
+ std::move(pipeline),
+ targetedCommand,
+ shardIds.size(),
+ exchangeSpec};
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) {
+ // Explain is not supported for auxiliary lookups.
+ invariant(!expCtx->explain);
+
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+ if (pipelineOptions.optimize) {
+ pipeline->optimizePipeline();
+ }
+ if (pipelineOptions.attachCursorSource) {
+ // 'attachCursorSourceToPipeline' handles any complexity related to sharding.
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
+ }
+
+ return pipeline;
+}
+
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Generate the command object for the targeted shards.
+ std::vector<BSONObj> rawStages = [pipeline]() {
+ auto serialization = pipeline->serialize();
+ std::vector<BSONObj> stages;
+ stages.reserve(serialization.size());
+
+ for (const auto& stageObj : serialization) {
+ invariant(stageObj.getType() == BSONType::Object);
+ stages.push_back(stageObj.getDocument().toBson());
+ }
+
+ return stages;
+ }();
+
+ AggregationRequest aggRequest(expCtx->ns, rawStages);
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+ expCtx,
+ expCtx->ns,
+ aggRequest,
+ liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)),
+ expCtx->collation);
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor->getShardId().toString());
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
+ if (shardDispatchResults.splitPipeline) {
+ mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
+ shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ } else {
+ mergePipeline = std::move(shardDispatchResults.pipelineForSingleShard);
+ }
+
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergePipeline.get(),
+ liteParsedPipeline,
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardCursorsSortSpec,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
+
+ return mergePipeline;
+}
+
boost::optional<Document> MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index f2806baf001..6111f3346d8 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -32,6 +32,10 @@
#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/s/query/owned_remote_cursor.h"
namespace mongo {
@@ -41,6 +45,81 @@ namespace mongo {
*/
class MongoSInterface final : public MongoProcessCommon {
public:
+ struct DispatchShardPipelineResults {
+ // True if this pipeline was split, and the second half of the pipeline needs to be run on
+ // the primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this *is not* an explain, this vector represents the cursors on the remote
+ // shards.
+ std::vector<OwnedRemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain, this vector represents the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The split version of the pipeline if more than one shard was targeted, otherwise
+ // boost::none.
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+
+ // How many exchange producers are running the shard part of splitPipeline.
+ size_t numProducers;
+
+ // The exchange specification if the query can run with the exchange otherwise boost::none.
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ };
+
+ static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+
+ static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj);
+
+ /**
+ * Appends information to the command sent to the shards which should be appended both if this
+ * is a passthrough sent to a single shard and if this is a split pipeline.
+ */
+ static BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj);
+
+ static BSONObj createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge);
+
+ static std::set<ShardId> getTargetedShards(
+ OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation);
+
+ static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+
+ static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& execNss);
+
+ static DispatchShardPipelineResults dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj);
+
MongoSInterface() = default;
virtual ~MongoSInterface() = default;
@@ -119,10 +198,8 @@ public:
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- MONGO_UNREACHABLE;
- }
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final {
MONGO_UNREACHABLE;
@@ -133,12 +210,10 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final {
- MONGO_UNREACHABLE;
- }
+ const MakePipelineOptions pipelineOptions) final;
/**
* The following methods only make sense for data-bearing nodes and should never be called on
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index bf0987ea4ab..1eb3af3aa97 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -274,45 +274,35 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
_client.runCommand("admin", renameCommandObj, info));
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
- Status cursorStatus = Status::OK();
-
if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+ return pipeline;
}
-Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
+unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
boost::optional<AutoGetCollectionForReadCommand> autoColl;
if (expCtx->uuid) {
- try {
- autoColl.emplace(expCtx->opCtx,
- NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
- AutoGetCollection::ViewMode::kViewsForbidden,
- Date_t::max(),
- AutoStatsTracker::LogMode::kUpdateTop);
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
- // The UUID doesn't exist anymore
- return ex.toStatus();
- }
+ autoColl.emplace(expCtx->opCtx,
+ NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
+ AutoGetCollection::ViewMode::kViewsForbidden,
+ Date_t::max(),
+ AutoStatsTracker::LogMode::kUpdateTop);
} else {
autoColl.emplace(expCtx->opCtx,
expCtx->ns,
@@ -337,7 +327,7 @@ Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
// the initial cursor stage.
pipeline->optimizePipeline();
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const {
@@ -381,7 +371,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument(
nss,
collectionUUID,
_getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 12627655cd5..f1c7fbcc910 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -87,12 +87,12 @@ public:
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
const std::list<BSONObj>& originalIndexes) final;
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final;
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString&, UUID) const override;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 08b9e073f35..b9d7befb857 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -116,15 +116,15 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) override {
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) override {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}