summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Mihai Andrei <mihai.andrei@mongodb.com> 2019-11-20 15:40:37 +0000
committer evergreen <evergreen@mongodb.com> 2019-11-20 15:40:37 +0000
commit b4f98455b02fa64dd23be3512ef83649a5395b76 (patch)
tree f25e371247f960709ca75fa58f8f30a795e8dcb2
parent c65877d82f3fcc9f355c93f9921bf7a332a88817 (diff)
download mongo-b4f98455b02fa64dd23be3512ef83649a5395b76.tar.gz
SERVER-42137 Allow aggregation stage to write to a collection that the query also reads from
-rw-r--r-- jstests/aggregation/sources/merge/merge_to_referenced_collection.js 49
-rw-r--r-- jstests/aggregation/sources/merge/merge_to_same_collection.js 25
-rw-r--r-- jstests/noPassthrough/merge_causes_infinite_loop.js 63
-rw-r--r-- src/mongo/db/commands/map_reduce_agg_test.cpp 5
-rw-r--r-- src/mongo/db/commands/mr_common.cpp 10
-rw-r--r-- src/mongo/db/pipeline/document_source_merge.cpp 5
-rw-r--r-- src/mongo/db/pipeline/document_source_merge_test.cpp 18
-rw-r--r-- src/mongo/db/pipeline/pipeline.cpp 14
8 files changed, 117 insertions, 72 deletions
diff --git a/jstests/aggregation/sources/merge/merge_to_referenced_collection.js b/jstests/aggregation/sources/merge/merge_to_referenced_collection.js
index a9060f58b0a..561bc08ddc5 100644
--- a/jstests/aggregation/sources/merge/merge_to_referenced_collection.js
+++ b/jstests/aggregation/sources/merge/merge_to_referenced_collection.js
@@ -1,8 +1,6 @@
/**
* Tests that the server behaves as expected when an $merge stage is targeting a collection which is
- * involved in the aggregate in some other way, e.g. as the source namespace or via a $lookup. We
- * disallow this combination in an effort to prevent the "halloween problem" of a never-ending
- * query.
+ * involved in the aggregate in some other way, e.g. as the source namespace or via a $lookup.
*
* This test issues queries over views, so cannot be run in passthroughs which implicitly shard
* collections.
@@ -17,28 +15,29 @@ load('jstests/libs/fixture_helpers.js'); // For 'FixtureHelpers'.
const testDB = db.getSiblingDB("merge_to_referenced_coll");
const coll = testDB.test;
-withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+// Function used to reset state in between tests.
+function reset() {
coll.drop();
-
// Seed the collection to ensure each pipeline will actually do something.
- assert.commandWorked(coll.insert({_id: 0}));
+ assert.commandWorked(coll.insert({_id: 0, y: 0}));
+}
- // Each of the following assertions will somehow use $merge to write to a namespace that is
- // being read from elsewhere in the pipeline.
- const assertFailsWithCode = ((fn) => {
- const error = assert.throws(fn);
- assert.contains(error.code, [51188, 51079]);
- });
+withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+ // Skip merge mode combinations that include "fail", since whether they succeed depends on the
+ // contents of the tested collection.
+ if (whenMatchedMode == "fail" || whenNotMatchedMode == "fail")
+ return;
+ reset();
// Test $merge to the aggregate command's source collection.
- assertFailsWithCode(() => coll.aggregate([{
+ assert.doesNotThrow(() => coll.aggregate([{
$merge:
{into: coll.getName(), whenMatched: whenMatchedMode, whenNotMatched: whenNotMatchedMode}
}]));
// Test $merge to the same namespace as a $lookup which is the same as the aggregate
// command's source collection.
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{$lookup: {from: coll.getName(), as: "x", localField: "f_id", foreignField: "_id"}},
{
$merge: {
@@ -51,13 +50,13 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
// Test $merge to the same namespace as a $lookup which is *not* the same as the aggregate
// command's source collection.
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{$lookup: {from: "bar", as: "x", localField: "f_id", foreignField: "_id"}},
{$merge: {into: "bar", whenMatched: whenMatchedMode, whenNotMatched: whenNotMatchedMode}}
]));
// Test $merge to the same namespace as a $graphLookup.
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{
$graphLookup: {
from: "bar",
@@ -77,12 +76,12 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
]));
// Test $merge to the same namespace as a $lookup which is nested within another $lookup.
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{
$lookup: {
from: "bar",
as: "x",
- let : {},
+ let: {},
pipeline: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}]
}
},
@@ -95,7 +94,7 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
}
]));
// Test $merge to the same namespace as a $lookup which is nested within a $facet.
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{
$facet: {
y: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}],
@@ -103,7 +102,7 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
},
{$merge: {into: "TARGET", whenMatched: whenMatchedMode, whenNotMatched: whenNotMatchedMode}}
]));
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{
$facet: {
x: [{$lookup: {from: "other", as: "y", pipeline: []}}],
@@ -113,14 +112,14 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
{$merge: {into: "TARGET", whenMatched: whenMatchedMode, whenNotMatched: whenNotMatchedMode}}
]));
- // Test that we use the resolved namespace of a view to detect this sort of halloween
- // problem.
+ // Test that $merge works when the resolved namespace of a view is the same as the output
+ // collection.
assert.commandWorked(
testDB.runCommand({create: "view_on_TARGET", viewOn: "TARGET", pipeline: []}));
- assertFailsWithCode(() => testDB.view_on_TARGET.aggregate([
+ assert.doesNotThrow(() => testDB.view_on_TARGET.aggregate([
{$merge: {into: "TARGET", whenMatched: whenMatchedMode, whenNotMatched: whenNotMatchedMode}}
]));
- assertFailsWithCode(() => coll.aggregate([
+ assert.doesNotThrow(() => coll.aggregate([
{
$facet: {
x: [{$lookup: {from: "other", as: "y", pipeline: []}}],
@@ -155,7 +154,7 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
const nestedPipeline = generateNestedPipeline("lookup", 20).concat([
{$merge: {into: "lookup", whenMatched: whenMatchedMode, whenNotMatched: whenNotMatchedMode}}
]);
- assertFailsWithCode(() => coll.aggregate(nestedPipeline));
+ assert.doesNotThrow(() => coll.aggregate(nestedPipeline));
testDB.dropDatabase();
});
diff --git a/jstests/aggregation/sources/merge/merge_to_same_collection.js b/jstests/aggregation/sources/merge/merge_to_same_collection.js
index 51435696fdd..4e90036f646 100644
--- a/jstests/aggregation/sources/merge/merge_to_same_collection.js
+++ b/jstests/aggregation/sources/merge/merge_to_same_collection.js
@@ -1,20 +1,33 @@
/**
- * Tests that $merge fails when the target collection is the aggregation collection.
+ * Tests that $merge does not fail when the target collection is the aggregation collection.
*
* @tags: [assumes_unsharded_collection]
*/
(function() {
"use strict";
-// For assertMergeFailsForAllModesWithCode.
-load("jstests/aggregation/extras/merge_helpers.js");
+load("jstests/aggregation/extras/utils.js"); // for assertArrayEq()
const coll = db.name;
coll.drop();
-const nDocs = 10;
+const nDocs = 3;
for (let i = 0; i < nDocs; i++) {
assert.commandWorked(coll.insert({_id: i, a: i}));
}
-assertMergeFailsForAllModesWithCode({source: coll, target: coll, errorCodes: 51188});
-}());
+const pipeline = [
+ {$match: {a: {$lt: 1}}},
+ {
+ $merge: {
+ into: coll.getName(),
+ whenMatched: [{$addFields: {a: {$add: ["$a", 3]}}}],
+ whenNotMatched: "insert"
+ }
+ }
+];
+
+assert.doesNotThrow(() => coll.aggregate(pipeline));
+
+assertArrayEq(
+ {actual: coll.find().toArray(), expected: [{_id: 0, a: 3}, {_id: 1, a: 1}, {_id: 2, a: 2}]});
+}());
\ No newline at end of file
diff --git a/jstests/noPassthrough/merge_causes_infinite_loop.js b/jstests/noPassthrough/merge_causes_infinite_loop.js
new file mode 100644
index 00000000000..166bb2e1b63
--- /dev/null
+++ b/jstests/noPassthrough/merge_causes_infinite_loop.js
@@ -0,0 +1,63 @@
+/**
+ * Test that exposes the Halloween problem.
+ *
+ * The Halloween problem describes the potential for a document to be visited more than once
+ * following an update operation that changes its physical location.
+ */
+(function() {
+"use strict";
+
+const conn = MongoRunner.runMongod();
+const db = conn.getDB("merge_causes_infinite_loop");
+const coll = db.getCollection("merge_causes_infinite_loop");
+const out = db.getCollection("merge_causes_infinite_loop_out");
+coll.drop();
+out.drop();
+
+const nDocs = 50;
+// We seed the documents with large values for a. This enables the pipeline that exposes the
+// halloween problem to overflow and fail more quickly.
+const largeNum = 1000 * 1000 * 1000;
+
+// We set internalQueryExecYieldPeriodMS to 1 ms to have query execution yield as often as
+// possible.
+assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryExecYieldPeriodMS: 1}));
+
+// Insert documents.
+// Note that the largeArray field is included to force documents to be written to disk and not
+// simply be updated in the cache. This is crucial to exposing the halloween problem as the
+// physical location of each document needs to change for each document to be visited and updated
+// multiple times.
+var bulk = coll.initializeUnorderedBulkOp();
+for (let i = 0; i < nDocs; i++) {
+ bulk.insert({_id: i, a: i * largeNum, largeArray: (new Array(1024 * 1024).join("a"))});
+}
+assert.commandWorked(bulk.execute());
+
+// Build an index over a, the field to be updated, so that updates will push modified documents
+// forward in the index when outputting to the collection being aggregated.
+assert.commandWorked(coll.createIndex({a: 1}));
+
+// Returns a pipeline which outputs to the specified collection.
+function pipeline(outColl) {
+ return [
+ {$match: {a: {$gt: 0}}},
+ {$merge: {into: outColl, whenMatched: [{$addFields: {a: {$multiply: ["$a", 2]}}}]}}
+ ];
+}
+
+const differentCollPipeline = pipeline(out.getName());
+const sameCollPipeline = pipeline(coll.getName());
+
+// Outputting the result of this pipeline to a different collection will not time out nor will any
+// of the computed values overflow.
+assert.commandWorked(db.runCommand(
+ {aggregate: coll.getName(), pipeline: differentCollPipeline, cursor: {}, maxTimeMS: 2500}));
+
+// Because this pipeline writes to the collection being aggregated, it will cause documents to be
+// updated and pushed forward indefinitely. This will cause the computed values to eventually
+// overflow.
+assert.commandFailedWithCode(
+ db.runCommand({aggregate: coll.getName(), pipeline: sameCollPipeline, cursor: {}}), 31109);
+MongoRunner.stopMongod(conn);
+}());
diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp
index 877a93b7fdd..7023853ac95 100644
--- a/src/mongo/db/commands/map_reduce_agg_test.cpp
+++ b/src/mongo/db/commands/map_reduce_agg_test.cpp
@@ -233,7 +233,7 @@ TEST(MapReduceAggTest, testOutSameCollection) {
ASSERT(typeid(DocumentSourceOut) == typeid(**iter));
}
-TEST(MapReduceAggTest, testSourceDestinationCollectionsEqualMergeFail) {
+TEST(MapReduceAggTest, testSourceDestinationCollectionsEqualMergeDoesNotFail) {
auto nss = NamespaceString{"db", "coll"};
auto mr = MapReduce{
nss,
@@ -241,8 +241,7 @@ TEST(MapReduceAggTest, testSourceDestinationCollectionsEqualMergeFail) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::make_optional("db"s), "coll", OutputType::Merge, false}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- ASSERT_THROWS_CODE(
- map_reduce_common::translateFromMR(mr, expCtx), DBException, ErrorCodes::InvalidOptions);
+ ASSERT_DOES_NOT_THROW(map_reduce_common::translateFromMR(mr, expCtx));
}
TEST(MapReduceAggTest, testSourceDestinationCollectionsNotEqualMergeDoesNotFail) {
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index 80d5f8e9ed0..231b68173ba 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -346,22 +346,12 @@ bool mrSupportsWriteConcern(const BSONObj& cmd) {
std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) {
- // Verify that source and output collections are different.
- // Note that $out allows for the source and the destination to match, so only reject
- // in the case that the out option is being converted to a $merge.
- auto& inNss = parsedMr.getNamespace();
auto outNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName()
? *parsedMr.getOutOptions().getDatabaseName()
: parsedMr.getNamespace().db(),
parsedMr.getOutOptions().getCollectionName()};
auto outType = parsedMr.getOutOptions().getOutputType();
- if (outType == OutputType::Merge || outType == OutputType::Reduce) {
- uassert(ErrorCodes::InvalidOptions,
- "Source collection cannot be the same as destination collection in MapReduce when "
- "using merge or reduce actions",
- inNss != outNss);
- }
// If non-inline output, verify that the target collection is *not* sharded by anything other
// than _id.
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 9669fc49f22..51d0a1adf2f 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -365,11 +365,6 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
MergeWhenNotMatchedMode_serializer(whenNotMatched)),
isSupportedMergeMode(whenMatched, whenNotMatched));
- uassert(51188,
- "{} is not supported when the output collection is the same as "
- "the aggregation collection"_format(kStageName),
- expCtx->ns != outputNs);
-
uassert(ErrorCodes::InvalidNamespace,
"Invalid {} target namespace: '{}'"_format(kStageName, outputNs.ns()),
outputNs.isValid());
diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp
index b05a30e1507..f309915462f 100644
--- a/src/mongo/db/pipeline/document_source_merge_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_test.cpp
@@ -144,6 +144,15 @@ TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfWhenMatchedIsStringOrArray) {
ASSERT(createMergeStage(spec));
}
+TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfTargetAndAggregationNamespacesAreSame) {
+ const auto targetNsSameAsAggregationNs = getExpCtx()->ns;
+ const auto targetColl = targetNsSameAsAggregationNs.coll();
+ const auto targetDb = targetNsSameAsAggregationNs.db();
+
+ auto spec = BSON("$merge" << BSON("into" << BSON("coll" << targetColl << "db" << targetDb)));
+ ASSERT(createMergeStage(spec));
+}
+
TEST_F(DocumentSourceMergeTest, FailsToParseIncorrectMergeSpecType) {
auto spec = BSON("$merge" << 1);
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51182);
@@ -290,15 +299,6 @@ TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenNotMatchedModeIsNotString) {
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
}
-TEST_F(DocumentSourceMergeTest, FailsToParseIfTargetAndAggregationNamespacesAreSame) {
- const auto targetNsSameAsAggregationNs = getExpCtx()->ns;
- const auto targetColl = targetNsSameAsAggregationNs.coll();
- const auto targetDb = targetNsSameAsAggregationNs.db();
-
- auto spec = BSON("$merge" << BSON("into" << BSON("coll" << targetColl << "db" << targetDb)));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51188);
-}
-
TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsUnsupportedString) {
auto spec = BSON("$merge" << BSON("into"
<< "target_collection"
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index c20fc4e82cb..03a242a3e29 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -192,20 +192,6 @@ void Pipeline::validateTopLevelPipeline() const {
}
}
}
- // Make sure we aren't reading from and writing to the same namespace for a $merge.
- if (auto mergeStage = dynamic_cast<DocumentSourceMerge*>(_sources.back().get())) {
- const auto& outNss = mergeStage->getOutputNs();
- stdx::unordered_set<NamespaceString> collectionNames;
- // In order to gather only the involved namespaces which we are reading from, not the one we
- // are writing to, skip the final stage as we know it is a $merge stage.
- for (auto it = _sources.begin(); it != std::prev(_sources.end()); it++) {
- (*it)->addInvolvedCollections(&collectionNames);
- }
- uassert(51079,
- "Cannot use $merge to write to the same namespace being read from elsewhere in the "
- "pipeline",
- collectionNames.find(outNss) == collectionNames.end());
- }
}
void Pipeline::validateFacetPipeline() const {