diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-06 06:29:33 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-10 15:11:13 +0100 |
commit | 125fa84f4a6ee5faabae5529d6a6c05816c7050b (patch) | |
tree | 30ba3ed233fd9c3cdaeec4c076d1c8b359832600 | |
parent | 1e2626463b5a7c22484c4556b77da149f4ad1ef9 (diff) | |
download | mongo-125fa84f4a6ee5faabae5529d6a6c05816c7050b.tar.gz |
SERVER-31416 Ban $changeStream from $lookup pipelines
4 files changed, 126 insertions, 5 deletions
diff --git a/jstests/change_streams/change_stream_ban_from_lookup.js b/jstests/change_streams/change_stream_ban_from_lookup.js new file mode 100644 index 00000000000..f17be0ecef1 --- /dev/null +++ b/jstests/change_streams/change_stream_ban_from_lookup.js @@ -0,0 +1,37 @@ +/** + * Test that the $changeStream stage cannot be used in a $lookup pipeline or sub-pipeline. + */ +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + + const coll = db.change_stream_ban_from_lookup; + coll.drop(); + + assert.writeOK(coll.insert({_id: 1})); + + // Verify that we cannot create a $lookup using a pipeline which begins with $changeStream. + assertErrorCode(coll, + [{$lookup: {from: coll.getName(), as: 'as', pipeline: [{$changeStream: {}}]}}], + ErrorCodes.IllegalOperation); + + // Verify that we cannot create a $lookup if its pipeline contains a sub-$lookup whose pipeline + // begins with $changeStream. + assertErrorCode( + coll, + [{ + $lookup: { + from: coll.getName(), + as: 'as', + pipeline: [ + {$match: {_id: 1}}, + { + $lookup: + {from: coll.getName(), as: 'subas', pipeline: [{$changeStream: {}}]} + } + ] + } + }], + ErrorCodes.IllegalOperation); +})(); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 7e77efe2d80..c7c8873e22b 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -596,6 +596,19 @@ void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variabl } } +void DocumentSourceLookUp::initializeIntrospectionPipeline() { + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + _parsedIntrospectionPipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + + auto& sources = _parsedIntrospectionPipeline->getSources(); + + // Ensure that the pipeline does not contain a $changeStream stage. This check will be + // performed recursively on all sub-pipelines. + uassert(ErrorCodes::IllegalOperation, + "$changeStream is not allowed within a $lookup foreign pipeline", + sources.empty() || !sources.front()->constraints().isChangeStreamStage()); +} + void DocumentSourceLookUp::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Document doc; diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 7164a479416..7aecbec0fff 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -259,11 +259,7 @@ private: * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup * pipelines will be built recursively. */ - void initializeIntrospectionPipeline() { - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - _parsedIntrospectionPipeline = - uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); - } + void initializeIntrospectionPipeline(); /** * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 25789995667..434bfabc00f 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -44,6 +44,9 @@ #include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/query_knobs.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/db/server_options.h" namespace mongo { namespace { @@ -57,6 +60,41 @@ using DocumentSourceLookUpTest = AggregationContextFixture; const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load(); const auto kExplain = ExplainOptions::Verbosity::kQueryPlanner; +// Allow tests to temporarily switch to a different FCV. +class EnsureFCV { +public: + using Version = ServerGlobalParams::FeatureCompatibility::Version; + EnsureFCV(Version version) + : _origVersion(serverGlobalParams.featureCompatibility.getVersion()) { + serverGlobalParams.featureCompatibility.setVersion(version); + } + ~EnsureFCV() { + serverGlobalParams.featureCompatibility.setVersion(_origVersion); + } + +private: + const Version _origVersion; +}; + +// For tests which need to run in a replica set context. +class ReplDocumentSourceLookUpTest : public DocumentSourceLookUpTest { +public: + void setUp() override { + auto service = getExpCtx()->opCtx->getServiceContext(); + repl::ReplSettings settings; + + settings.setReplSetString("lookupTestSet/node1:12345"); + settings.setMajorityReadConcernEnabled(true); + + repl::StorageInterface::set(service, stdx::make_unique<repl::StorageInterfaceMock>()); + auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service, settings); + + // Ensure that we are primary. + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + } +}; + // A 'let' variable defined in a $lookup stage is expected to be available to all sub-pipelines. For // sub-pipelines below the immediate one, they are passed to via ExpressionContext. This test // confirms that variables defined in the ExpressionContext are captured by the $lookup stage. @@ -200,6 +238,43 @@ TEST_F(DocumentSourceLookUpTest, LiteParsedDocumentSourceLookupContainsExpectedN ASSERT_EQ(2ul, namespaceSet.size()); } +TEST_F(ReplDocumentSourceLookUpTest, RejectsPipelineWithChangeStreamStage) { + // Temporarily set FCV to 3.6 for $changeStream. + EnsureFCV ensureFCV(EnsureFCV::Version::k36); + + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // Verify that attempting to create a $lookup pipeline containing a $changeStream stage fails. + ASSERT_THROWS_CODE( + DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {from: 'coll', as: 'as', pipeline: [{$changeStream: {}}]}}") + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::IllegalOperation); +} + +TEST_F(ReplDocumentSourceLookUpTest, RejectsSubPipelineWithChangeStreamStage) { + // Temporarily set FCV to 3.6 for $changeStream. + EnsureFCV ensureFCV(EnsureFCV::Version::k36); + + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // Verify that attempting to create a sub-$lookup pipeline containing a $changeStream stage + // fails at parse time, even if the outer pipeline does not have a $changeStream stage. + ASSERT_THROWS_CODE( + DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {from: 'coll', as: 'as', pipeline: [{$match: {_id: 1}}, {$lookup: " + "{from: 'coll', as: 'subas', pipeline: [{$changeStream: {}}]}}]}}") + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::IllegalOperation); +} TEST_F(DocumentSourceLookUpTest, RejectsLocalFieldForeignFieldWhenPipelineIsSpecified) { auto expCtx = getExpCtx(); |