summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-01-16 12:42:27 -0500
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-01-16 14:16:34 -0500
commit4eabf1ea6225f444b3b0b3b2fee785aaa306212f (patch)
tree53fc00f7e31089dcb3ffb4c16f770b0a5468c3b9 /src
parent2f788aa745ca1366704b821087225af49ce3285a (diff)
downloadmongo-4eabf1ea6225f444b3b0b3b2fee785aaa306212f.tar.gz
Revert "SERVER-32308: Add the ability for a $lookup stage to execute on mongos against a sharded foreign collection"
This reverts commit 7298d273c0497f2720ec1471ad0f4910bff07af4.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp37
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h20
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp8
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h10
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp15
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp512
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp526
-rw-r--r--src/mongo/s/commands/pipeline_s.h73
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h14
-rw-r--r--src/mongo/s/query/router_stage_internal_cursor.h56
15 files changed, 545 insertions, 760 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index d975895dc4d..cf1267eeb03 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -218,7 +218,7 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
return false;
}
@@ -236,16 +236,16 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 6db48d43850..05a62606bb6 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -75,16 +75,16 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index ecc9d572bfe..e4fd70920f6 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -283,8 +283,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- pipeline = uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- _fromExpCtx, pipeline.release()));
+ uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ _fromExpCtx, pipeline.get()));
}
// If the cache has been abandoned, release it.
@@ -614,39 +614,6 @@ void DocumentSourceLookUp::initializeIntrospectionPipeline() {
sources.empty() || !sources.front()->constraints().isChangeStreamStage());
}
-DocumentSource::StageConstraints DocumentSourceLookUp::constraints(
- Pipeline::SplitState pipeState) const {
-
- const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
- std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
- _parsedIntrospectionPipeline->getSources().end(),
- [](const auto& source) {
- return source->constraints().diskRequirement ==
- DiskUseRequirement::kWritesTmpData;
- });
-
- // If executing on mongos and the foreign collection is sharded, then this stage can run on
- // mongos.
- HostTypeRequirement hostReq;
- if (pExpCtx->inMongos) {
- hostReq = pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)
- ? HostTypeRequirement::kMongoS
- : HostTypeRequirement::kPrimaryShard;
- } else {
- hostReq = HostTypeRequirement::kPrimaryShard;
- }
-
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kNone,
- hostReq,
- mayUseDisk ? DiskUseRequirement::kWritesTmpData
- : DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
-
- constraints.canSwapWithMatch = true;
- return constraints;
-}
-
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 7424b2ef97d..530c62f985c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -96,7 +96,25 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
+ std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
+ _parsedIntrospectionPipeline->getSources().end(),
+ [](const auto& source) {
+ return source->constraints().diskRequirement ==
+ DiskUseRequirement::kWritesTmpData;
+ });
+
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData
+ : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
+ constraints.canSwapWithMatch = true;
+ return constraints;
+ }
GetDepsReturn getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 77feab2f825..175806d5f7e 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -101,16 +101,16 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
boost::optional<Document> lookupSingleDocument(
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index b77c549a855..dfe4ece48ac 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -511,14 +511,14 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
if (pipeline->popFrontWithCriteria("$match") ||
pipeline->popFrontWithCriteria("$sort") ||
@@ -529,7 +529,7 @@ public:
}
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 7d78c880d1b..d32e8276c63 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -156,16 +156,12 @@ public:
const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
/**
- * Accepts a pipeline and returns a new one which will draw input from the underlying
- * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
+ * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
+ * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
* returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
* the only case where NamespaceNotFound is returned.
- *
- * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
- * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
- * compiler expects to find an implementation of PipelineDeleter.
*/
- virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ virtual Status attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 16d6d9e325c..7ce395f3716 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -585,8 +585,8 @@ DBClientBase* PipelineD::MongoDInterface::directClient() {
}
bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- AutoGetCollectionForRead autoColl(opCtx, nss);
- // TODO SERVER-32198: Use CollectionShardingState::collectionIsSharded() to confirm sharding
+ AutoGetCollectionForReadCommand autoColl(opCtx, nss);
+ // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
// state.
auto css = CollectionShardingState::get(opCtx, nss);
return bool(css->getMetadata());
@@ -689,15 +689,16 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterfac
pipeline.getValue()->optimizePipeline();
}
+ Status cursorStatus = Status::OK();
+
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
}
- return pipeline;
+ return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>>
-PipelineD::MongoDInterface::attachCursorSourceToPipeline(
+Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
@@ -728,7 +729,7 @@ PipelineD::MongoDInterface::attachCursorSourceToPipeline(
PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps(
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index f626b0f904b..afbb6b8f73f 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -95,8 +95,8 @@ public:
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final;
std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
CurrentOpConnectionsMode connMode,
CurrentOpUserMode userMode,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 488cf97c58e..0ec37b15eb4 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -108,8 +108,8 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index f7e55ee4965..db8e9fed543 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -62,13 +62,43 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/query/router_stage_update_on_add_shard.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
namespace mongo {
+MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors);
+
namespace {
+// Given a document representing an aggregation command such as
+//
+// {aggregate: "myCollection", pipeline: [], ...},
+//
+// produces the corresponding explain command:
+//
+// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+ MutableDocument explainCommandBuilder;
+ explainCommandBuilder["explain"] = Value(aggregateCommand);
+ // Downstream host targeting code expects queryOptions at the top level of the command object.
+ explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+ Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+ // readConcern needs to be promoted to the top-level of the request.
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+ Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+ // Add explain command options.
+ for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+ explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+ }
+
+ return explainCommandBuilder.freeze();
+}
+
Status appendExplainResults(
const std::vector<AsyncRequestsSender::Response>& shardResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
@@ -114,6 +144,15 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
return getStatusFromCommandResult(result->asTempObj());
}
+bool mustRunOnAllShards(const NamespaceString& nss,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const LiteParsedPipeline& litePipe) {
+ // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection
+ // must run on all shards.
+ const bool nsIsSharded = static_cast<bool>(routingInfo.cm());
+ return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream());
+}
+
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
const NamespaceString& execNss,
CatalogCache* catalogCache) {
@@ -135,6 +174,65 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte
return swRoutingInfo;
}
+std::set<ShardId> getTargetedShards(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (mustRunOnAllShards(nss, routingInfo, litePipe)) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ return {shardIds.begin(), shardIds.end()};
+ }
+
+ if (routingInfo.cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation.
+ std::set<ShardId> shardIds;
+ routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds);
+ return shardIds;
+ }
+
+ // The collection is unsharded. Target only the primary shard for the database.
+ return {routingInfo.primaryId()};
+}
+
+BSONObj createCommandForTargetedShards(
+ const AggregationRequest& request,
+ const BSONObj originalCmdObj,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
+
+ // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
+ if (pipelineForTargetedShards) {
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(pipelineForTargetedShards->serialize());
+
+ if (pipelineForTargetedShards->isSplitForShards()) {
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));
+ }
+ }
+
+ // If this pipeline is not split, ensure that the write concern is propagated if present.
+ if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) {
+ targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
+ }
+
+ // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+ // explain command.
+ if (auto explainVerbosity = request.getExplain()) {
+ targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
+ }
+
+ return targetedCmd.freeze().toBson();
+}
+
BSONObj createCommandForMergingShard(
const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
@@ -157,6 +255,247 @@ BSONObj createCommandForMergingShard(
return mergeCmd.freeze().toBson();
}
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ std::set<ShardId> shardIds =
+ getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation);
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ if (mustRunOnAllShards(nss, *routingInfo, litePipe)) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo->primaryId(),
+ !routingInfo->primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj);
+ }
+
+ if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ sleepsecs(1);
+ }
+ }
+
+ // If we reach this point, we're either trying to establish cursors on a sharded execution
+ // namespace, or handling the case where a sharded collection was dropped and recreated as
+ // unsharded. Since views cannot be sharded, and because we will return an error rather than
+ // attempting to continue in the event that a recreated namespace is a view, we set
+ // viewDefinitionOut to nullptr.
+ BSONObj* viewDefinitionOut = nullptr;
+ auto swCursors = establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ viewDefinitionOut /* can't receive view definition */);
+
+ // If any shard returned a stale shardVersion error, invalidate the routing table cache.
+ // This will cause the cache to be refreshed the next time it is accessed.
+ if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo));
+ }
+
+ return swCursors;
+}
+
+struct DispatchShardPipelineResults {
+ // True if this pipeline was split, and the second half of the pipeline needs to be run on the
+ // primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this *is not* an explain, this vector represents the cursors on the remote
+ // shards.
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain, this vector represents the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The half of the pipeline that was sent to each shard, or the entire pipeline if there was
+ // only one shard targeted.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
+
+ // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+};
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ BSONObj originalCmdObj,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+ // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
+ // the refreshed routing table data.
+ // - If the pipeline is not split and we now need to target multiple shards, split it. If the
+ // pipeline is already split and we now only need to target a single shard, reassemble the
+ // original pipeline.
+ // - After exhausting 10 attempts to establish the cursors, we give up and throw.
+ auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
+ auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto pipelineForTargetedShards = std::move(pipeline);
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
+ BSONObj targetedCommand;
+
+ int numAttempts = 0;
+
+ do {
+ // We need to grab a new routing table at the start of each iteration, since a stale config
+ // exception will invalidate the previous one.
+ auto executionNsRoutingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ std::set<ShardId> shardIds = getTargetedShards(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ shardQuery,
+ aggRequest.getCollation());
+
+ uassert(ErrorCodes::ShardNotFound,
+ "No targets were found for this aggregation. All shards were removed from the "
+ "cluster mid-operation",
+ shardIds.size() > 0);
+
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit =
+ (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId()));
+
+ const bool isSplit = pipelineForTargetedShards->isSplitForShards();
+
+ // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
+ // can run on a single shard and the pipeline is already split, reassemble it.
+ if (needsSplit && !isSplit) {
+ pipelineForMerging = std::move(pipelineForTargetedShards);
+ pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+ } else if (!needsSplit && isSplit) {
+ pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
+ }
+
+ // Generate the command object for the targeted shards.
+ targetedCommand =
+ createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards);
+
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
+ }
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and should
+ // not participate in the shard version protocol.
+ swShardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db().toString(),
+ executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target shards,
+ // and should participate in the shard version protocol.
+ swShardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db().toString(),
+ executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation(),
+ nullptr /* viewDefinition */);
+ }
+ } else {
+ swCursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ &executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery,
+ aggRequest.getCollation());
+
+ if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
+ LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
+ << " while dispatching " << redact(targetedCommand) << " after "
+ << (numAttempts + 1) << " dispatch attempts";
+ }
+ }
+ } while (++numAttempts < kMaxNumStaleVersionRetries &&
+ (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK()));
+
+ if (!swShardResults.isOK()) {
+ return swShardResults.getStatus();
+ }
+ if (!swCursors.isOK()) {
+ return swCursors.getStatus();
+ }
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
+ std::move(swCursors.getValue()),
+ std::move(swShardResults.getValue()),
+ std::move(pipelineForTargetedShards),
+ std::move(pipelineForMerging),
+ targetedCommand};
+}
StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor(
OperationContext* opCtx,
@@ -181,6 +520,104 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs
return {{std::move(mergingShardId), std::move(shardCmdResponse)}};
}
+BSONObj establishMergingMongosCursor(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
+
+ ClusterClientCursorParams params(
+ requestedNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ ReadPreferenceSetting::get(opCtx));
+
+ params.tailableMode = pipelineForMerging->getContext()->tailableMode;
+ params.mergePipeline = std::move(pipelineForMerging);
+ params.remotes = std::move(cursors);
+
+ // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
+ // size we pass here is used for getMores, so do not specify a batch size if the initial request
+ // had a batch size of 0.
+ params.batchSize = request.getBatchSize() == 0
+ ? boost::none
+ : boost::optional<long long>(request.getBatchSize());
+
+ if (liteParsedPipeline.hasChangeStream()) {
+ // For change streams, we need to set up a custom stage to establish cursors on new shards
+ // when they are added.
+ params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
+ return stdx::make_unique<RouterStageUpdateOnAddShard>(
+ opCtx, executor, params, cmdToRunOnNewShards);
+ };
+ }
+ auto ccc = ClusterClientCursorImpl::make(
+ opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+
+ auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+ BSONObjBuilder cursorResponse;
+
+ CursorResponseBuilder responseBuilder(true, &cursorResponse);
+
+ for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
+ ClusterQueryResult next;
+ try {
+ next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
+ } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
+ // This exception is thrown when a $changeStream stage encounters an event
+ // that invalidates the cursor. We should close the cursor and return without
+ // error.
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
+ }
+
+ // Check whether we have exhausted the pipeline's results.
+ if (next.isEOF()) {
+ // We reached end-of-stream. If the cursor is not tailable, then we mark it as
+ // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
+ // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
+ // hope of returning data and thus we need to close the mongos cursor as well.
+ if (!ccc->isTailable() || ccc->remotesExhausted()) {
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ }
+ break;
+ }
+
+ // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
+ // to be returned on the next getMore.
+ auto nextObj = *next.getResult();
+
+ if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
+ ccc->queueResult(nextObj);
+ break;
+ }
+
+ responseBuilder.append(nextObj);
+ }
+
+ ccc->detachFromOperationContext();
+
+ CursorId clusterCursorId = 0;
+
+ if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
+ clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
+ opCtx,
+ ccc.releaseCursor(),
+ requestedNss,
+ ClusterCursorManager::CursorType::MultiTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+ }
+
+ responseBuilder.done(clusterCursorId, requestedNss.ns());
+
+ CommandHelpers::appendCommandStatus(cursorResponse, Status::OK());
+
+ return cursorResponse.obj();
+}
+
BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
const NamespaceString& nss) {
ScopedDbConnection conn(primaryShard->getConnString());
@@ -216,10 +653,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
BSONObj cmdObj,
BSONObjBuilder* result) {
const auto catalogCache = Grid::get(opCtx)->catalogCache();
- auto executionNss = namespaces.executionNss;
auto executionNsRoutingInfoStatus =
- getExecutionNsRoutingInfo(opCtx, executionNss, catalogCache);
+ getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache);
LiteParsedPipeline liteParsedPipeline(request);
@@ -241,33 +677,29 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Determine the appropriate collation and 'resolve' involved namespaces to make the
// ExpressionContext.
- // We may not try to execute anything on mongos, but we still have to populate this map so that
- // any $lookups, etc. will be able to have a resolved view definition when they are parsed.
- // TODO: SERVER-32548 will add support for lookup against a sharded view, so this map needs to
- // be correct to determine whether the aggregate should be passthrough or sent to all shards.
+ // We won't try to execute anything on a mongos, but we still have to populate this map so that
+ // any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
+ // incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we
+ // need to check if any involved collections are sharded before forwarding an aggregation
+ // command on an unsharded collection.
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- bool involvesShardedCollections = false;
for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassert(
+ 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm());
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
- if (resolvedNsRoutingInfo.cm()) {
- involvesShardedCollections = true;
- }
}
- // A pipeline is allowed to passthrough to the primary shard iff the following conditions are
- // met:
- //
- // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
- // 2. Is allowed to be forwarded to shards.
- // 3. Does not need to run on all shards.
- // 4. Doesn't need transformation via DocumentSource::serialize().
+ // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does
+ // not need to run on all shards, and doesn't need transformation via
+ // DocumentSource::serialize(), then go ahead and pass it through to the owning shard
+ // unmodified.
if (!executionNsRoutingInfo.cm() &&
- !PipelineS::mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline) &&
+ !mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) &&
liteParsedPipeline.allowedToForwardFromMongos() &&
- liteParsedPipeline.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
+ liteParsedPipeline.allowedToPassthroughFromMongos()) {
return aggPassthrough(opCtx,
namespaces,
executionNsRoutingInfo.primary()->getId(),
@@ -288,7 +720,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
} else {
// Unsharded collection. Get collection metadata from primary chunk.
auto collationObj = getDefaultCollationForUnshardedCollection(
- executionNsRoutingInfo.primary().get(), executionNss);
+ executionNsRoutingInfo.primary().get(), namespaces.executionNss);
if (!collationObj.isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(collationObj));
@@ -315,19 +747,23 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
<< " is not capable of producing input",
!pipeline->getSources().front()->constraints().requiresInputDocSource);
- auto cursorResponse = PipelineS::establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- cmdObj,
- liteParsedPipeline,
- std::move(pipeline),
- {});
+ auto cursorResponse = establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ cmdObj,
+ liteParsedPipeline,
+ std::move(pipeline),
+ {});
CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
return getStatusFromCommandResult(result->asTempObj());
}
- auto dispatchResults = uassertStatusOK(PipelineS::dispatchShardPipeline(
- mergeCtx, executionNss, cmdObj, request, liteParsedPipeline, std::move(pipeline)));
+ auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx,
+ namespaces.executionNss,
+ cmdObj,
+ request,
+ liteParsedPipeline,
+ std::move(pipeline)));
if (mergeCtx->explain) {
// If we reach here, we've either succeeded in running the explain or exhausted all
@@ -372,13 +808,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
(!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) {
// Register the new mongoS cursor, and retrieve the initial batch of results.
auto cursorResponse =
- PipelineS::establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- dispatchResults.commandForTargetedShards,
- liteParsedPipeline,
- std::move(mergingPipeline),
- std::move(dispatchResults.remoteCursors));
+ establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ dispatchResults.commandForTargetedShards,
+ liteParsedPipeline,
+ std::move(mergingPipeline),
+ std::move(dispatchResults.remoteCursors));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -393,7 +829,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto mergeResponse = uassertStatusOK(
establishMergingShardCursor(opCtx,
- executionNss,
+ namespaces.executionNss,
dispatchResults.remoteCursors,
mergeCmdObj,
boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge,
@@ -465,7 +901,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- PipelineS::createCommandForTargetedShards(aggRequest, cmdObj, nullptr));
+ createCommandForTargetedShards(aggRequest, cmdObj, nullptr));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 45662b6aa55..26f8dad4ec2 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -26,38 +26,20 @@
* then also delete it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
-
#include "mongo/platform/basic.h"
#include "mongo/s/commands/pipeline_s.h"
-#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/query/collation/collation_spec.h"
-#include "mongo/db/query/find_common.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
-#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
-#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_cursor_manager.h"
-#include "mongo/s/query/cluster_query_knobs.h"
-#include "mongo/s/query/document_source_router_adapter.h"
-#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
-#include "mongo/s/query/router_stage_internal_cursor.h"
-#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/s/query/router_stage_update_on_add_shard.h"
-#include "mongo/s/query/store_possible_cursor.h"
-#include "mongo/s/stale_exception.h"
-#include "mongo/util/fail_point.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/log.h"
namespace mongo {
@@ -66,8 +48,6 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
-MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors);
-
namespace {
/**
* Determines the single shard to which the given query will be targeted, and its associated
@@ -107,500 +87,8 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(
return swRoutingInfo;
}
-/**
- * Given a document representing an aggregation command such as
- *
- * {aggregate: "myCollection", pipeline: [], ...},
- *
- * produces the corresponding explain command:
- *
- * {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
- */
-Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
- MutableDocument explainCommandBuilder;
- explainCommandBuilder["explain"] = Value(aggregateCommand);
- // Downstream host targeting code expects queryOptions at the top level of the command object.
- explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
- Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
-
- // readConcern needs to be promoted to the top-level of the request.
- explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
- Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
-
- // Add explain command options.
- for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
- explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
- }
-
- return explainCommandBuilder.freeze();
-}
-
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- const CachedCollectionRoutingInfo& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation) {
- if (PipelineS::mustRunOnAllShards(nss, routingInfo, litePipe)) {
- // The pipeline begins with a stage which must be run on all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
- return {shardIds.begin(), shardIds.end()};
- }
-
- if (routingInfo.cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation.
- std::set<ShardId> shardIds;
- routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds);
- return shardIds;
- }
-
- // The collection is unsharded. Target only the primary shard for the database.
- return {routingInfo.primaryId()};
-}
-
-StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
- LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
-
- std::set<ShardId> shardIds =
- getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation);
- std::vector<std::pair<ShardId, BSONObj>> requests;
-
- if (PipelineS::mustRunOnAllShards(nss, *routingInfo, litePipe)) {
- // The pipeline contains a stage which must be run on all shards. Skip versioning and
- // enqueue the raw command objects.
- for (auto&& shardId : shardIds) {
- requests.emplace_back(std::move(shardId), cmdObj);
- }
- } else if (routingInfo->cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation, and build versioned requests for them.
- for (auto& shardId : shardIds) {
- auto versionedCmdObj =
- appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
- requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
- }
- } else {
- // The collection is unsharded. Target only the primary shard for the database.
- // Don't append shard version info when contacting the config servers.
- requests.emplace_back(routingInfo->primaryId(),
- !routingInfo->primary()->isConfig()
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
- : cmdObj);
- }
-
- if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
- "until fail point is disabled.";
- while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- sleepsecs(1);
- }
- }
-
- // If we reach this point, we're either trying to establish cursors on a sharded execution
- // namespace, or handling the case where a sharded collection was dropped and recreated as
- // unsharded. Since views cannot be sharded, and because we will return an error rather than
- // attempting to continue in the event that a recreated namespace is a view, we set
- // viewDefinitionOut to nullptr.
- BSONObj* viewDefinitionOut = nullptr;
- auto swCursors = establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */,
- viewDefinitionOut /* can't receive view definition */);
-
- // If any shard returned a stale shardVersion error, invalidate the routing table cache.
- // This will cause the cache to be refreshed the next time it is accessed.
- if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
- Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo));
- }
-
- return swCursors;
-}
-
} // namespace
-bool PipelineS::mustRunOnAllShards(const NamespaceString& nss,
- const CachedCollectionRoutingInfo& routingInfo,
- const LiteParsedPipeline& litePipe) {
- // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection
- // must run on all shards.
- const bool nsIsSharded = static_cast<bool>(routingInfo.cm());
- return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream());
-}
-
-BSONObj PipelineS::createCommandForTargetedShards(
- const AggregationRequest& request,
- BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) {
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
-
- // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
- if (pipelineForTargetedShards) {
- targetedCmd[AggregationRequest::kPipelineName] =
- Value(pipelineForTargetedShards->serialize());
-
- if (pipelineForTargetedShards->isSplitForShards()) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
- }
- }
-
- // If this pipeline is not split, ensure that the write concern is propagated if present.
- if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) {
- targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
- }
-
- // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
- // explain command.
- if (auto explainVerbosity = request.getExplain()) {
- targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
- }
-
- return targetedCmd.freeze().toBson();
-}
-
-BSONObj PipelineS::establishMergingMongosCursor(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
-
- ClusterClientCursorParams params(
- requestedNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- ReadPreferenceSetting::get(opCtx));
-
- params.tailableMode = pipelineForMerging->getContext()->tailableMode;
- params.mergePipeline = std::move(pipelineForMerging);
- params.remotes = std::move(cursors);
-
- // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
- // size we pass here is used for getMores, so do not specify a batch size if the initial request
- // had a batch size of 0.
- params.batchSize = request.getBatchSize() == 0
- ? boost::none
- : boost::optional<long long>(request.getBatchSize());
-
- if (liteParsedPipeline.hasChangeStream()) {
- // For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added.
- params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, cmdToRunOnNewShards);
- };
- }
- auto ccc = ClusterClientCursorImpl::make(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
-
- auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
- BSONObjBuilder cursorResponse;
-
- CursorResponseBuilder responseBuilder(true, &cursorResponse);
-
- for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
- ClusterQueryResult next;
- try {
- next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
- } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
- // This exception is thrown when a $changeStream stage encounters an event
- // that invalidates the cursor. We should close the cursor and return without
- // error.
- cursorState = ClusterCursorManager::CursorState::Exhausted;
- break;
- }
-
- // Check whether we have exhausted the pipeline's results.
- if (next.isEOF()) {
- // We reached end-of-stream. If the cursor is not tailable, then we mark it as
- // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
- // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
- // hope of returning data and thus we need to close the mongos cursor as well.
- if (!ccc->isTailable() || ccc->remotesExhausted()) {
- cursorState = ClusterCursorManager::CursorState::Exhausted;
- }
- break;
- }
-
- // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
- // to be returned on the next getMore.
- auto nextObj = *next.getResult();
-
- if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
- ccc->queueResult(nextObj);
- break;
- }
-
- responseBuilder.append(nextObj);
- }
-
- ccc->detachFromOperationContext();
-
- CursorId clusterCursorId = 0;
-
- if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
- clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
- opCtx,
- ccc.releaseCursor(),
- requestedNss,
- ClusterCursorManager::CursorType::MultiTarget,
- ClusterCursorManager::CursorLifetime::Mortal));
- }
-
- responseBuilder.done(clusterCursorId, requestedNss.ns());
-
- CommandHelpers::appendCommandStatus(cursorResponse, Status::OK());
-
- return cursorResponse.obj();
-}
-
-/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
- * the pipeline that will need to be executed to merge the results from the remotes. If a stale
- * shard version is encountered, refreshes the routing table and tries again.
- */
-StatusWith<PipelineS::DispatchShardPipelineResults> PipelineS::dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- BSONObj originalCmdObj,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
- // The process is as follows:
- // - First, determine whether we need to target more than one shard. If so, we split the
- // pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
- // the refreshed routing table data.
- // - If the pipeline is not split and we now need to target multiple shards, split it. If the
- // pipeline is already split and we now only need to target a single shard, reassemble the
- // original pipeline.
- // - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
- auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>();
- auto opCtx = expCtx->opCtx;
-
- const bool needsPrimaryShardMerge =
- (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
-
- const bool needsMongosMerge = pipeline->needsMongosMerger();
-
- const auto shardQuery = pipeline->getInitialQuery();
-
- auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
- BSONObj targetedCommand;
-
- int numAttempts = 0;
-
- do {
- // We need to grab a new routing table at the start of each iteration, since a stale config
- // exception will invalidate the previous one.
- auto executionNsRoutingInfo = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
-
- // Determine whether we can run the entire aggregation on a single shard.
- std::set<ShardId> shardIds = getTargetedShards(opCtx,
- executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- shardQuery,
- aggRequest.getCollation());
-
- uassert(ErrorCodes::ShardNotFound,
- "No targets were found for this aggregation. All shards were removed from the "
- "cluster mid-operation",
- shardIds.size() > 0);
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit =
- (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId()));
-
- const bool isSplit = pipelineForTargetedShards->isSplitForShards();
-
- // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
- // can run on a single shard and the pipeline is already split, reassemble it.
- if (needsSplit && !isSplit) {
- pipelineForMerging = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMerging->splitForSharded();
- } else if (!needsSplit && isSplit) {
- pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
- }
-
- // Generate the command object for the targeted shards.
- targetedCommand = PipelineS::createCommandForTargetedShards(
- aggRequest, originalCmdObj, pipelineForTargetedShards);
-
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- }
-
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- if (expCtx->explain) {
- if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and should
- // not participate in the shard version protocol.
- swShardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db().toString(),
- executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target shards,
- // and should participate in the shard version protocol.
- swShardResults =
- scatterGatherVersionedTargetByRoutingTable(opCtx,
- executionNss.db().toString(),
- executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation(),
- nullptr /* viewDefinition */);
- }
- } else {
- swCursors = establishShardCursors(opCtx,
- executionNss,
- liteParsedPipeline,
- &executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
-
- if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
- LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
- << " while dispatching " << redact(targetedCommand) << " after "
- << (numAttempts + 1) << " dispatch attempts";
- }
- }
- } while (++numAttempts < kMaxNumStaleVersionRetries &&
- (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK()));
-
- if (!swShardResults.isOK()) {
- return swShardResults.getStatus();
- }
- if (!swCursors.isOK()) {
- return swCursors.getStatus();
- }
- return DispatchShardPipelineResults{needsPrimaryShardMerge,
- std::move(swCursors.getValue()),
- std::move(swShardResults.getValue()),
- std::move(pipelineForTargetedShards),
- std::move(pipelineForMerging),
- targetedCommand};
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineS::MongoSInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) {
- // Explain is not supported for auxiliary lookups.
- invariant(!expCtx->explain);
-
- // Temporarily remove any deadline from this operation, we don't want to timeout while doing
- // this lookup.
- OperationContext::DeadlineStash deadlineStash(expCtx->opCtx);
-
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
- if (pipelineOptions.optimize) {
- pipeline.getValue()->optimizePipeline();
- }
- if (pipelineOptions.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
- }
-
- return pipeline;
-}
-
-StatusWith<unique_ptr<Pipeline, PipelineDeleter>>
-PipelineS::MongoSInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceRouterAdapter*>(pipeline->getSources().front().get()));
-
- // Generate the command object for the targeted shards.
- auto serialization = pipeline->serialize();
- std::vector<BSONObj> rawStages;
- rawStages.reserve(serialization.size());
- std::transform(serialization.begin(),
- serialization.end(),
- std::back_inserter(rawStages),
- [](const Value& stageObj) {
- invariant(stageObj.getType() == BSONType::Object);
- return stageObj.getDocument().toBson();
- });
- AggregationRequest aggRequest(expCtx->ns, rawStages);
- LiteParsedPipeline liteParsedPipeline(aggRequest);
- auto dispatchStatus = PipelineS::dispatchShardPipeline(
- expCtx,
- expCtx->ns,
- aggRequest.serializeToCommandObj().toBson(),
- aggRequest,
- liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)));
-
- if (!dispatchStatus.isOK()) {
- return dispatchStatus.getStatus();
- }
- auto targetingResults = std::move(dispatchStatus.getValue());
-
- auto params = stdx::make_unique<ClusterClientCursorParams>(
- expCtx->ns,
- AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(),
- ReadPreferenceSetting::get(expCtx->opCtx));
- params->remotes = std::move(targetingResults.remoteCursors);
- params->mergePipeline = std::move(targetingResults.pipelineForMerging);
-
- // We will transfer ownership of the params to the RouterStageInternalCursor, but need a
- // reference to them to construct the RouterStageMerge.
- auto* unownedParams = params.get();
- auto root = ClusterClientCursorImpl::buildMergerPlan(
- expCtx->opCtx,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
- unownedParams);
- auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>(
- expCtx->opCtx, std::move(params), std::move(root));
-
- return Pipeline::create(
- {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx);
-}
-
boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
@@ -704,16 +192,4 @@ std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors(
return cursorManager->getAllCursors();
}
-bool PipelineS::MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- const auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
-
- if (!routingInfoStatus.isOK()) {
- // db doesn't exist.
- return false;
- }
- return static_cast<bool>(routingInfoStatus.getValue().cm());
-}
-
} // namespace mongo
diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h
index 968e28f36aa..cdf72158e31 100644
--- a/src/mongo/s/commands/pipeline_s.h
+++ b/src/mongo/s/commands/pipeline_s.h
@@ -28,12 +28,8 @@
#pragma once
-#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/s/async_requests_sender.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
namespace mongo {
/**
@@ -59,7 +55,9 @@ public:
MONGO_UNREACHABLE;
}
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
@@ -105,8 +103,10 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ MONGO_UNREACHABLE;
+ }
std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
CurrentOpConnectionsMode connMode,
@@ -128,7 +128,9 @@ public:
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final;
+ const MakePipelineOptions pipelineOptions) final {
+ MONGO_UNREACHABLE;
+ }
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -141,61 +143,6 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
};
- struct DispatchShardPipelineResults {
- // True if this pipeline was split, and the second half of the pipeline needs to be run on
- // the
- // primary shard for the database.
- bool needsPrimaryShardMerge;
-
- // Populated if this *is not* an explain, this vector represents the cursors on the remote
- // shards.
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
-
- // Populated if this *is* an explain, this vector represents the results from each shard.
- std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
-
- // The half of the pipeline that was sent to each shard, or the entire pipeline if there was
- // only one shard targeted.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
-
- // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
-
- // The command object to send to the targeted shards.
- BSONObj commandForTargetedShards;
- };
-
- static BSONObj createCommandForTargetedShards(
- const AggregationRequest&,
- const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards);
-
- static BSONObj establishMergingMongosCursor(
- OperationContext*,
- const AggregationRequest&,
- const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
- const LiteParsedPipeline&,
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor>);
-
- /**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
- * the pipeline that will need to be executed to merge the results from the remotes. If a stale
- * shard version is encountered, refreshes the routing table and tries again.
- */
- static StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>&,
- const NamespaceString& executionNss,
- const BSONObj originalCmdObj,
- const AggregationRequest&,
- const LiteParsedPipeline&,
- std::unique_ptr<Pipeline, PipelineDeleter>);
-
- static bool mustRunOnAllShards(const NamespaceString& nss,
- const CachedCollectionRoutingInfo& routingInfo,
- const LiteParsedPipeline& litePipe);
-
private:
PipelineS() = delete; // Should never be instantiated.
};
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index e11174c062b..67e334a7f47 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -115,13 +115,6 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
- /**
- * Constructs the pipeline of MergerPlanStages which will be used to answer the query.
- */
- static std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
-
public:
/** private for tests */
/**
@@ -140,6 +133,13 @@ public:
boost::optional<LogicalSessionId> lsid);
private:
+ /**
+ * Constructs the pipeline of MergerPlanStages which will be used to answer the query.
+ */
+ std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params);
+
ClusterClientCursorParams _params;
// Number of documents already returned by next().
diff --git a/src/mongo/s/query/router_stage_internal_cursor.h b/src/mongo/s/query/router_stage_internal_cursor.h
deleted file mode 100644
index 95ca8831648..00000000000
--- a/src/mongo/s/query/router_stage_internal_cursor.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/s/query/router_exec_stage.h"
-
-namespace mongo {
-
-/**
- * This is a special type of RouterExecStage that is used to iterate remote cursors that were
- * created internally and do not represent a client cursor, such as those used in a $lookup.
- *
- * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without
- * creating a ClusterClientCursor, which would show up in the server stats for this mongos.
- */
-class RouterStageInternalCursor final : public RouterExecStage {
-public:
- RouterStageInternalCursor(OperationContext* opCtx,
- std::unique_ptr<ClusterClientCursorParams>&& params,
- std::unique_ptr<RouterExecStage> child)
- : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {}
-
- StatusWith<ClusterQueryResult> next(ExecContext execContext) {
- return getChildStage()->next(execContext);
- }
-
-private:
- std::unique_ptr<ClusterClientCursorParams> _params;
-};
-} // namespace mongo