author     Sara Golemon <sara.golemon@mongodb.com>   2017-09-11 16:18:13 -0400
committer  Sara Golemon <sara.golemon@mongodb.com>   2017-09-15 15:40:14 -0400
commit     7b7de197a787073ecf47f89d1f9976758f24390a (patch)
tree       a9faefc16ddec51fdf3b84583db187e498ef277a /src
parent     33908bf59707e36b43ac58b50da4bde3e9cbae84 (diff)
SERVER-30166 Support pipelines running on mongos only
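In short: a stage can now declare, through the new StageConstraints flag added below, that it must execute on whichever node first receives the pipeline, and a mongos that receives such a pipeline runs it locally instead of rejecting it with error 40567 or forwarding it to a shard. A minimal sketch of the kind of command this affects, assuming the stock BSON builder macros (illustrative only, not part of the commit):

    // A collectionless aggregate whose only stage is $listLocalSessions; that stage sets
    // allowedToForwardFromMongos = false below, so a mongos now executes this pipeline itself.
    BSONObj cmd = BSON("aggregate" << 1 << "pipeline"
                                   << BSON_ARRAY(BSON("$listLocalSessions" << BSONObj()))
                                   << "cursor" << BSONObj());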
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/pipeline/document_source.h                      |  5
-rw-r--r--  src/mongo/db/pipeline/document_source_list_local_sessions.h  |  1
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                           |  6
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                             |  6
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp                   | 22
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.cpp                  | 20
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.h                    |  1
7 files changed, 49 insertions(+), 12 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index b0793113863..5c10259ce4a 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -156,6 +156,11 @@ public:
// must also override getModifiedPaths() to provide information about which particular
// $match predicates can be swapped before it.
bool canSwapWithMatch = false;
+
+ // When false, refines HostTypeRequirement::kAnyShardOrMongos to indicate that the
+ // stage must run on whichever node first received the pipeline. That node may be a mongod
+ // contacted directly, but a mongos must not forward the pipeline on to a mongod.
+ bool allowedToForwardFromMongos = true;
};
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 8c386f0ecaa..23e7ce97592 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -100,6 +100,7 @@ public:
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
constraints.isIndependentOfAnyCollection = true;
+ constraints.allowedToForwardFromMongos = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index c5edf02f50d..d4a00168e3d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -439,6 +439,12 @@ bool Pipeline::canRunOnMongos() const {
});
}
+bool Pipeline::allowedToForwardFromMongos() const {
+ return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
+ return stage->constraints().allowedToForwardFromMongos;
+ });
+}
+
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
std::vector<NamespaceString> collections;
for (auto&& source : _sources) {
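Since allowedToForwardFromMongos() is an all-of over per-stage constraints, a single opted-out stage is enough to pin the whole pipeline to the receiving node. A rough usage sketch, where 'rawStages' and 'expCtx' are assumed to be a raw pipeline spec and an expression context (illustrative, not part of the commit):

    // One $listLocalSessions stage anywhere in 'rawStages' makes the whole pipeline
    // non-forwardable, because std::all_of requires every stage to allow forwarding.
    auto pipeline = uassertStatusOK(Pipeline::parse(rawStages, expCtx));
    bool mustStayOnReceivingNode = !pipeline->allowedToForwardFromMongos();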
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 7300b8e542e..5da1a018289 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -224,6 +224,12 @@ public:
bool canRunOnMongos() const;
/**
+ * Returns true if every DocumentSource in the pipeline is allowed to be forwarded from
+ * mongos to the shards.
+ */
+ bool allowedToForwardFromMongos() const;
+
+ /**
* Modifies the pipeline, optimizing it by combining and swapping stages.
*/
void optimizePipeline();
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 4c050b812ac..ff39cb914ef 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -419,11 +419,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
LiteParsedPipeline liteParsedPipeline(request);
- // TODO SERVER-29141 support forcing pipeline to run on Mongos.
- uassert(40567,
- "Unable to force mongos-only stage to run on mongos",
- liteParsedPipeline.allowedToForwardFromMongos());
-
for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
@@ -432,8 +427,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
- // If this aggregation is on an unsharded collection, pass through to the primary shard.
+ // If this pipeline is on an unsharded collection,
+ // is allowed to be forwarded on to shards,
+ // and doesn't need transformation via DocumentSource::serialize(),
+ // then go ahead and pass it through to the owning shard unmodified.
if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS() &&
+ liteParsedPipeline.allowedToForwardFromMongos() &&
liteParsedPipeline.allowedToPassthroughFromMongos()) {
return aggPassthrough(opCtx,
namespaces,
@@ -466,6 +465,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx));
pipeline->optimizePipeline();
+ if (!liteParsedPipeline.allowedToForwardFromMongos()) {
+ // Pipeline must be run locally.
+ uassert(40567,
+ "Aggregation pipeline contains both mongos-only and non-mongos stages",
+ pipeline->canRunOnMongos());
+ auto cursorResponse = establishMergingMongosCursor(
+ opCtx, request, namespaces.requestedNss, std::move(pipeline), {});
+ Command::filterCommandReplyForPassthrough(cursorResponse, result);
+ return getStatusFromCommandResult(result->asTempObj());
+ }
+
// Begin shard targeting. The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
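Taken together, the cluster_aggregate.cpp changes leave three routes for a pipeline arriving at a mongos. A condensed sketch of that decision, with the real routing objects replaced by plain bools so the control flow is easy to follow (illustrative, not verbatim from the commit):

    enum class Route { RunOnThisMongos, PassthroughToOwningShard, TargetShards };

    Route chooseRoute(bool allowedToForward, bool allowedToPassthrough,
                      bool sharded, bool collectionless) {
        if (!allowedToForward) {
            // Mongos-only: runAggregate() uasserts (code 40567) that the parsed pipeline can
            // run on mongos, then opens a merging cursor locally on this node.
            return Route::RunOnThisMongos;
        }
        if (!sharded && !collectionless && allowedToPassthrough) {
            // Unsharded target and no mongos-side transformation needed: forward the
            // command unmodified to the shard that owns the collection.
            return Route::PassthroughToOwningShard;
        }
        // Otherwise fall through to shard targeting and, if necessary, pipeline splitting.
        return Route::TargetShards;
    }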
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index d9cf02f85c3..40ef56c8058 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -31,6 +31,8 @@
#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_list_local_sessions.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -91,10 +93,15 @@ private:
RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
: RouterExecStage(mergePipeline->getContext()->opCtx),
- _mergePipeline(std::move(mergePipeline)) {
- // Add an adapter to the front of the pipeline to draw results from 'child'.
- _mergePipeline->addInitialSource(
- DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)));
+ _mergePipeline(std::move(mergePipeline)),
+ _mongosOnly(!_mergePipeline->allowedToForwardFromMongos()) {
+ if (_mongosOnly) {
+ invariant(_mergePipeline->canRunOnMongos());
+ } else {
+ // Add an adapter to the front of the pipeline to draw results from 'child'.
+ _mergePipeline->addInitialSource(
+ DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)));
+ }
}
StatusWith<ClusterQueryResult> RouterStagePipeline::next() {
@@ -124,8 +131,9 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
}
bool RouterStagePipeline::remotesExhausted() {
- return static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get())
- ->remotesExhausted();
+ return _mongosOnly ||
+ static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get())
+ ->remotesExhausted();
}
Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 780f1fe0e47..d47e91e0aa8 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -58,6 +58,7 @@ protected:
private:
std::unique_ptr<Pipeline, Pipeline::Deleter> _mergePipeline;
+ bool _mongosOnly;
};
} // namespace mongo