summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-10-06 06:29:33 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-10-10 15:11:13 +0100
commit125fa84f4a6ee5faabae5529d6a6c05816c7050b (patch)
tree30ba3ed233fd9c3cdaeec4c076d1c8b359832600
parent1e2626463b5a7c22484c4556b77da149f4ad1ef9 (diff)
downloadmongo-125fa84f4a6ee5faabae5529d6a6c05816c7050b.tar.gz
SERVER-31416 Ban $changeStream from $lookup pipelines
-rw-r--r--jstests/change_streams/change_stream_ban_from_lookup.js37
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h6
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp75
4 files changed, 126 insertions, 5 deletions
diff --git a/jstests/change_streams/change_stream_ban_from_lookup.js b/jstests/change_streams/change_stream_ban_from_lookup.js
new file mode 100644
index 00000000000..f17be0ecef1
--- /dev/null
+++ b/jstests/change_streams/change_stream_ban_from_lookup.js
@@ -0,0 +1,37 @@
+/**
+ * Test that the $changeStream stage cannot be used in a $lookup pipeline or sub-pipeline.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const coll = db.change_stream_ban_from_lookup;
+ coll.drop();
+
+ assert.writeOK(coll.insert({_id: 1}));
+
+ // Verify that we cannot create a $lookup using a pipeline which begins with $changeStream.
+ assertErrorCode(coll,
+ [{$lookup: {from: coll.getName(), as: 'as', pipeline: [{$changeStream: {}}]}}],
+ ErrorCodes.IllegalOperation);
+
+ // Verify that we cannot create a $lookup if its pipeline contains a sub-$lookup whose pipeline
+ // begins with $changeStream.
+ assertErrorCode(
+ coll,
+ [{
+ $lookup: {
+ from: coll.getName(),
+ as: 'as',
+ pipeline: [
+ {$match: {_id: 1}},
+ {
+ $lookup:
+ {from: coll.getName(), as: 'subas', pipeline: [{$changeStream: {}}]}
+ }
+ ]
+ }
+ }],
+ ErrorCodes.IllegalOperation);
+})();
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 7e77efe2d80..c7c8873e22b 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -596,6 +596,19 @@ void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variabl
}
}
+void DocumentSourceLookUp::initializeIntrospectionPipeline() {
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+ _parsedIntrospectionPipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+
+ auto& sources = _parsedIntrospectionPipeline->getSources();
+
+ // Ensure that the pipeline does not contain a $changeStream stage. This check will be
+ // performed recursively on all sub-pipelines.
+ uassert(ErrorCodes::IllegalOperation,
+ "$changeStream is not allowed within a $lookup foreign pipeline",
+ sources.empty() || !sources.front()->constraints().isChangeStreamStage());
+}
+
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 7164a479416..7aecbec0fff 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -259,11 +259,7 @@ private:
* Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup
* pipelines will be built recursively.
*/
- void initializeIntrospectionPipeline() {
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
- _parsedIntrospectionPipeline =
- uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
- }
+ void initializeIntrospectionPipeline();
/**
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 25789995667..434bfabc00f 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -44,6 +44,9 @@
#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/query_knobs.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/server_options.h"
namespace mongo {
namespace {
@@ -57,6 +60,41 @@ using DocumentSourceLookUpTest = AggregationContextFixture;
const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load();
const auto kExplain = ExplainOptions::Verbosity::kQueryPlanner;
+// Allow tests to temporarily switch to a different FCV.
+class EnsureFCV {
+public:
+ using Version = ServerGlobalParams::FeatureCompatibility::Version;
+ EnsureFCV(Version version)
+ : _origVersion(serverGlobalParams.featureCompatibility.getVersion()) {
+ serverGlobalParams.featureCompatibility.setVersion(version);
+ }
+ ~EnsureFCV() {
+ serverGlobalParams.featureCompatibility.setVersion(_origVersion);
+ }
+
+private:
+ const Version _origVersion;
+};
+
+// For tests which need to run in a replica set context.
+class ReplDocumentSourceLookUpTest : public DocumentSourceLookUpTest {
+public:
+ void setUp() override {
+ auto service = getExpCtx()->opCtx->getServiceContext();
+ repl::ReplSettings settings;
+
+ settings.setReplSetString("lookupTestSet/node1:12345");
+ settings.setMajorityReadConcernEnabled(true);
+
+ repl::StorageInterface::set(service, stdx::make_unique<repl::StorageInterfaceMock>());
+ auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service, settings);
+
+ // Ensure that we are primary.
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+ }
+};
+
// A 'let' variable defined in a $lookup stage is expected to be available to all sub-pipelines. For
// sub-pipelines below the immediate one, they are passed to via ExpressionContext. This test
// confirms that variables defined in the ExpressionContext are captured by the $lookup stage.
@@ -200,6 +238,43 @@ TEST_F(DocumentSourceLookUpTest, LiteParsedDocumentSourceLookupContainsExpectedN
ASSERT_EQ(2ul, namespaceSet.size());
}
+TEST_F(ReplDocumentSourceLookUpTest, RejectsPipelineWithChangeStreamStage) {
+ // Temporarily set FCV to 3.6 for $changeStream.
+ EnsureFCV ensureFCV(EnsureFCV::Version::k36);
+
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // Verify that attempting to create a $lookup pipeline containing a $changeStream stage fails.
+ ASSERT_THROWS_CODE(
+ DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {from: 'coll', as: 'as', pipeline: [{$changeStream: {}}]}}")
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+}
+
+TEST_F(ReplDocumentSourceLookUpTest, RejectsSubPipelineWithChangeStreamStage) {
+ // Temporarily set FCV to 3.6 for $changeStream.
+ EnsureFCV ensureFCV(EnsureFCV::Version::k36);
+
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // Verify that attempting to create a sub-$lookup pipeline containing a $changeStream stage
+ // fails at parse time, even if the outer pipeline does not have a $changeStream stage.
+ ASSERT_THROWS_CODE(
+ DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {from: 'coll', as: 'as', pipeline: [{$match: {_id: 1}}, {$lookup: "
+ "{from: 'coll', as: 'subas', pipeline: [{$changeStream: {}}]}}]}}")
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+}
TEST_F(DocumentSourceLookUpTest, RejectsLocalFieldForeignFieldWhenPipelineIsSpecified) {
auto expCtx = getExpCtx();