summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/SConscript3
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h7
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp17
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h16
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.cpp468
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h91
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp36
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h6
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h6
-rw-r--r--src/mongo/s/query/SConscript12
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp525
17 files changed, 702 insertions, 574 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 2f1c2e27cca..71270f0e72f 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -320,7 +320,10 @@ env.Library(
'mongos_process_interface.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/s/query/async_results_merger',
+ '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
+ '$BUILD_DIR/mongo/s/query/cluster_query',
'mongo_process_common',
]
)
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 17622131061..737c316c9bf 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -246,8 +246,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
// with the resume token.
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
+ auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
uassert(40576,
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index a7b0567b66b..a94179c1431 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -458,34 +458,31 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index d93535dc030..1b6583bb655 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -207,8 +207,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// We've already allocated space for the trailing $match stage in '_fromPipeline'.
_fromPipeline.back() = *matchStage;
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx));
+ auto pipeline =
+ pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx);
while (auto next = pipeline->getNext()) {
uassert(40271,
str::stream()
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 6309f89597a..47f37939f73 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -63,30 +63,27 @@ public:
MockMongoInterface(std::deque<DocumentSource::GetNextResult> results)
: _results(std::move(results)) {}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 04094a1d45b..7223f854521 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -196,9 +196,16 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
txnRequirement = resolvedRequirements.second;
}
+ // If executing on mongos and the foreign collection is sharded, then this stage can run on
+ // mongos.
+ HostTypeRequirement hostRequirement =
+ (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
+ ? HostTypeRequirement::kMongoS
+ : HostTypeRequirement::kPrimaryShard;
+
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- HostTypeRequirement::kPrimaryShard,
+ hostRequirement,
diskRequirement,
FacetRequirement::kAllowed,
txnRequirement);
@@ -289,8 +296,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
// If we don't have a cache, build and return the pipeline immediately.
if (!_cache || _cache->isAbandoned()) {
- return uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
+ return pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx);
}
// Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
@@ -300,8 +306,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
pipelineOpts.attachCursorSource = false;
// Construct the basic pipeline without a cache stage.
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+ auto pipeline =
+ pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
// Add the cache stage at the end and optimize. During the optimization process, the cache will
// either move itself to the correct position in the pipeline, or will abandon itself if no
@@ -313,8 +319,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- _fromExpCtx, pipeline.get()));
+ pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx,
+ pipeline.release());
}
// If the cache has been abandoned, release it.
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d17b1a73c9c..2a2b3338789 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -81,13 +81,6 @@ public:
return requiredPrivileges;
}
- /**
- * Lookup from a sharded collection is not allowed.
- */
- bool allowShardedForeignCollection(NamespaceString nss) const final {
- return (_foreignNssSet.find(nss) == _foreignNssSet.end());
- }
-
private:
const NamespaceString _fromNss;
const stdx::unordered_set<NamespaceString> _foreignNssSet;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 148197d4d5b..9c3dbf5777b 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -85,34 +85,31 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
boost::optional<Document> lookupSingleDocument(
@@ -125,11 +122,12 @@ public:
// case of a change stream on a whole database so we need to make a copy of the
// ExpressionContext with the new namespace.
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none);
- auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
- if (swPipeline == ErrorCodes::NamespaceNotFound) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+ try {
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+ } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
return boost::none;
}
- auto pipeline = uassertStatusOK(std::move(swPipeline));
auto lookedUpDocument = pipeline->getNext();
if (auto next = pipeline->getNext()) {
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 4f461d6705c..bacfc5c6cda 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -544,28 +544,25 @@ public:
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") ||
pipeline->popFrontWithName("$project")) {
@@ -575,7 +572,7 @@ public:
}
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 5f092107c1a..be934e88707 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -184,20 +184,24 @@ public:
* - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
* add an initial cursor source.
*
- * This function returns a non-OK status if parsing the pipeline failed.
+ * This function throws if parsing the pipeline failed.
*/
- virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
/**
- * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
- * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
- * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
+ * Accepts a pipeline and returns a new one which will draw input from the underlying
+ * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
+ * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
* the only case where NamespaceNotFound is returned.
+ *
+ * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
+ * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
+ * compiler expects to find an implementation of PipelineDeleter.
*/
- virtual Status attachCursorSourceToPipeline(
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 85f141c9277..6336910029c 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,10 +28,13 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/index_descriptor.h"
@@ -45,17 +48,109 @@
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
+
using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
+// Given a document representing an aggregation command such as
+//
+// {aggregate: "myCollection", pipeline: [], ...},
+//
+// produces the corresponding explain command:
+//
+// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+ MutableDocument explainCommandBuilder;
+ explainCommandBuilder["explain"] = Value(aggregateCommand);
+ // Downstream host targeting code expects queryOptions at the top level of the command object.
+ explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+ Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+ // readConcern needs to be promoted to the top-level of the request.
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+ Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+ // Add explain command options.
+ for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+ explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+ }
+
+ return explainCommandBuilder.freeze();
+}
+
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const AggregationRequest& request,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe);
+ std::set<ShardId> shardIds = MongoSInterface::getTargetedShards(
+ opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo || mustRunOnAll);
+
+ if (mustRunOnAll) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo->db().primaryId(),
+ !routingInfo->db().primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj);
+ }
+
+ if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ sleepsecs(1);
+ }
+ }
+
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ MongoSInterface::getDesiredRetryPolicy(request));
+}
+
/**
* Determines the single shard to which the given query will be targeted, and its associated
* shardVersion. Throws if the query targets more than one shard.
@@ -115,6 +210,379 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
+Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (req.getWriteConcern()) {
+ return Shard::RetryPolicy::kNotIdempotent;
+ }
+ return Shard::RetryPolicy::kIdempotent;
+}
+
+BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ if (pipeline) {
+ targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ }
+
+ return MongoSInterface::genericTransformForShards(
+ std::move(targetedCmd), opCtx, shardId, request, collationObj);
+}
+
+BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj) {
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+ // explain command.
+ if (auto explainVerbosity = request.getExplain()) {
+ cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
+ }
+
+ if (!collationObj.isEmpty()) {
+ cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
+ }
+
+ if (opCtx->getTxnNumber()) {
+ invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
+ str::stream() << "Command for shards unexpectedly had the "
+ << OperationSessionInfo::kTxnNumberFieldName
+ << " field set: "
+ << cmdForShards.peek().toString());
+ cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
+ Value(static_cast<long long>(*opCtx->getTxnNumber()));
+ }
+
+ auto aggCmd = cmdForShards.freeze().toBson();
+
+ if (shardId) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
+ }
+ }
+
+ // agg creates temp collection and should handle implicit create separately.
+ return appendAllowImplicitCreate(aggCmd, true);
+}
+
+BSONObj MongoSInterface::createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge) {
+
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
+ // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
+ // have detected a logged in user and appended that user name to the $listSessions spec to
+ // send to the shards.
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(splitPipeline.shardsPipeline->serialize());
+
+ // When running on many shards with the exchange we may not need merging.
+ if (needsMerge) {
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ }
+
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));
+
+ targetedCmd[AggregationRequest::kExchangeName] =
+ exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
+
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, boost::none, request, collationObj);
+}
+
+std::set<ShardId> MongoSInterface::getTargetedShards(
+ OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (mustRunOnAllShards) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ return {shardIds.begin(), shardIds.end()};
+ }
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo);
+
+ return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+}
+
+bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe) {
+ // The following aggregations must be routed to all shards:
+ // - Any collectionless aggregation, such as non-localOps $currentOp.
+ // - Any aggregation which begins with a $changeStream stage.
+ return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
+}
+
+StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& execNss) {
+ // First, verify that there are shards present in the cluster. If not, then we return the
+ // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
+ // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
+ // a collection before its enclosing database is created. However, if there are no shards
+ // present, then $changeStream should immediately return an empty cursor just as other
+ // aggregations do when the database does not exist.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ if (shardIds.size() == 0) {
+ return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
+ }
+
+ // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
+ // exist.
+ return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
+}
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+ // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+ // entire aggregation commmand.
+ auto cursors = std::vector<RemoteCursor>();
+ auto shardResults = std::vector<AsyncRequestsSender::Response>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(liteParsedPipeline.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
+
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
+ std::set<ShardId> shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->computeAndSetAtClusterTime(
+ opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
+ }
+
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ if (needsSplit) {
+ splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
+
+ exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ opCtx, splitPipeline->mergePipeline.get());
+ }
+
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand = splitPipeline
+ ? createCommandForTargetedShards(
+ opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
+ : createPassthroughCommandForShard(
+ opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
+
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAll) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
+ }
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAll) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and
+ // should not participate in the shard version protocol.
+ shardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db(),
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target
+ // shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
+ shardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db(),
+ executionNss,
+ *executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation());
+ }
+ } else {
+ cursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ targetedCommand,
+ aggRequest,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery);
+ invariant(cursors.size() % shardIds.size() == 0,
+ str::stream() << "Number of cursors (" << cursors.size()
+ << ") is not a multiple of producers ("
+ << shardIds.size()
+ << ")");
+ }
+
+ // Convert remote cursors into a vector of "owned" cursors.
+ std::vector<OwnedRemoteCursor> ownedCursors;
+ for (auto&& cursor : cursors) {
+ ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+ }
+
+ // Record the number of shards involved in the aggregation. If we are required to merge on
+ // the primary shard, but the primary shard was not in the set of targeted shards, then we
+ // must increment the number of involved shards.
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
+ std::move(ownedCursors),
+ std::move(shardResults),
+ std::move(splitPipeline),
+ std::move(pipeline),
+ targetedCommand,
+ shardIds.size(),
+ exchangeSpec};
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) {
+ // Explain is not supported for auxiliary lookups.
+ invariant(!expCtx->explain);
+
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+ if (pipelineOptions.optimize) {
+ pipeline->optimizePipeline();
+ }
+ if (pipelineOptions.attachCursorSource) {
+ // 'attachCursorSourceToPipeline' handles any complexity related to sharding.
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
+ }
+
+ return pipeline;
+}
+
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Generate the command object for the targeted shards.
+ std::vector<BSONObj> rawStages = [pipeline]() {
+ auto serialization = pipeline->serialize();
+ std::vector<BSONObj> stages;
+ stages.reserve(serialization.size());
+
+ for (const auto& stageObj : serialization) {
+ invariant(stageObj.getType() == BSONType::Object);
+ stages.push_back(stageObj.getDocument().toBson());
+ }
+
+ return stages;
+ }();
+
+ AggregationRequest aggRequest(expCtx->ns, rawStages);
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+ expCtx,
+ expCtx->ns,
+ aggRequest,
+ liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)),
+ expCtx->collation);
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor->getShardId().toString());
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
+ if (shardDispatchResults.splitPipeline) {
+ mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
+ shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ } else {
+ mergePipeline = std::move(shardDispatchResults.pipelineForSingleShard);
+ }
+
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergePipeline.get(),
+ liteParsedPipeline,
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardCursorsSortSpec,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
+
+ return mergePipeline;
+}
+
boost::optional<Document> MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index f2806baf001..6111f3346d8 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -32,6 +32,10 @@
#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/s/query/owned_remote_cursor.h"
namespace mongo {
@@ -41,6 +45,81 @@ namespace mongo {
*/
class MongoSInterface final : public MongoProcessCommon {
public:
+ struct DispatchShardPipelineResults {
+ // True if this pipeline was split, and the second half of the pipeline needs to be run on
+ // the primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this *is not* an explain, this vector represents the cursors on the remote
+ // shards.
+ std::vector<OwnedRemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain, this vector represents the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The split version of the pipeline if more than one shard was targeted, otherwise
+ // boost::none.
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+
+ // How many exchange producers are running the shard part of splitPipeline.
+ size_t numProducers;
+
+ // The exchange specification if the query can run with the exchange otherwise boost::none.
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ };
+
+ static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+
+ static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj);
+
+ /**
+ * Appends information to the command sent to the shards which should be appended both if this
+ * is a passthrough sent to a single shard and if this is a split pipeline.
+ */
+ static BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj);
+
+ static BSONObj createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge);
+
+ static std::set<ShardId> getTargetedShards(
+ OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation);
+
+ static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+
+ static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& execNss);
+
+ static DispatchShardPipelineResults dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj);
+
MongoSInterface() = default;
virtual ~MongoSInterface() = default;
@@ -119,10 +198,8 @@ public:
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- MONGO_UNREACHABLE;
- }
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final {
MONGO_UNREACHABLE;
@@ -133,12 +210,10 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final {
- MONGO_UNREACHABLE;
- }
+ const MakePipelineOptions pipelineOptions) final;
/**
* The following methods only make sense for data-bearing nodes and should never be called on
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index bf0987ea4ab..1eb3af3aa97 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -274,45 +274,35 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
_client.runCommand("admin", renameCommandObj, info));
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
- Status cursorStatus = Status::OK();
-
if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+ return pipeline;
}
-Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
+unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
boost::optional<AutoGetCollectionForReadCommand> autoColl;
if (expCtx->uuid) {
- try {
- autoColl.emplace(expCtx->opCtx,
- NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
- AutoGetCollection::ViewMode::kViewsForbidden,
- Date_t::max(),
- AutoStatsTracker::LogMode::kUpdateTop);
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
- // The UUID doesn't exist anymore
- return ex.toStatus();
- }
+ autoColl.emplace(expCtx->opCtx,
+ NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
+ AutoGetCollection::ViewMode::kViewsForbidden,
+ Date_t::max(),
+ AutoStatsTracker::LogMode::kUpdateTop);
} else {
autoColl.emplace(expCtx->opCtx,
expCtx->ns,
@@ -337,7 +327,7 @@ Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
// the initial cursor stage.
pipeline->optimizePipeline();
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const {
@@ -381,7 +371,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument(
nss,
collectionUUID,
_getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 12627655cd5..f1c7fbcc910 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -87,12 +87,12 @@ public:
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
const std::list<BSONObj>& originalIndexes) final;
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final;
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString&, UUID) const override;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 08b9e073f35..b9d7befb857 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -116,15 +116,15 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) override {
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) override {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 015ea1c66d1..d06c2e91373 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -26,10 +26,10 @@ env.Library(
target='cluster_aggregate',
source=[
'cluster_aggregate.cpp',
- 'cluster_aggregation_planner.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
'$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/db/pipeline/mongos_process_interface',
'$BUILD_DIR/mongo/db/views/views',
@@ -37,6 +37,16 @@ env.Library(
]
)
+env.Library(
+ target='cluster_aggregation_planner',
+ source=[
+ 'cluster_aggregation_planner.cpp',
+ ],
+ LIBDEPS=[
+ 'cluster_query',
+ ]
+)
+
env.CppUnitTest(
target="cluster_aggregate_test",
source=[
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 9fc978b46ee..506842da29c 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -82,7 +82,6 @@ namespace mongo {
using SplitPipeline = cluster_aggregation_planner::SplitPipeline;
-MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToEstablishMergingShardCursor);
MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToDispatchExchangeConsumerPipeline);
@@ -90,41 +89,6 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries;
namespace {
-Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
- // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
- // pipeline does not support writeConcern.
- if (req.getWriteConcern()) {
- return Shard::RetryPolicy::kNotIdempotent;
- }
- return Shard::RetryPolicy::kIdempotent;
-}
-
-// Given a document representing an aggregation command such as
-//
-// {aggregate: "myCollection", pipeline: [], ...},
-//
-// produces the corresponding explain command:
-//
-// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
-Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
- MutableDocument explainCommandBuilder;
- explainCommandBuilder["explain"] = Value(aggregateCommand);
- // Downstream host targeting code expects queryOptions at the top level of the command object.
- explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
- Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
-
- // readConcern needs to be promoted to the top-level of the request.
- explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
- Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
-
- // Add explain command options.
- for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
- explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
- }
-
- return explainCommandBuilder.freeze();
-}
-
Status appendCursorResponseToCommandResult(const ShardId& shardId,
const BSONObj cursorResponse,
BSONObjBuilder* result) {
@@ -138,143 +102,6 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
return getStatusFromCommandResult(result->asTempObj());
}
-bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) {
- // The following aggregations must be routed to all shards:
- // - Any collectionless aggregation, such as non-localOps $currentOp.
- // - Any aggregation which begins with a $changeStream stage.
- return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
-}
-
-StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
- const NamespaceString& execNss) {
- // First, verify that there are shards present in the cluster. If not, then we return the
- // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
- // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
- // a collection before its enclosing database is created. However, if there are no shards
- // present, then $changeStream should immediately return an empty cursor just as other
- // aggregations do when the database does not exist.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- if (shardIds.size() == 0) {
- return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
- }
-
- // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
- // exist.
- return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
-}
-
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation) {
- if (mustRunOnAllShards) {
- // The pipeline begins with a stage which must be run on all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- return {shardIds.begin(), shardIds.end()};
- }
-
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo);
-
- return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
-}
-
-/**
- * Appends information to the command sent to the shards which should be appended both if this is a
- * passthrough sent to a single shard and if this is a split pipeline.
- */
-BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
- OperationContext* opCtx,
- const boost::optional<ShardId>& shardId,
- const AggregationRequest& request,
- BSONObj collationObj) {
- cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
- // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
- // explain command.
- if (auto explainVerbosity = request.getExplain()) {
- cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
- }
-
- if (!collationObj.isEmpty()) {
- cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
- }
-
- if (opCtx->getTxnNumber()) {
- invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
- str::stream() << "Command for shards unexpectedly had the "
- << OperationSessionInfo::kTxnNumberFieldName
- << " field set: "
- << cmdForShards.peek().toString());
- cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
- Value(static_cast<long long>(*opCtx->getTxnNumber()));
- }
-
- auto aggCmd = cmdForShards.freeze().toBson();
-
- if (shardId) {
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
- }
- }
-
- // agg creates temp collection and should handle implicit create separately.
- return appendAllowImplicitCreate(aggCmd, true);
-}
-
-BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<ShardId>& shardId,
- Pipeline* pipeline,
- BSONObj collationObj) {
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- if (pipeline) {
- targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
- }
-
- return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
-}
-
-BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const SplitPipeline& splitPipeline,
- const BSONObj collationObj,
- const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- bool needsMerge) {
-
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
- // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
- // have detected a logged in user and appended that user name to the $listSessions spec to
- // send to the shards.
- targetedCmd[AggregationRequest::kPipelineName] =
- Value(splitPipeline.shardsPipeline->serialize());
-
- // When running on many shards with the exchange we may not need merging.
- if (needsMerge) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
-
- // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
- // part. Otherwise this is part of an exchange and in that case we should include the
- // writeConcern.
- targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
- }
-
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
-
- targetedCmd[AggregationRequest::kExchangeName] =
- exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
-
- return genericTransformForShards(
- std::move(targetedCmd), opCtx, boost::none, request, collationObj);
-}
-
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const ShardId& shardId,
@@ -302,252 +129,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
return appendAllowImplicitCreate(aggCmd, true);
}
-std::vector<RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj& cmdObj,
- const AggregationRequest& request,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery) {
- LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
-
- const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
- std::set<ShardId> shardIds =
- getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
- std::vector<std::pair<ShardId, BSONObj>> requests;
-
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo || mustRunOnAll);
-
- if (mustRunOnAll) {
- // The pipeline contains a stage which must be run on all shards. Skip versioning and
- // enqueue the raw command objects.
- for (auto&& shardId : shardIds) {
- requests.emplace_back(std::move(shardId), cmdObj);
- }
- } else if (routingInfo->cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation, and build versioned requests for them.
- for (auto& shardId : shardIds) {
- auto versionedCmdObj =
- appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
- requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
- }
- } else {
- // The collection is unsharded. Target only the primary shard for the database.
- // Don't append shard version info when contacting the config servers.
- requests.emplace_back(routingInfo->db().primaryId(),
- !routingInfo->db().primary()->isConfig()
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
- : cmdObj);
- }
-
- if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
- "until fail point is disabled.";
- while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- sleepsecs(1);
- }
- }
-
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */,
- getDesiredRetryPolicy(request));
-}
-
-struct DispatchShardPipelineResults {
- // True if this pipeline was split, and the second half of the pipeline needs to be run on the
- // primary shard for the database.
- bool needsPrimaryShardMerge;
-
- // Populated if this *is not* an explain, this vector represents the cursors on the remote
- // shards.
- std::vector<OwnedRemoteCursor> remoteCursors;
-
- // Populated if this *is* an explain, this vector represents the results from each shard.
- std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
-
- // The split version of the pipeline if more than one shard was targeted, otherwise boost::none.
- boost::optional<SplitPipeline> splitPipeline;
-
- // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
-
- // The command object to send to the targeted shards.
- BSONObj commandForTargetedShards;
-
- // How many exchange producers are running the shard part of splitPipeline.
- size_t numProducers;
-
- // The exchange specification if the query can run with the exchange otherwise boost::none.
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
-};
-
-/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
- * the pipeline that will need to be executed to merge the results from the remotes. If a stale
- * shard version is encountered, refreshes the routing table and tries again.
- */
-DispatchShardPipelineResults dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj) {
- // The process is as follows:
- // - First, determine whether we need to target more than one shard. If so, we split the
- // pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
- // entire aggregation commmand.
- auto cursors = std::vector<RemoteCursor>();
- auto shardResults = std::vector<AsyncRequestsSender::Response>();
- auto opCtx = expCtx->opCtx;
-
- const bool needsPrimaryShardMerge =
- (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
-
- const bool needsMongosMerge = pipeline->needsMongosMerger();
-
- const auto shardQuery = pipeline->getInitialQuery();
-
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
-
- // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
- // Otherwise, uassert on all exceptions here.
- if (!(liteParsedPipeline.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
- uassertStatusOK(executionNsRoutingInfoStatus);
- }
-
- auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
- ? std::move(executionNsRoutingInfoStatus.getValue())
- : boost::optional<CachedCollectionRoutingInfo>{};
-
- // Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
- std::set<ShardId> shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
-
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->computeAndSetAtClusterTime(
- opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
- }
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && executionNsRoutingInfo &&
- *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
-
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
- boost::optional<SplitPipeline> splitPipeline;
-
- if (needsSplit) {
- splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
-
- exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
- opCtx, splitPipeline->mergePipeline.get());
- }
-
- // Generate the command object for the targeted shards.
- BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(
- opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
- : createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
-
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAll) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- }
-
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- if (expCtx->explain) {
- if (mustRunOnAll) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and
- // should not participate in the shard version protocol.
- shardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db(),
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target
- // shards, and should participate in the shard version protocol.
- invariant(executionNsRoutingInfo);
- shardResults =
- scatterGatherVersionedTargetByRoutingTable(opCtx,
- executionNss.db(),
- executionNss,
- *executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation());
- }
- } else {
- cursors = establishShardCursors(opCtx,
- executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- targetedCommand,
- aggRequest,
- ReadPreferenceSetting::get(opCtx),
- shardQuery);
- invariant(cursors.size() % shardIds.size() == 0,
- str::stream() << "Number of cursors (" << cursors.size()
- << ") is not a multiple of producers ("
- << shardIds.size()
- << ")");
- }
-
- // Convert remote cursors into a vector of "owned" cursors.
- std::vector<OwnedRemoteCursor> ownedCursors;
- for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
- }
-
- // Record the number of shards involved in the aggregation. If we are required to merge on
- // the primary shard, but the primary shard was not in the set of targeted shards, then we
- // must increment the number of involved shards.
- CurOp::get(opCtx)->debug().nShards =
- shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
- !shardIds.count(executionNsRoutingInfo->db().primaryId()));
-
- return DispatchShardPipelineResults{needsPrimaryShardMerge,
- std::move(ownedCursors),
- std::move(shardResults),
- std::move(splitPipeline),
- std::move(pipeline),
- targetedCommand,
- shardIds.size(),
- exchangeSpec};
-}
-
-DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
+MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObj collationObj,
- DispatchShardPipelineResults* shardDispatchResults) {
+ MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) {
invariant(!liteParsedPipeline.hasChangeStream());
auto opCtx = expCtx->opCtx;
@@ -584,7 +172,7 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
- auto consumerCmdObj = createCommandForTargetedShards(
+ auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards(
opCtx, aggRequest, consumerPipelines.back(), collationObj, boost::none, false);
requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
@@ -617,16 +205,16 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
mergeCursors->dismissCursorOwnership();
}
- return DispatchShardPipelineResults{false,
- std::move(ownedCursors),
- {} /*TODO SERVER-36279*/,
- std::move(splitPipeline),
- nullptr,
- BSONObj(),
- numConsumers};
+ return MongoSInterface::DispatchShardPipelineResults{false,
+ std::move(ownedCursors),
+ {} /*TODO SERVER-36279*/,
+ std::move(splitPipeline),
+ nullptr,
+ BSONObj(),
+ numConsumers};
}
-Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
+Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
BSONObjBuilder* result) {
if (dispatchResults.splitPipeline) {
@@ -688,7 +276,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request);
+ Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request);
return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
}
@@ -914,36 +502,16 @@ ShardId pickMergingShard(OperationContext* opCtx,
: targetedShards[prng.nextInt32(targetedShards.size())];
}
-// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the
-// pipeline. We won't try to execute anything on a mongos, but we still have to populate this map so
-// that any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
-// incorrect, we will repopulate the real namespace map on the mongod. Note that this function must
-// be called before forwarding an aggregation command on an unsharded collection, in order to verify
-// that the involved namespaces are allowed to be sharded.
-StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
- OperationContext* opCtx, const LiteParsedPipeline& litePipe) {
-
- StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- for (auto&& nss : litePipe.getInvolvedNamespaces()) {
- const auto resolvedNsRoutingInfo =
- uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
- uassert(28769,
- str::stream() << nss.ns() << " cannot be sharded",
- !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss));
- resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
- }
- return resolvedNamespaces;
-}
-
-// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved
-// namespaces are unsharded, instantiates an appropriate collator, creates a MongoProcessInterface
-// for use by the pipeline's stages, and optionally extracts the UUID from the collection info if
-// present.
-boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- BSONObj collationObj,
- boost::optional<UUID> uuid) {
+// Build an appropriate ExpressionContext for the pipeline. This helper instantiates an appropriate
+// collator, creates a MongoProcessInterface for use by the pipeline's stages, and optionally
+// extracts the UUID from the collection info if present.
+boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ BSONObj collationObj,
+ boost::optional<UUID> uuid,
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) {
std::unique_ptr<CollatorInterface> collation;
if (!collationObj.isEmpty()) {
@@ -958,7 +526,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
request,
std::move(collation),
std::make_shared<MongoSInterface>(),
- resolveInvolvedNamespaces(opCtx, litePipe),
+ std::move(resolvedNamespaces),
uuid);
mergeCtx->inMongos = true;
@@ -1002,7 +570,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
const AggregationRequest& request,
const LiteParsedPipeline& litePipe,
const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- DispatchShardPipelineResults&& shardDispatchResults,
+ MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults,
BSONObjBuilder* result) {
// We should never be in a situation where we call this function on a non-merge pipeline.
invariant(shardDispatchResults.splitPipeline);
@@ -1089,7 +657,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const AggregationRequest& request,
BSONObjBuilder* result) {
uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
+ auto executionNsRoutingInfoStatus =
+ MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
boost::optional<CachedCollectionRoutingInfo> routingInfo;
LiteParsedPipeline litePipe(request);
@@ -1109,18 +678,38 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// Determine whether this aggregation must be dispatched to all shards in the cluster.
- const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, litePipe);
+ const bool mustRunOnAll =
+ MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe);
// If we don't have a routing table, then this is a $changeStream which must run on all shards.
invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
- // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does
- // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(),
- // then go ahead and pass it through to the owning shard unmodified. Note that we first call
- // resolveInvolvedNamespaces to validate that none of the namespaces are sharded.
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ bool involvesShardedCollections = false;
+ for (auto&& nss : litePipe.getInvolvedNamespaces()) {
+ const auto resolvedNsRoutingInfo =
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
+
+ uassert(28769,
+ str::stream() << nss.ns() << " cannot be sharded",
+ !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss));
+
+ resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
+ if (resolvedNsRoutingInfo.cm()) {
+ involvesShardedCollections = true;
+ }
+ }
+
+ // A pipeline is allowed to passthrough to the primary shard iff the following conditions are
+ // met:
+ //
+ // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
+ // 2. Is allowed to be forwarded to shards.
+ // 3. Does not need to run on all shards.
+ // 4. Doesn't need transformation via DocumentSource::serialize().
if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
- litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
- resolveInvolvedNamespaces(opCtx, litePipe);
+ litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos() &&
+ !involvesShardedCollections) {
const auto primaryShardId = routingInfo->db().primary()->getId();
return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result);
}
@@ -1133,7 +722,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
// resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
// pipeline's stages.
- auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid);
+ auto expCtx = makeExpressionContext(
+ opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces));
// Parse and optimize the full pipeline.
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
@@ -1154,7 +744,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = dispatchShardPipeline(
+ auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
@@ -1237,7 +827,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj()));
+ MongoSInterface::createPassthroughCommandForShard(
+ opCtx, aggRequest, shardId, nullptr, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,