diff options
author | Sara Golemon <sara.golemon@mongodb.com> | 2017-09-11 16:18:13 -0400 |
---|---|---|
committer | Sara Golemon <sara.golemon@mongodb.com> | 2017-09-15 15:40:14 -0400 |
commit | 7b7de197a787073ecf47f89d1f9976758f24390a (patch) | |
tree | a9faefc16ddec51fdf3b84583db187e498ef277a /src | |
parent | 33908bf59707e36b43ac58b50da4bde3e9cbae84 (diff) | |
download | mongo-7b7de197a787073ecf47f89d1f9976758f24390a.tar.gz |
SERVER-30166 Support pipelines running on mongos only
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_list_local_sessions.h | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 6 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.cpp | 20 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.h | 1 |
7 files changed, 49 insertions, 12 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index b0793113863..5c10259ce4a 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -156,6 +156,11 @@ public: // must also override getModifiedPaths() to provide information about which particular // $match predicates be swapped before itself. bool canSwapWithMatch = false; + + // Extends HostTypeRequirement::kAnyShardOrMongos to indicate that a document source + // must run on whatever node initially received the pipeline. + // This can be a mongod directly, but mongos must not forward to a mongod. + bool allowedToForwardFromMongos = true; }; using HostTypeRequirement = StageConstraints::HostTypeRequirement; diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 8c386f0ecaa..23e7ce97592 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -100,6 +100,7 @@ public: constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; constraints.isIndependentOfAnyCollection = true; + constraints.allowedToForwardFromMongos = false; return constraints; } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index c5edf02f50d..d4a00168e3d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -439,6 +439,12 @@ bool Pipeline::canRunOnMongos() const { }); } +bool Pipeline::allowedToForwardFromMongos() const { + return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) { + return stage->constraints().allowedToForwardFromMongos; + }); +} + std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { std::vector<NamespaceString> collections; for (auto&& source : _sources) { diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 7300b8e542e..5da1a018289 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -224,6 +224,12 @@ public: bool canRunOnMongos() const; /** + * Returns whether or not every DocumentSource in the pipeline is allowed to forward from a + * mongos. + */ + bool allowedToForwardFromMongos() const; + + /** * Modifies the pipeline, optimizing it by combining and swapping stages. */ void optimizePipeline(); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 4c050b812ac..ff39cb914ef 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -419,11 +419,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; LiteParsedPipeline liteParsedPipeline(request); - // TODO SERVER-29141 support forcing pipeline to run on Mongos. - uassert(40567, - "Unable to force mongos-only stage to run on mongos", - liteParsedPipeline.allowedToForwardFromMongos()); - for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); @@ -432,8 +427,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); } - // If this aggregation is on an unsharded collection, pass through to the primary shard. + // If this pipeline is on an unsharded collection, + // is allowed to be forwarded on to shards, + // and doesn't need transformation via DocumentSoruce::serialize(), + // then go ahead and pass it through to the owning shard unmodified. if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS() && + liteParsedPipeline.allowedToForwardFromMongos() && liteParsedPipeline.allowedToPassthroughFromMongos()) { return aggPassthrough(opCtx, namespaces, @@ -466,6 +465,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx)); pipeline->optimizePipeline(); + if (!liteParsedPipeline.allowedToForwardFromMongos()) { + // Pipeline must be run locally. + uassert(40567, + "Aggregation pipeline contains both mongos-only and non-mongos stages", + pipeline->canRunOnMongos()); + auto cursorResponse = establishMergingMongosCursor( + opCtx, request, namespaces.requestedNss, std::move(pipeline), {}); + Command::filterCommandReplyForPassthrough(cursorResponse, result); + return getStatusFromCommandResult(result->asTempObj()); + } + // Begin shard targeting. The process is as follows: // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index d9cf02f85c3..40ef56c8058 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -31,6 +31,8 @@ #include "mongo/s/query/router_stage_pipeline.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_list_local_sessions.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/expression_context.h" @@ -91,10 +93,15 @@ private: RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child, std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline) : RouterExecStage(mergePipeline->getContext()->opCtx), - _mergePipeline(std::move(mergePipeline)) { - // Add an adapter to the front of the pipeline to draw results from 'child'. - _mergePipeline->addInitialSource( - DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child))); + _mergePipeline(std::move(mergePipeline)), + _mongosOnly(!_mergePipeline->allowedToForwardFromMongos()) { + if (_mongosOnly) { + invariant(_mergePipeline->canRunOnMongos()); + } else { + // Add an adapter to the front of the pipeline to draw results from 'child'. + _mergePipeline->addInitialSource( + DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child))); + } } StatusWith<ClusterQueryResult> RouterStagePipeline::next() { @@ -124,8 +131,9 @@ void RouterStagePipeline::kill(OperationContext* opCtx) { } bool RouterStagePipeline::remotesExhausted() { - return static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get()) - ->remotesExhausted(); + return _mongosOnly || + static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get()) + ->remotesExhausted(); } Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 780f1fe0e47..d47e91e0aa8 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -58,6 +58,7 @@ protected: private: std::unique_ptr<Pipeline, Pipeline::Deleter> _mergePipeline; + bool _mongosOnly; }; } // namespace mongo |