From 4eabf1ea6225f444b3b0b3b2fee785aaa306212f Mon Sep 17 00:00:00 2001 From: Nick Zolnierz Date: Tue, 16 Jan 2018 12:42:27 -0500 Subject: Revert "SERVER-32308: Add the ability for a $lookup stage to execute on mongos against a sharded foreign collection" This reverts commit 7298d273c0497f2720ec1471ad0f4910bff07af4. --- .../document_source_check_resume_token_test.cpp | 10 +++--- .../pipeline/document_source_graph_lookup_test.cpp | 8 ++--- src/mongo/db/pipeline/document_source_lookup.cpp | 37 ++-------------------- src/mongo/db/pipeline/document_source_lookup.h | 20 +++++++++++- ...cument_source_lookup_change_post_image_test.cpp | 8 ++--- .../db/pipeline/document_source_lookup_test.cpp | 8 ++--- src/mongo/db/pipeline/mongo_process_interface.h | 10 ++---- src/mongo/db/pipeline/pipeline_d.cpp | 15 +++++---- src/mongo/db/pipeline/pipeline_d.h | 4 +-- .../db/pipeline/stub_mongo_process_interface.h | 4 +-- 10 files changed, 53 insertions(+), 71 deletions(-) (limited to 'src/mongo/db/pipeline') diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index d975895dc4d..cf1267eeb03 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -218,7 +218,7 @@ public: MockMongoInterface(deque mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { return false; } @@ -236,16 +236,16 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith> attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) final { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return std::unique_ptr(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 6db48d43850..05a62606bb6 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -75,16 +75,16 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith> attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) override { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_results)); - return std::unique_ptr(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } private: diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index ecc9d572bfe..e4fd70920f6 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -283,8 +283,8 @@ std::unique_ptr DocumentSourceLookUp::buildPipeline( if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - pipeline = uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - _fromExpCtx, pipeline.release())); + uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + _fromExpCtx, pipeline.get())); } // If the cache has been abandoned, release it. @@ -614,39 +614,6 @@ void DocumentSourceLookUp::initializeIntrospectionPipeline() { sources.empty() || !sources.front()->constraints().isChangeStreamStage()); } -DocumentSource::StageConstraints DocumentSourceLookUp::constraints( - Pipeline::SplitState pipeState) const { - - const bool mayUseDisk = wasConstructedWithPipelineSyntax() && - std::any_of(_parsedIntrospectionPipeline->getSources().begin(), - _parsedIntrospectionPipeline->getSources().end(), - [](const auto& source) { - return source->constraints().diskRequirement == - DiskUseRequirement::kWritesTmpData; - }); - - // If executing on mongos and the foreign collection is sharded, then this stage can run on - // mongos. - HostTypeRequirement hostReq; - if (pExpCtx->inMongos) { - hostReq = pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs) - ? HostTypeRequirement::kMongoS - : HostTypeRequirement::kPrimaryShard; - } else { - hostReq = HostTypeRequirement::kPrimaryShard; - } - - StageConstraints constraints(StreamType::kStreaming, - PositionRequirement::kNone, - hostReq, - mayUseDisk ? DiskUseRequirement::kWritesTmpData - : DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); - - constraints.canSwapWithMatch = true; - return constraints; -} - void DocumentSourceLookUp::serializeToArray( std::vector& array, boost::optional explain) const { Document doc; diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 7424b2ef97d..530c62f985c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -96,7 +96,25 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final; + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + const bool mayUseDisk = wasConstructedWithPipelineSyntax() && + std::any_of(_parsedIntrospectionPipeline->getSources().begin(), + _parsedIntrospectionPipeline->getSources().end(), + [](const auto& source) { + return source->constraints().diskRequirement == + DiskUseRequirement::kWritesTmpData; + }); + + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + mayUseDisk ? DiskUseRequirement::kWritesTmpData + : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + + constraints.canSwapWithMatch = true; + return constraints; + } GetDepsReturn getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 77feab2f825..175806d5f7e 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -101,16 +101,16 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith> attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) final { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return std::unique_ptr(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } boost::optional lookupSingleDocument( diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index b77c549a855..dfe4ece48ac 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -511,14 +511,14 @@ public: } if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); } return pipeline; } - StatusWith> attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) final { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { if (pipeline->popFrontWithCriteria("$match") || pipeline->popFrontWithCriteria("$sort") || @@ -529,7 +529,7 @@ public: } pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return std::unique_ptr(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } private: diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 7d78c880d1b..d32e8276c63 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -156,16 +156,12 @@ public: const MakePipelineOptions opts = MakePipelineOptions{}) = 0; /** - * Accepts a pipeline and returns a new one which will draw input from the underlying - * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be + * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This + * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be * the only case where NamespaceNotFound is returned. - * - * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr. - * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the - * compiler expects to find an implementation of PipelineDeleter. */ - virtual StatusWith> attachCursorSourceToPipeline( + virtual Status attachCursorSourceToPipeline( const boost::intrusive_ptr& expCtx, Pipeline* pipeline) = 0; /** diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 16d6d9e325c..7ce395f3716 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -585,8 +585,8 @@ DBClientBase* PipelineD::MongoDInterface::directClient() { } bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForRead autoColl(opCtx, nss); - // TODO SERVER-32198: Use CollectionShardingState::collectionIsSharded() to confirm sharding + AutoGetCollectionForReadCommand autoColl(opCtx, nss); + // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding // state. auto css = CollectionShardingState::get(opCtx, nss); return bool(css->getMetadata()); @@ -689,15 +689,16 @@ StatusWith> PipelineD::MongoDInterfac pipeline.getValue()->optimizePipeline(); } + Status cursorStatus = Status::OK(); + if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); } - return pipeline; + return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; } -StatusWith> -PipelineD::MongoDInterface::attachCursorSourceToPipeline( +Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr& expCtx, Pipeline* pipeline) { invariant(pipeline->getSources().empty() || !dynamic_cast(pipeline->getSources().front().get())); @@ -728,7 +729,7 @@ PipelineD::MongoDInterface::attachCursorSourceToPipeline( PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - return std::unique_ptr(pipeline, PipelineDeleter(expCtx->opCtx)); + return Status::OK(); } std::vector PipelineD::MongoDInterface::getCurrentOps( diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index f626b0f904b..afbb6b8f73f 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -95,8 +95,8 @@ public: const std::vector& rawPipeline, const boost::intrusive_ptr& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; - StatusWith> attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) final; + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) final; std::vector getCurrentOps(OperationContext* opCtx, CurrentOpConnectionsMode connMode, CurrentOpUserMode userMode, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 488cf97c58e..0ec37b15eb4 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -108,8 +108,8 @@ public: MONGO_UNREACHABLE; } - StatusWith> attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) override { + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) override { MONGO_UNREACHABLE; } -- cgit v1.2.1