summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-01-16 12:42:27 -0500
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-01-16 14:16:34 -0500
commit4eabf1ea6225f444b3b0b3b2fee785aaa306212f (patch)
tree53fc00f7e31089dcb3ffb4c16f770b0a5468c3b9 /src/mongo/db/pipeline
parent2f788aa745ca1366704b821087225af49ce3285a (diff)
downloadmongo-4eabf1ea6225f444b3b0b3b2fee785aaa306212f.tar.gz
Revert "SERVER-32308: Add the ability for a $lookup stage to execute on mongos against a sharded foreign collection"
This reverts commit 7298d273c0497f2720ec1471ad0f4910bff07af4.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp37
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h20
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp8
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h10
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp15
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
10 files changed, 53 insertions, 71 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index d975895dc4d..cf1267eeb03 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -218,7 +218,7 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
return false;
}
@@ -236,16 +236,16 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 6db48d43850..05a62606bb6 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -75,16 +75,16 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index ecc9d572bfe..e4fd70920f6 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -283,8 +283,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- pipeline = uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- _fromExpCtx, pipeline.release()));
+ uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ _fromExpCtx, pipeline.get()));
}
// If the cache has been abandoned, release it.
@@ -614,39 +614,6 @@ void DocumentSourceLookUp::initializeIntrospectionPipeline() {
sources.empty() || !sources.front()->constraints().isChangeStreamStage());
}
-DocumentSource::StageConstraints DocumentSourceLookUp::constraints(
- Pipeline::SplitState pipeState) const {
-
- const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
- std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
- _parsedIntrospectionPipeline->getSources().end(),
- [](const auto& source) {
- return source->constraints().diskRequirement ==
- DiskUseRequirement::kWritesTmpData;
- });
-
- // If executing on mongos and the foreign collection is sharded, then this stage can run on
- // mongos.
- HostTypeRequirement hostReq;
- if (pExpCtx->inMongos) {
- hostReq = pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)
- ? HostTypeRequirement::kMongoS
- : HostTypeRequirement::kPrimaryShard;
- } else {
- hostReq = HostTypeRequirement::kPrimaryShard;
- }
-
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kNone,
- hostReq,
- mayUseDisk ? DiskUseRequirement::kWritesTmpData
- : DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
-
- constraints.canSwapWithMatch = true;
- return constraints;
-}
-
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 7424b2ef97d..530c62f985c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -96,7 +96,25 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
+ std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
+ _parsedIntrospectionPipeline->getSources().end(),
+ [](const auto& source) {
+ return source->constraints().diskRequirement ==
+ DiskUseRequirement::kWritesTmpData;
+ });
+
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData
+ : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
+ constraints.canSwapWithMatch = true;
+ return constraints;
+ }
GetDepsReturn getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 77feab2f825..175806d5f7e 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -101,16 +101,16 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
boost::optional<Document> lookupSingleDocument(
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index b77c549a855..dfe4ece48ac 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -511,14 +511,14 @@ public:
}
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
}
return pipeline;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
if (pipeline->popFrontWithCriteria("$match") ||
pipeline->popFrontWithCriteria("$sort") ||
@@ -529,7 +529,7 @@ public:
}
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 7d78c880d1b..d32e8276c63 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -156,16 +156,12 @@ public:
const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
/**
- * Accepts a pipeline and returns a new one which will draw input from the underlying
- * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
+ * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
+ * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
* returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
* the only case where NamespaceNotFound is returned.
- *
- * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
- * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
- * compiler expects to find an implementation of PipelineDeleter.
*/
- virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ virtual Status attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 16d6d9e325c..7ce395f3716 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -585,8 +585,8 @@ DBClientBase* PipelineD::MongoDInterface::directClient() {
}
bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- AutoGetCollectionForRead autoColl(opCtx, nss);
- // TODO SERVER-32198: Use CollectionShardingState::collectionIsSharded() to confirm sharding
+ AutoGetCollectionForReadCommand autoColl(opCtx, nss);
+ // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
// state.
auto css = CollectionShardingState::get(opCtx, nss);
return bool(css->getMetadata());
@@ -689,15 +689,16 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterfac
pipeline.getValue()->optimizePipeline();
}
+ Status cursorStatus = Status::OK();
+
if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
}
- return pipeline;
+ return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>>
-PipelineD::MongoDInterface::attachCursorSourceToPipeline(
+Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
@@ -728,7 +729,7 @@ PipelineD::MongoDInterface::attachCursorSourceToPipeline(
PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
- return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
+ return Status::OK();
}
std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps(
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index f626b0f904b..afbb6b8f73f 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -95,8 +95,8 @@ public:
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final;
std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
CurrentOpConnectionsMode connMode,
CurrentOpUserMode userMode,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 488cf97c58e..0ec37b15eb4 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -108,8 +108,8 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}