38 files changed, 1571 insertions, 611 deletions
diff --git a/jstests/aggregation/sources/merge/all_modes.js b/jstests/aggregation/sources/merge/all_modes.js index 3854008072c..278d18de6c3 100644 --- a/jstests/aggregation/sources/merge/all_modes.js +++ b/jstests/aggregation/sources/merge/all_modes.js @@ -199,7 +199,7 @@ const target = db.all_modes_target; })(); // Test 'whenMatched=[pipeline] whenNotMatched=insert' mode. This is an equivalent of a -// pipeline-style update with upsert=true. +// pipeline-style update with upsert=true and upsertSupplied=true. (function testWhenMatchedPipelineUpdateWhenNotMatchedInsert() { assert(target.drop()); assert.commandWorked(target.insert({_id: 1, b: 1})); @@ -207,9 +207,11 @@ const target = db.all_modes_target; $merge: {into: target.getName(), whenMatched: [{$addFields: {x: 2}}], whenNotMatched: "insert"} }])); + // We match {_id: 1} and apply the pipeline to add the field {x: 2}. The other source collection + // documents are copied directly into the target collection. assertArrayEq({ actual: target.find().toArray(), - expected: [{_id: 1, b: 1, x: 2}, {_id: 2, x: 2}, {_id: 3, x: 2}] + expected: [{_id: 1, b: 1, x: 2}, {_id: 2, a: 2, b: "b"}, {_id: 3, a: 3, b: "c"}] }); })(); diff --git a/jstests/aggregation/sources/merge/mode_pipeline_insert.js b/jstests/aggregation/sources/merge/mode_pipeline_insert.js index df3414e0950..e37f5467b81 100644 --- a/jstests/aggregation/sources/merge/mode_pipeline_insert.js +++ b/jstests/aggregation/sources/merge/mode_pipeline_insert.js @@ -37,19 +37,19 @@ target.drop(); assertArrayEq({ actual: target.find().toArray(), expected: [ - {_id: 1, x: 1}, + {_id: 1, a: 1, b: "a"}, ] }); })(); -// Test $merge inserts a document into an existing target collection if no matching document -// is found. +// Test $merge inserts the original source document into an existing target collection if no +// matching document is found. (function testMergeInsertsDocumentIfMatchNotFound() { assert.commandWorked(target.deleteMany({})); assert.doesNotThrow( () => source.aggregate(makeMergePipeline( {target: target.getName(), updatePipeline: [{$addFields: {x: 1, y: 2}}]}))); - assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 2}]}); + assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, a: 1, b: "a"}]}); })(); // Test $merge updates an existing document in the target collection by applying a @@ -57,9 +57,10 @@ target.drop(); (function testMergeUpdatesDocumentIfMatchFound() { assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ target: target.getName(), - updatePipeline: [{$project: {x: {$add: ["$x", 1]}, y: {$add: ["$y", 2]}}}] + updatePipeline: + [{$project: {x: {$add: ["$a", 1]}, y: {$sum: ["$y", 2]}, z: {$add: ["$y", 2]}}}] }))); - assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2, y: 4}]}); + assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2, y: 2, z: null}]}); })(); // Test $merge with various pipeline stages which are currently supported by the pipeline-style @@ -76,7 +77,7 @@ target.drop(); () => source.aggregate(makeMergePipeline( {target: target.getName(), updatePipeline: [{$addFields: {x: {$add: ["$b", 1]}}}]}))); assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, b: 1, x: 2}, {_id: 2, x: null}]}); + {actual: target.find().toArray(), expected: [{_id: 1, b: 1, x: 2}, {_id: 2, a: 2}]}); // Test $project stage. 
assert(target.drop()); @@ -84,7 +85,7 @@ target.drop(); assert.doesNotThrow( () => source.aggregate(makeMergePipeline( {target: target.getName(), updatePipeline: [{$project: {x: {$add: ["$b", 1]}}}]}))); - assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2}, {_id: 2, x: null}]}); + assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2}, {_id: 2, a: 2}]}); // Test $replaceWith stage. assert(target.drop()); @@ -106,8 +107,8 @@ target.drop(); {actual: target.find().toArray(), expected: [{_id: 1, x: {y: 1}}, {_id: 2, x: {y: 2}}]}); })(); -// Test $merge inserts a new document into the target collection if not matching document is -// found by applying a pipeline-style update with upsert=true semantics. +// Test $merge inserts a new document into the target collection if no matching document is +// found by applying a pipeline-style update with upsert=true and upsertSupplied=true. (function testMergeInsertDocumentIfMatchNotFound() { assert(source.drop()); assert(target.drop()); @@ -115,7 +116,7 @@ target.drop(); assert.commandWorked(target.insert({_id: 2, a: 2})); assert.doesNotThrow(() => source.aggregate(makeMergePipeline( {target: target.getName(), updatePipeline: [{$addFields: {x: 1}}]}))); - assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 1}, {_id: 2, a: 2}]}); + assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, a: 1}, {_id: 2, a: 2}]}); })(); // Test $merge doesn't modify the target collection if a document has been removed from the @@ -129,7 +130,7 @@ target.drop(); assertArrayEq({ actual: target.find().toArray(), expected: [ - {_id: 1, x: 1}, + {_id: 1, a: 1}, {_id: 2, a: 2}, ] }); @@ -186,7 +187,7 @@ target.drop(); // differ from the _id field. (function testMergeWithOnFields() { if (FixtureHelpers.isSharded(source)) { - // Skip this test if the collection sharded, because an implicitly created sharded + // Skip this test if the collection is sharded, because an implicitly created sharded // key of {_id: 1} will not be covered by a unique index created in this test, which // is not allowed. return; @@ -208,7 +209,7 @@ target.drop(); }))); assertArrayEq({ actual: target.find({}, {_id: 0}).toArray(), - expected: [{a: 1, b: 1, z: 1}, {a: 2, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}] + expected: [{a: 1, b: 1, z: 1}, {a: 2}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}] }); // The 'on' fields contains multiple document fields. @@ -228,7 +229,7 @@ target.drop(); }))); assertArrayEq({ actual: target.find({}, {_id: 0}).toArray(), - expected: [{a: 1, b: 1, z: 1}, {a: 2, b: 4, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}] + expected: [{a: 1, b: 1, z: 1}, {a: 2, b: 4}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}] }); assert.commandWorked(source.dropIndex({a: 1, b: 1})); assert.commandWorked(target.dropIndex({a: 1, b: 1})); @@ -260,12 +261,8 @@ target.drop(); updatePipeline: [{$addFields: {z: 1}}] }))); assertArrayEq({ - actual: target.find().toArray(), - expected: [ - {_id: 1, a: {b: "b"}, z: 1}, - {_id: 2, a: {b: "c"}, c: "y", z: 1}, - {_id: 3, a: {b: 30}, z: 1} - ] + actual: target.find({}, {_id: 0}).toArray(), + expected: [{a: {b: "b"}, c: "x"}, {a: {b: "c"}, c: "y", z: 1}, {a: {b: 30}, b: "c"}] }); })(); @@ -326,9 +323,8 @@ target.drop(); assertArrayEq({ actual: target.find({}, {_id: 0}).toArray(), // There is a matching document in the target with {_id: 1}, but since we cannot match - // it (no _id in projection), we just insert two new documents from the source - // collection by applying a pipeline-style update. 
- expected: [{b: "c"}, {z: 1}, {z: 1}] + // it (no _id in projection), we insert the two documents from the source collection. + expected: [{b: "c"}, {a: 1, b: "a"}, {a: 2, b: "b"}] }); pipeline = makeMergePipeline({ @@ -346,7 +342,7 @@ target.drop(); assert.doesNotThrow(() => source.aggregate(pipeline)); assertArrayEq({ actual: target.find({}, {_id: 0}).toArray(), - expected: [{b: "c"}, {a: 1, z: 1}, {a: 2, z: 1}] + expected: [{b: "c"}, {a: 1, b: "a"}, {a: 2, b: "b"}] }); assert.commandWorked(source.dropIndex({_id: 1, a: -1})); assert.commandWorked(target.dropIndex({_id: 1, a: -1})); @@ -358,6 +354,7 @@ target.drop(); assert(target.drop()); assert.commandWorked(db.createCollection(target.getName(), {validator: validator})); assert.commandWorked(target.createIndex({a: 1})); + assert.commandWorked(target.insert([{_id: 1, z: 5}, {_id: 2, z: 5}])); assert.doesNotThrow(() => source.aggregate(makeMergePipeline( {target: target.getName(), updatePipeline: [{$addFields: {z: 1}}]}))); assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, z: 1}, {_id: 2, z: 1}]}); @@ -368,8 +365,7 @@ target.drop(); assert.eq(validator, listColl.cursor.firstBatch[0].options["validator"]); })(); -// Test $merge implicitly creates a new database when the target collection's database doesn't -// exist. +// Test that $merge implicitly creates a new database when the target collection's db doesn't exist. (function testMergeImplicitlyCreatesTargetDatabase() { assert(source.drop()); assert.commandWorked(source.insert({_id: 1, a: 1, b: "a"})); @@ -384,19 +380,19 @@ target.drop(); if (!FixtureHelpers.isMongos(db)) { assert.doesNotThrow(() => source.aggregate(foreignPipeline)); - assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]}); + assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, a: 1, b: "a"}]}); } else { // Implicit database creation is prohibited in a cluster. const error = assert.throws(() => source.aggregate(foreignPipeline)); assert.commandFailedWithCode(error, ErrorCodes.NamespaceNotFound); - // Force a creation of the database and collection, then fall through the test - // below. - assert.commandWorked(foreignTarget.insert({_id: 1})); + // Force creation of the database and collection, then fall through the test below. + assert.commandWorked(foreignTarget.insert({_id: 1, a: 1, b: "a"})); } assert.doesNotThrow(() => source.aggregate(foreignPipeline)); - assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]}); + assertArrayEq( + {actual: foreignTarget.find().toArray(), expected: [{_id: 1, a: 1, b: "a", z: 1}]}); assert.commandWorked(foreignDb.dropDatabase()); })(); @@ -414,27 +410,28 @@ target.drop(); updatePipeline: [{$set: {x: {$add: ["$$new.a", "$$new.b"]}}}] }))); assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, x: 4}]}); + {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, a: 2, b: 2}]}); })(); -// Test that the default 'let' variable 'new' is not available once the 'let' argument to the -// $merge stage is specified explicitly. +// Test that the default 'let' variable 'new' is always available even when the 'let' argument to +// the $merge stage is specified explicitly. 
(function testMergeCannotUseDefaultLetVariableIfLetIsSpecified() { assert(source.drop()); assert(target.drop()); assert.commandWorked(source.insert([{_id: 1, a: 1, b: 1}, {_id: 2, a: 2, b: 2}])); assert.commandWorked(target.insert({_id: 1, c: 1})); - const error = assert.throws(() => source.aggregate(makeMergePipeline({ + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ letVars: {foo: "bar"}, target: target.getName(), updatePipeline: [{$project: {x: "$$new.a", y: "$$new.b"}}] }))); - assert.commandFailedWithCode(error, 17276); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 1}, {_id: 2, a: 2, b: 2}]}); })(); // Test that $merge can accept an empty object holding no variables and the default 'new' -// variable is not available. +// variable is still available. (function testMergeWithEmptyLetVariables() { assert(source.drop()); assert(target.drop()); @@ -447,16 +444,42 @@ target.drop(); {letVars: {}, target: target.getName(), updatePipeline: [{$set: {x: "foo"}}]}))); assertArrayEq({ actual: target.find().toArray(), - expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, x: "foo"}] + expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, a: 2, b: 2}] }); // No default variable 'new' is available. - const error = assert.throws(() => source.aggregate(makeMergePipeline({ + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ letVars: {}, target: target.getName(), updatePipeline: [{$project: {x: "$$new.a", y: "$$new.b"}}] }))); - assert.commandFailedWithCode(error, 17276); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 1}, {_id: 2, x: 2, y: 2}]}); +})(); + +// Test that $merge will reject a 'let' specification which attempts to redefine 'new'. +(function testMergeRejectsLetVariablesWhichRedefineNew() { + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.insert([{_id: 1, a: 1, b: 1}, {_id: 2, a: 2, b: 2}])); + assert.commandWorked(target.insert({_id: 1, c: 1})); + + // Cannot override 'new' with an arbitrary value. + const error = assert.throws(() => source.aggregate(makeMergePipeline({ + letVars: {new: "$a"}, + target: target.getName(), + updatePipeline: [{$set: {x: "foo"}}] + }))); + assert.commandFailedWithCode(error, 51273); + + // If the user's 'let' explicitly sets 'new' to "$$ROOT", we allow it. + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + letVars: {new: "$$ROOT"}, + target: target.getName(), + updatePipeline: [{$project: {x: "$$new.a", y: "$$new.b"}}] + }))); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 1}, {_id: 2, a: 2, b: 2}]}); })(); // Test that $merge can accept a null value as the 'let' argument and the default variable 'new' @@ -465,7 +488,7 @@ target.drop(); // cannot differentiate between an optional field specified explicitly as 'null', or not // specified at all. In both cases it will treat the field like it wasn't specified. So, this // test ensures that we're aware of this limitation. Once the limitation is addressed in -// SERVER-41272, this test should be updated to accordingly. +// SERVER-41272, this test should be updated accordingly. 
(function testMergeWithNullLetVariables() { assert(source.drop()); assert(target.drop()); @@ -478,7 +501,7 @@ target.drop(); {letVars: null, target: target.getName(), updatePipeline: [{$set: {x: "foo"}}]}))); assertArrayEq({ actual: target.find().toArray(), - expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, x: "foo"}] + expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, a: 2, b: 2}] }); // Can use the default 'new' variable. @@ -507,7 +530,7 @@ target.drop(); }))); assertArrayEq({ actual: target.find().toArray(), - expected: [{_id: 1, c: 1, x: 1, y: "foo", z: true}, {_id: 2, x: 1, y: "foo", z: true}] + expected: [{_id: 1, c: 1, x: 1, y: "foo", z: true}, {_id: 2, a: 2, b: 2}] }); // Constant array. @@ -520,7 +543,7 @@ target.drop(); updatePipeline: [{$set: {x: {$arrayElemAt: ["$$a", 1]}}}] }))); assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, x: 2}]}); + {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, a: 2, b: 2}]}); })(); // Test that variables referencing the fields in the source document can be specified in the @@ -538,7 +561,7 @@ target.drop(); updatePipeline: [{$set: {z: {$add: ["$$x", "$$y"]}}}] }))); assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 2}, {_id: 2, z: 4}]}); + {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 2}, {_id: 2, a: 2, b: 2}]}); // Array field with expressions in the pipeline. assert(source.drop()); @@ -551,8 +574,10 @@ target.drop(); target: target.getName(), updatePipeline: [{$set: {z: {$arrayElemAt: ["$$x", 1]}}}] }))); - assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 2}, {_id: 2, z: 5}]}); + assertArrayEq({ + actual: target.find().toArray(), + expected: [{_id: 1, c: 1, z: 2}, {_id: 2, a: [4, 5, 6]}] + }); // Array field with expressions in the 'let' argument. assert(target.drop()); @@ -563,8 +588,10 @@ target.drop(); target: target.getName(), updatePipeline: [{$set: {z: "$$x"}}] }))); - assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 3}, {_id: 2, z: 6}]}); + assertArrayEq({ + actual: target.find().toArray(), + expected: [{_id: 1, c: 1, z: 3}, {_id: 2, a: [4, 5, 6]}] + }); })(); // Test that variables using the dotted path can be specified in the 'let' argument and @@ -580,8 +607,10 @@ target.drop(); target: target.getName(), updatePipeline: [{$set: {z: {$pow: ["$$x", 2]}}}] }))); - assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 4}, {_id: 2, z: 9}]}); + assertArrayEq({ + actual: target.find().toArray(), + expected: [{_id: 1, c: 1, z: 4}, {_id: 2, a: {b: {c: 3}}}] + }); })(); // Test that 'let' variables are referred to the computed document in the aggregation pipeline, @@ -605,7 +634,7 @@ target.drop(); }))); assertArrayEq({ actual: target.find().toArray(), - expected: [{_id: 1, c: 1, z: {_id: 1, a: 3}}, {_id: 2, z: {_id: 2, a: 3}}] + expected: [{_id: 1, c: 1, z: {_id: 1, a: 3}}, {_id: 2, a: 3}] }); // Test custom 'let' variables. 
@@ -622,6 +651,6 @@ target.drop(); updatePipeline: [{$set: {z: "$$x"}}] }))); assertArrayEq( - {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 49}, {_id: 2, z: 9}]}); + {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 49}, {_id: 2, a: 3}]}); })(); }()); diff --git a/jstests/core/update_with_pipeline.js b/jstests/core/update_with_pipeline.js index 963d72b6592..4f044a19352 100644 --- a/jstests/core/update_with_pipeline.js +++ b/jstests/core/update_with_pipeline.js @@ -117,8 +117,62 @@ testUpsertDoesInsert({_id: 1, x: 1}, [{$project: {x: 1}}], {_id: 1, x: 1}); testUpsertDoesInsert({_id: 1, x: 1}, [{$project: {x: "foo"}}], {_id: 1, x: "foo"}); testUpsertDoesInsert({_id: 1, x: 1, y: 1}, [{$unset: ["x"]}], {_id: 1, y: 1}); -// Update fails when invalid stage is specified. This is a sanity check rather than an -// exhaustive test of all stages. +// Upsert with 'upsertSupplied' inserts the given document and populates _id from the query. +assert.commandWorked(db.runCommand({ + update: coll.getName(), + updates: [{ + q: {_id: "supplied_doc"}, + u: [{$set: {x: 1}}], + upsert: true, + upsertSupplied: true, + c: {new: {suppliedDoc: true}} + }] +})); +assert(coll.findOne({_id: "supplied_doc", suppliedDoc: true})); + +// Update with 'upsertSupplied:true' fails if 'upsert' is false. +assert.commandFailedWithCode(db.runCommand({ + update: coll.getName(), + updates: [{ + q: {_id: "supplied_doc"}, + u: [{$set: {x: 1}}], + upsert: false, + upsertSupplied: true, + c: {new: {suppliedDoc: true}} + }] +}), + ErrorCodes.FailedToParse); + +// Upsert with 'upsertSupplied' fails if no constants are provided. +assert.commandFailedWithCode(db.runCommand({ + update: coll.getName(), + updates: [{q: {_id: "supplied_doc"}, u: [{$set: {x: 1}}], upsert: true, upsertSupplied: true}] +}), + ErrorCodes.FailedToParse); + +// Upsert with 'upsertSupplied' fails if constants do not include a field called 'new'. +assert.commandFailedWithCode(db.runCommand({ + update: coll.getName(), + updates: + [{q: {_id: "supplied_doc"}, u: [{$set: {x: 1}}], upsert: true, upsertSupplied: true, c: {}}] +}), + ErrorCodes.FailedToParse); + +// Upsert with 'upsertSupplied' fails if c.new is not an object. +assert.commandFailedWithCode(db.runCommand({ + update: coll.getName(), + updates: [{ + q: {_id: "supplied_doc"}, + u: [{$set: {x: 1}}], + upsert: true, + upsertSupplied: true, + c: {new: "string"} + }] +}), + ErrorCodes.FailedToParse); + +// Update fails when invalid stage is specified. This is a sanity check rather than an exhaustive +// test of all stages. assert.commandFailedWithCode(coll.update({x: 1}, [{$match: {x: 1}}]), ErrorCodes.InvalidOptions); assert.commandFailedWithCode(coll.update({x: 1}, [{$sort: {x: 1}}]), ErrorCodes.InvalidOptions); assert.commandFailedWithCode(coll.update({x: 1}, [{$facet: {a: [{$match: {x: 1}}]}}]), diff --git a/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js new file mode 100644 index 00000000000..25d4433b25c --- /dev/null +++ b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js @@ -0,0 +1,226 @@ +/** + * Tests that $merge with {whenMatched: [], whenNotMatched: 'insert'} is handled correctly during + * upgrade from and downgrade to a pre-backport version of 4.2 on a sharded cluster. 
+ */ +(function() { +"use strict"; + +load("jstests/multiVersion/libs/causal_consistency_helpers.js"); // supportsMajorityReadConcern +load("jstests/multiVersion/libs/multi_cluster.js"); // upgradeCluster +load("jstests/multiVersion/libs/multi_rs.js"); // upgradeSet + +// The UUID consistency check can hit NotMasterNoSlaveOk when it attempts to obtain a list of +// collections from the shard Primaries through mongoS at the end of this test. +TestData.skipCheckingUUIDsConsistentAcrossCluster = true; + +if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; +} + +const preBackport42Version = "4.2.1"; +const latestVersion = "latest"; + +const st = new ShardingTest({ + shards: 2, + mongos: 1, + rs: {nodes: 3}, + other: { + mongosOptions: {binVersion: preBackport42Version}, + configOptions: {binVersion: preBackport42Version}, + rsOptions: {binVersion: preBackport42Version}, + } +}); + +// Obtain references to the test database, the source and target collections. +let mongosDB = st.s.getDB(jsTestName()); +let sourceSharded = mongosDB.source_coll_sharded; +let targetSharded = mongosDB.target_coll_sharded; +let sourceUnsharded = mongosDB.source_coll_unsharded; +let targetUnsharded = mongosDB.target_coll_unsharded; + +// Updates the specified cluster components and then refreshes our references to each of them. +function refreshCluster(version, components, singleShard) { + // Default to only upgrading the explicitly specified components. + const defaultComponents = {upgradeMongos: false, upgradeShards: false, upgradeConfigs: false}; + components = Object.assign(defaultComponents, components); + + if (singleShard) { + singleShard.upgradeSet({binVersion: version}); + } else { + st.upgradeCluster(version, components); + } + + // Wait for the config server and shards to become available, and restart mongoS. + st.configRS.awaitSecondaryNodes(); + st.rs0.awaitSecondaryNodes(); + st.rs1.awaitSecondaryNodes(); + st.restartMongoses(); + + // Having upgraded the cluster, reacquire references to each component. + mongosDB = st.s.getDB(jsTestName()); + sourceSharded = mongosDB.source_coll_sharded; + targetSharded = mongosDB.target_coll_sharded; + sourceUnsharded = mongosDB.source_coll_unsharded; + targetUnsharded = mongosDB.target_coll_unsharded; +} + +// Run the aggregation and swallow applicable exceptions for as long as we receive them, up to the +// assert.soon timeout. This is necessary because there is a period after one shard's Primary steps +// down during upgrade where a $merge on the other shard may still target the previous Primary. +function tryWhileNotMaster(sourceColl, targetColl, pipeline, options) { + assert.soon(() => { + const aggCmdParams = Object.assign({pipeline: pipeline, cursor: {}}, options); + const cmdRes = sourceColl.runCommand("aggregate", aggCmdParams); + if (cmdRes.ok) { + return true; + } + // The only errors we are prepared to swallow are ErrorCodes.NotMaster and CursorNotFound. + // The latter can be thrown as a consequence of a NotMaster on one shard when the $merge + // stage is dispatched to a merging shard as part of the latter half of the pipeline. + const errorsToSwallow = [ErrorCodes.NotMaster, ErrorCodes.CursorNotFound]; + assert(errorsToSwallow.includes(cmdRes.code), () => tojson(cmdRes)); + // TODO SERVER-43851: this may be susceptible to zombie writes. Ditto for all other + // occurrences of remove({}) throughout this test. 
+ assert.commandWorked(targetColl.remove({})); + return false; + }); +} + +// Enable sharding on the the test database and ensure that the primary is shard0. +assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); +st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + +// Shard the source collection on {_id: 1}, split across the shards at {_id: 0}. +st.shardColl(sourceSharded, {_id: 1}, {_id: 0}, {_id: 1}); + +// Shard the target collection on {_id: "hashed"}, so that the target shard for each document will +// not necessarily be the same as the source shard. +st.shardColl(targetSharded, {_id: "hashed"}, false, false); + +// Insert an identical set of test data into both the sharded and unsharded source collections. In +// the former case, the documents are spread across both shards. +for (let i = -20; i < 20; ++i) { + assert.commandWorked(sourceSharded.insert({_id: i})); + assert.commandWorked(sourceUnsharded.insert({_id: i})); +} + +// Define a series of test cases covering all $merge distributed planning scenarios. +const testCases = [ + // $merge from unsharded to unsharded, passthrough from mongoS and write locally. + { + sourceColl: () => sourceUnsharded, + targetColl: () => targetUnsharded, + preMergePipeline: [], + allowDiskUse: false, + disableExchange: false + }, + // $merge from unsharded to sharded, passthrough from mongoS and write cross-shard. + { + sourceColl: () => sourceUnsharded, + targetColl: () => targetSharded, + preMergePipeline: [], + allowDiskUse: false, + disableExchange: false + }, + // $merge from sharded to sharded, writes from shard to shard in parallel. + { + sourceColl: () => sourceSharded, + targetColl: () => targetSharded, + preMergePipeline: [], + allowDiskUse: false, + disableExchange: false + }, + // $group with exchange, sends input documents to relevant shard and $merges locally. + { + sourceColl: () => sourceSharded, + targetColl: () => targetSharded, + preMergePipeline: [{$group: {_id: "$_id"}}], + allowDiskUse: false, + disableExchange: false + }, + // $group, exchange prohibited, $merge is executed on mongoS. + { + sourceColl: () => sourceSharded, + targetColl: () => targetSharded, + preMergePipeline: [{$group: {_id: "$_id"}}], + allowDiskUse: false, + disableExchange: true + }, + // $group, exchange prohibited, $merge sent to single shard and writes cross-shard. + { + sourceColl: () => sourceSharded, + targetColl: () => targetSharded, + preMergePipeline: [{$group: {_id: "$_id"}}], + allowDiskUse: true, + disableExchange: true + }, +]; + +// The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in +// effect, output documents will all have an _id field and the field added by this pipeline. +const mergePipe = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}]; + +// Generate the array of output documents we expect to see under the old upsert behaviour. +const expectedOldBehaviourOutput = Array.from(sourceSharded.find().toArray(), (doc) => { + return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true}; +}); + +for (let testCaseNum = 0; testCaseNum < testCases.length; ++testCaseNum) { + // Perform initial test-case setup. Disable the exchange optimization if appropriate. + const testCase = testCases[testCaseNum]; + assert.commandWorked(mongosDB.adminCommand( + {setParameter: 1, internalQueryDisableExchange: testCase.disableExchange})); + + // Construct the options object that will be supplied along with the pipeline. 
+ const aggOptions = {allowDiskUse: testCase.allowDiskUse}; + + // Construct the final pipeline by appending $merge to the the testCase's preMergePipeline. + const finalPipeline = testCase.preMergePipeline.concat([{ + $merge: { + into: testCase.targetColl().getName(), + whenMatched: mergePipe, + whenNotMatched: "insert" + } + }]); + + // Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output + // documents are produced using the old upsert behaviour. + tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); + assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput); + assert.commandWorked(testCase.targetColl().remove({})); + + // Upgrade a single shard to latest but leave the mongoS on 'preBackport42Version'. The upgraded + // shard continues to produce upsert requests that are compatible with the pre-backport shards. + refreshCluster(latestVersion, null, st.rs1); + tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); + assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput); + assert.commandWorked(testCase.targetColl().remove({})); + + // Upgrade the configs and the remaining shard to latest but leave mongoS on pre-backport 4.2. + // The shards continue to produce upsert requests that use the pre-backport behaviour. This is + // to ensure that the pipeline produces the same behaviour regardless of whether $merge is + // pushed down to the shards or run on the mongoS itself. + refreshCluster(latestVersion, {upgradeShards: true, upgradeConfigs: true}); + tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); + assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput); + assert.commandWorked(testCase.targetColl().remove({})); + + // Upgrade the mongoS to latest. We should now see that the $merge adopts the new behaviour, and + // inserts the exact source document rather than generating one from the whenMatched pipeline. + refreshCluster(latestVersion, {upgradeMongos: true}); + tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions); + assert.sameMembers(testCase.targetColl().find().toArray(), + testCase.sourceColl().find().toArray()); + assert.commandWorked(testCase.targetColl().remove({})); + + // Finally, downgrade the cluster to pre-backport 4.2 in preparation for the next test case. No + // need to do this after the final test, as it will simply extend the runtime for no reason. + if (testCaseNum < testCases.length - 1) { + refreshCluster(preBackport42Version, + {upgradeMongos: true, upgradeShards: true, upgradeConfigs: true}); + } +} + +st.stop(); +})();
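For context, a minimal sketch of the update command that a post-backport $merge with {whenMatched: [<pipeline>], whenNotMatched: "insert"} issues for a non-matching source document (illustrative only; the namespace and values are hypothetical and not taken from this patch):

// Illustrative sketch, not part of the patch. The original source document
// travels in 'c.new' and, because upsertSupplied is true, is inserted verbatim
// rather than being generated by the whenMatched pipeline.
db.runCommand({
    update: "target_coll",                 // hypothetical target namespace
    updates: [{
        q: {_id: 5},                        // match on the $merge 'on' fields (default: _id)
        u: [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}],
        upsert: true,
        upsertSupplied: true,
        c: {new: {_id: 5}}                  // the unmodified source document
    }]
});

This mirrors the c: {new: ...} cases added to jstests/core/update_with_pipeline.js above.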
\ No newline at end of file diff --git a/jstests/multiVersion/agg_merge_upsert_supplied_replset.js b/jstests/multiVersion/agg_merge_upsert_supplied_replset.js new file mode 100644 index 00000000000..2ca233e8e48 --- /dev/null +++ b/jstests/multiVersion/agg_merge_upsert_supplied_replset.js @@ -0,0 +1,90 @@ +/** + * Tests that $merge with {whenMatched: [], whenNotMatched: 'insert'} is handled correctly during + * upgrade from and downgrade to a pre-backport version of 4.2 on a single replica set. + */ +(function() { +"use strict"; + +load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet. +load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + +const preBackport42Version = "4.2.1"; +const latestVersion = "latest"; + +const rst = new ReplSetTest({ + nodes: 3, + nodeOptions: {binVersion: preBackport42Version}, +}); +if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; +} +rst.initiate(); + +// Obtain references to the test database and create the test collection. +let testDB = rst.getPrimary().getDB(jsTestName()); +let sourceColl = testDB.source_coll; +let targetColl = testDB.target_coll; + +// Up- or downgrades the replset and then refreshes our references to the test collection. +function refreshReplSet(version, secondariesOnly) { + // Upgrade the set and wait for it to become available again. + if (secondariesOnly) { + rst.upgradeSecondaries(rst.getPrimary(), {binVersion: version}); + } else { + rst.upgradeSet({binVersion: version}); + } + rst.awaitSecondaryNodes(); + + // Having upgraded the set, reacquire references to the db and collection. + testDB = rst.getPrimary().getDB(jsTestName()); + sourceColl = testDB.source_coll; + targetColl = testDB.target_coll; +} + +// Insert a set of test data. +for (let i = -20; i < 20; ++i) { + assert.commandWorked(sourceColl.insert({_id: i})); +} + +// The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in +// effect, output documents will all have an _id field and the field added by this pipeline. +const mergePipe = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}]; + +// Generate the array of output documents we expect to see under the old upsert behaviour. +const expectedOldBehaviourOutput = Array.from(sourceColl.find().toArray(), (doc) => { + return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true}; +}); + +// The pipeline to run for each test. Results in different output depending on upsert mode used. +const finalPipeline = + [{$merge: {into: targetColl.getName(), whenMatched: mergePipe, whenNotMatched: "insert"}}]; + +// Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output +// documents are produced using the old upsert behaviour. +sourceColl.aggregate(finalPipeline); +assert.sameMembers(targetColl.find().toArray(), expectedOldBehaviourOutput); +assert.commandWorked(targetColl.remove({})); + +// Upgrade the Secondaries but leave the Primary on 'preBackport42Version'. The set continues to +// produce output documents using the old upsert behaviour. 
+refreshReplSet(latestVersion, true); +sourceColl.aggregate(finalPipeline); +assert.sameMembers(targetColl.find().toArray(), expectedOldBehaviourOutput); +assert.commandWorked(targetColl.remove({})); + +// Since we cannot run $merge on a Secondary, we cannot end up in a situation where an upgraded +// Secondary issues an 'upsertSupplied' request to the pre-backport Primary. +assert.throws( + () => rst.getSecondaries()[0].getCollection(sourceColl.getFullName()).aggregate(finalPipeline)); + +// Upgrade the Primary to latest. We should now see that the $merge adopts the new behaviour, and +// inserts the exact source document rather than generating one from the whenMatched pipeline. +refreshReplSet(latestVersion); +sourceColl.aggregate(finalPipeline); +assert.sameMembers(targetColl.find().toArray(), sourceColl.find().toArray()); +assert.commandWorked(targetColl.remove({})); + +rst.stopSet(); +})();
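A before/after sketch of the behaviour these multiversion tests verify (collection names and values are illustrative, not taken from the patch):

// Illustrative sketch, not part of the patch: with a source document {_id: 7}
// and no matching document in the target, the whenNotMatched: "insert" branch
// behaves differently before and after the backport.
db.source.insert({_id: 7});
db.source.aggregate([{
    $merge: {
        into: "target",
        whenMatched: [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}],
        whenNotMatched: "insert"
    }
}]);
// Pre-backport (4.2.1) target document:
//   {_id: 7, docWasGeneratedFromWhenMatchedPipeline: true}
// Post-backport target document (the source document is inserted as supplied):
//   {_id: 7}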
\ No newline at end of file diff --git a/jstests/sharding/upsert_sharded.js b/jstests/sharding/upsert_sharded.js index 32a59b9a586..41b873db084 100644 --- a/jstests/sharding/upsert_sharded.js +++ b/jstests/sharding/upsert_sharded.js @@ -5,33 +5,46 @@ (function() { 'use strict'; -var st = new ShardingTest({shards: 2, mongos: 1}); +const st = new ShardingTest({shards: 2, mongos: 1}); -var mongos = st.s0; -var admin = mongos.getDB("admin"); -var coll = mongos.getCollection("foo.bar"); +const mongos = st.s0; +const admin = mongos.getDB("admin"); +const coll = mongos.getCollection("foo.bar"); assert(admin.runCommand({enableSharding: coll.getDB() + ""}).ok); st.ensurePrimaryShard(coll.getDB().getName(), st.shard1.shardName); -var upsertedResult = function(query, expr) { - coll.remove({}); - return coll.update(query, expr, {upsert: true}); +const upsertSuppliedResult = function(upsertColl, query, newDoc) { + assert.commandWorked(upsertColl.remove({})); + return coll.runCommand({ + update: coll.getName(), + updates: [{ + q: query, + u: [{$addFields: {unused: true}}], + c: {new: newDoc}, + upsert: true, + upsertSupplied: true + }] + }); }; -var upsertedField = function(query, expr, fieldName) { - assert.writeOK(upsertedResult(query, expr)); - return coll.findOne()[fieldName]; +const upsertedResult = function(upsertColl, query, expr) { + assert.commandWorked(upsertColl.remove({})); + return upsertColl.update(query, expr, {upsert: true}); }; -var upsertedId = function(query, expr) { - return upsertedField(query, expr, "_id"); +const upsertedField = function(upsertColl, query, expr, fieldName) { + assert.commandWorked(upsertedResult(upsertColl, query, expr)); + return upsertColl.findOne()[fieldName]; }; -var upsertedXVal = function(query, expr) { - return upsertedField(query, expr, "x"); +const upsertedXVal = function(upsertColl, query, expr) { + return upsertedField(upsertColl, query, expr, "x"); }; +// +// Tests for non-nested shard key. +// st.ensurePrimaryShard(coll.getDB() + "", st.shard0.shardName); assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {x: 1}})); assert.commandWorked(admin.runCommand({split: coll + "", middle: {x: 0}})); @@ -40,44 +53,59 @@ assert.commandWorked(admin.runCommand( st.printShardingStatus(); -// upserted update replacement would result in no shard key -assert.writeError(upsertedResult({x: 1}, {})); +// Upserted replacement fails if it would result in no shard key. +assert.commandFailedWithCode(upsertedResult(coll, {x: -1}, {_id: 1}), ErrorCodes.ShardKeyNotFound); + +// Upserted with supplied document fails if it would result in no shard key. +assert.commandFailedWithCode(upsertSuppliedResult(coll, {x: 1}, {_id: 1}), ErrorCodes.NoSuchKey); + +// Upserted op style update will propagate shard key by default. 
+assert.commandWorked(upsertedResult(coll, {x: -1}, {$set: {_id: 1}})); +assert.docEq(coll.findOne({}), {_id: 1, x: -1}); -// updates with upsert must contain shard key in query when $op style -assert.eq(1, upsertedXVal({x: 1}, {$set: {a: 1}})); -assert.eq(1, upsertedXVal({x: {$eq: 1}}, {$set: {a: 1}})); -assert.eq(1, upsertedXVal({x: {$all: [1]}}, {$set: {a: 1}})); -assert.eq(1, upsertedXVal({x: {$in: [1]}}, {$set: {a: 1}})); -assert.eq(1, upsertedXVal({$and: [{x: {$eq: 1}}]}, {$set: {a: 1}})); -assert.eq(1, upsertedXVal({$or: [{x: {$eq: 1}}]}, {$set: {a: 1}})); +// Updates with upsert must contain shard key in query when $op style +assert.eq(1, upsertedXVal(coll, {x: 1}, {$set: {a: 1}})); +assert.eq(1, upsertedXVal(coll, {x: {$eq: 1}}, {$set: {a: 1}})); +assert.eq(1, upsertedXVal(coll, {x: {$all: [1]}}, {$set: {a: 1}})); +assert.eq(1, upsertedXVal(coll, {x: {$in: [1]}}, {$set: {a: 1}})); +assert.eq(1, upsertedXVal(coll, {$and: [{x: {$eq: 1}}]}, {$set: {a: 1}})); +assert.eq(1, upsertedXVal(coll, {$or: [{x: {$eq: 1}}]}, {$set: {a: 1}})); // Missing shard key in query. -assert.commandFailedWithCode(upsertedResult({}, {$set: {a: 1, x: 1}}), ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {}, {$set: {a: 1, x: 1}}), + ErrorCodes.ShardKeyNotFound); // Missing equality match on shard key in query. -assert.commandFailedWithCode(upsertedResult({x: {$gt: 10}}, {$set: {a: 1, x: 5}}), +assert.commandFailedWithCode(upsertedResult(coll, {x: {$gt: 10}}, {$set: {a: 1, x: 5}}), ErrorCodes.ShardKeyNotFound); // Regex shard key value in query is ambigious and cannot be extracted for an equality match. -assert.commandFailedWithCode(upsertedResult({x: {$eq: /abc*/}}, {$set: {a: 1, x: "regexValue"}}), - ErrorCodes.ShardKeyNotFound); -assert.commandFailedWithCode(upsertedResult({x: {$eq: /abc/}}, {$set: {a: 1, x: /abc/}}), +assert.commandFailedWithCode( + upsertedResult(coll, {x: {$eq: /abc*/}}, {$set: {a: 1, x: "regexValue"}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {x: {$eq: /abc/}}, {$set: {a: 1, x: /abc/}}), ErrorCodes.ShardKeyNotFound); // Shard key in query not extractable. -assert.commandFailedWithCode(upsertedResult({x: undefined}, {$set: {a: 1}}), ErrorCodes.BadValue); -assert.commandFailedWithCode(upsertedResult({x: [1, 2]}, {$set: {a: 1}}), +assert.commandFailedWithCode(upsertedResult(coll, {x: undefined}, {$set: {a: 1}}), + ErrorCodes.BadValue); +assert.commandFailedWithCode(upsertedResult(coll, {x: [1, 2]}, {$set: {a: 1}}), ErrorCodes.ShardKeyNotFound); -assert.commandFailedWithCode(upsertedResult({x: {$eq: {$gt: 5}}}, {$set: {a: 1}}), +assert.commandFailedWithCode(upsertedResult(coll, {x: {$eq: {$gt: 5}}}, {$set: {a: 1}}), ErrorCodes.ShardKeyNotFound); -// nested field extraction always fails with non-nested key - like _id, we require setting the +// Nested field extraction always fails with non-nested key - like _id, we require setting the // elements directly -assert.writeError(upsertedResult({"x.x": 1}, {$set: {a: 1}})); -assert.writeError(upsertedResult({"x.x": {$eq: 1}}, {$set: {a: 1}})); +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": 1}, {$set: {a: 1}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": {$eq: 1}}, {$set: {a: 1}}), + ErrorCodes.ShardKeyNotFound); coll.drop(); +// +// Tests for nested shard key. 
+// st.ensurePrimaryShard(coll.getDB() + "", st.shard0.shardName); assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {'x.x': 1}})); assert.commandWorked(admin.runCommand({split: coll + "", middle: {'x.x': 0}})); @@ -86,24 +114,160 @@ assert.commandWorked(admin.runCommand( st.printShardingStatus(); -// nested field extraction with nested shard key -assert.docEq({x: 1}, upsertedXVal({"x.x": 1}, {$set: {a: 1}})); -assert.docEq({x: 1}, upsertedXVal({"x.x": {$eq: 1}}, {$set: {a: 1}})); -assert.docEq({x: 1}, upsertedXVal({"x.x": {$all: [1]}}, {$set: {a: 1}})); -assert.docEq({x: 1}, upsertedXVal({$and: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}})); -assert.docEq({x: 1}, upsertedXVal({$or: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}})); +// Upserted replacement update fails if it result in no shard key with nested shard key. +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {_id: 1}), + ErrorCodes.ShardKeyNotFound); + +// Upserted with supplied document fails if it result in no shard key with nested shard key. +assert.commandFailedWithCode(upsertSuppliedResult(coll, {"x.x": -1}, {_id: 1}), + ErrorCodes.NoSuchKey); + +// Upserted op style update will propagate shard key by default with nested shard key. +assert.commandWorked(upsertedResult(coll, {"x.x": -1}, {$set: {_id: 1}})); +assert.docEq(coll.findOne({}), {_id: 1, x: {x: -1}}); + +// Nested field extraction with nested shard key +assert.docEq({x: 1}, upsertedXVal(coll, {"x.x": 1}, {$set: {a: 1}})); +assert.docEq({x: 1}, upsertedXVal(coll, {"x.x": {$eq: 1}}, {$set: {a: 1}})); +assert.docEq({x: 1}, upsertedXVal(coll, {"x.x": {$all: [1]}}, {$set: {a: 1}})); +assert.docEq({x: 1}, upsertedXVal(coll, {$and: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}})); +assert.docEq({x: 1}, upsertedXVal(coll, {$or: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}})); // Can specify siblings of nested shard keys -assert.docEq({x: 1, y: 1}, upsertedXVal({"x.x": 1, "x.y": 1}, {$set: {a: 1}})); -assert.docEq({x: 1, y: {z: 1}}, upsertedXVal({"x.x": 1, "x.y.z": 1}, {$set: {a: 1}})); +assert.docEq({x: 1, y: 1}, upsertedXVal(coll, {"x.x": 1, "x.y": 1}, {$set: {a: 1}})); +assert.docEq({x: 1, y: {z: 1}}, upsertedXVal(coll, {"x.x": 1, "x.y.z": 1}, {$set: {a: 1}})); + +// No arrays at any level for targeting. +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": []}, {$set: {a: 1}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {x: {x: []}}, {$set: {a: 1}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {x: [{x: 1}]}, {$set: {a: 1}}), + ErrorCodes.ShardKeyNotFound); + +// No arrays at any level for document insertion for replacement, supplied, and op updates. 
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {$set: {x: {x: []}}}), + ErrorCodes.NotSingleValueField); +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {$set: {x: [{x: 1}]}}), + ErrorCodes.NotSingleValueField); + +assert.commandFailedWithCode(upsertSuppliedResult(coll, {"x.x": -1}, {x: {x: []}}), + ErrorCodes.NotSingleValueField); +assert.commandFailedWithCode(upsertSuppliedResult(coll, {"x.x": -1}, {x: [{x: 1}]}), + ErrorCodes.NotSingleValueField); -// No arrays at any level -assert.writeError(upsertedResult({"x.x": []}, {$set: {a: 1}})); -assert.writeError(upsertedResult({x: {x: []}}, {$set: {a: 1}})); -assert.writeError(upsertedResult({x: [{x: 1}]}, {$set: {a: 1}})); +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {x: {x: []}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {x: [{x: 1}]}), + ErrorCodes.ShardKeyNotFound); // Can't set sub-fields of nested key -assert.writeError(upsertedResult({"x.x.x": {$eq: 1}}, {$set: {a: 1}})); +assert.commandFailedWithCode(upsertedResult(coll, {"x.x.x": {$eq: 1}}, {$set: {a: 1}}), + ErrorCodes.ShardKeyNotFound); + +coll.drop(); + +// +// Tests for nested _id shard key. +// +st.ensurePrimaryShard(coll.getDB() + "", st.shard0.shardName); +assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {'_id.x': 1}})); +assert.commandWorked(admin.runCommand({split: coll + "", middle: {'_id.x': 0}})); +assert.commandWorked(admin.runCommand( + {moveChunk: coll + "", find: {'_id.x': 0}, to: st.shard1.shardName, _waitForDelete: true})); + +st.printShardingStatus(); + +// No upsert type can result in a missing shard key for nested _id key. +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {}), ErrorCodes.ShardKeyNotFound); + +assert.commandWorked(upsertSuppliedResult(coll, {_id: {x: -1}}, {})); +assert.docEq(coll.findOne({}), {_id: {x: -1}}); + +assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {$set: {y: 1}})); +assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1}); + +assert.commandFailedWithCode( + upsertedResult(coll, {_id: {x: -1}}, {$set: {y: 1}, $unset: {"_id.x": 1}}), + ErrorCodes.ImmutableField); + +// All update types can re-state shard key for nested _id key. +assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {_id: {x: -1}, y: 1})); +assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1}); + +assert.commandWorked(upsertSuppliedResult(coll, {_id: {x: -1}}, {_id: {x: -1}, y: 1})); +assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1}); + +assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {$set: {_id: {x: -1}, y: 1}})); +assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1}); + +assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {$set: {"_id.x": -1, y: 1}})); +assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1}); + +// No upsert type can modify shard key for nested _id key. +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {_id: {x: -2}}), + ErrorCodes.ImmutableField); + +assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: -1}}, {_id: {x: -2}}), + ErrorCodes.ImmutableField); + +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {$set: {_id: {x: -2}}}), + ErrorCodes.ImmutableField); + +// No upsert type can add new _id subfield for nested _id key. 
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {_id: {x: -1, y: -1}}), + ErrorCodes.ImmutableField); + +assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: -1}}, {_id: {x: -1, y: -1}}), + ErrorCodes.ImmutableField); + +assert.commandFailedWithCode( + upsertedResult(coll, {_id: {x: -1}}, {$set: {"_id.x": -1, "_id.y": -1}}), + ErrorCodes.ImmutableField); + +// No upsert type can remove non-shardkey _id subfield for nested _id key. +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1, y: -1}}, {_id: {x: -1}}), + ErrorCodes.ImmutableField); + +assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: -1, y: -1}}, {_id: {x: -1}}), + ErrorCodes.ImmutableField); + +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1, y: -1}}, {$unset: {"_id.y": 1}}), + ErrorCodes.ImmutableField); + +// No upsert type can set array element for nested _id key. +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: [1]}}, {}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": [1]}, {}), ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {_id: [{x: 1}]}, {}), + ErrorCodes.ShardKeyNotFound); + +assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: [1]}}, {}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertSuppliedResult(coll, {"_id.x": [1]}, {}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: [{x: 1}]}, {}), + ErrorCodes.ShardKeyNotFound); + +assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: [1]}}, {$set: {y: 1}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": [1]}, {$set: {y: 1}}), + ErrorCodes.ShardKeyNotFound); +assert.commandFailedWithCode(upsertedResult(coll, {_id: [{x: 1}]}, {$set: {y: 1}}), + ErrorCodes.ShardKeyNotFound); + +// Replacement and op-style {$set _id} fail when using dotted-path query on nested _id key. 
+assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": -1}, {_id: {x: -1}}), + ErrorCodes.NotExactValueField); + +assert.commandWorked(upsertSuppliedResult(coll, {"_id.x": -1}, {_id: {x: -1}})); +assert.docEq(coll.findOne({}), {_id: {x: -1}}); + +assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": -1}, {$set: {_id: {x: -1}}}), + ErrorCodes.ImmutableField); + +assert.commandWorked(upsertedResult(coll, {"_id.x": -1}, {$set: {"_id.x": -1}})); +assert.docEq(coll.findOne({}), {_id: {x: -1}}); st.stop(); })(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c473d43c3d5..1bb1d5ff863 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1239,6 +1239,7 @@ env.Library( 'exec/text_or.cpp', 'exec/trial_stage.cpp', 'exec/update_stage.cpp', + 'exec/upsert_stage.cpp', 'exec/working_set_common.cpp', 'exec/write_stage_common.cpp', 'ops/parsed_delete.cpp', diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 422b19f6be0..7743d73d695 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -368,6 +368,7 @@ private: updateRequest.setArrayFilters(write_ops::arrayFiltersOf(_batch.getUpdates()[0])); updateRequest.setMulti(_batch.getUpdates()[0].getMulti()); updateRequest.setUpsert(_batch.getUpdates()[0].getUpsert()); + updateRequest.setUpsertSuppliedDocument(_batch.getUpdates()[0].getUpsertSupplied()); updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); updateRequest.setHint(_batch.getUpdates()[0].getHint()); updateRequest.setExplain(); diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index a7b307bf49c..10696c6cb65 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -39,7 +39,6 @@ #include "mongo/bson/bson_comparator_interface_base.h" #include "mongo/bson/mutable/algorithm.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/exec/write_stage_common.h" @@ -57,11 +56,9 @@ #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" -#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" namespace mongo { -MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert); MONGO_FAIL_POINT_DEFINE(hangBeforeThrowWouldChangeOwningShard); using std::string; @@ -76,59 +73,13 @@ namespace { const char idFieldName[] = "_id"; const FieldRef idFieldRef(idFieldName); -Status ensureIdFieldIsFirst(mb::Document* doc) { - mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName); - - if (!idElem.ok()) { - return {ErrorCodes::InvalidIdField, "_id field is missing"}; - } - - if (idElem.leftSibling().ok()) { - // Move '_id' to be the first element - Status s = idElem.remove(); - if (!s.isOK()) - return s; - s = doc->root().pushFront(idElem); - if (!s.isOK()) - return s; - } - - return Status::OK(); -} - void addObjectIDIdField(mb::Document* doc) { const auto idElem = doc->makeElementNewOID(idFieldName); - if (!idElem.ok()) - uasserted(17268, "Could not create new ObjectId '_id' field."); - + uassert(17268, "Could not create new ObjectId '_id' field.", idElem.ok()); uassertStatusOK(doc->root().pushFront(idElem)); } /** - * Uasserts if any of the paths in 'requiredPaths' are not present in 'document', or if they are - * arrays or array 
descendants. - */ -void assertRequiredPathsPresent(const mb::Document& document, const FieldRefSet& requiredPaths) { - for (const auto& path : requiredPaths) { - auto elem = document.root(); - for (size_t i = 0; i < (*path).numParts(); ++i) { - elem = elem[(*path).getPart(i)]; - uassert(ErrorCodes::NoSuchKey, - str::stream() << "After applying the update, the new document was missing the " - "required field '" - << (*path).dottedField() << "'", - elem.ok()); - uassert( - ErrorCodes::NotSingleValueField, - str::stream() << "After applying the update to the document, the required field '" - << (*path).dottedField() - << "' was found to be an array or array descendant.", - elem.getType() != BSONType::Array); - } - } -} - -/** * Returns true if we should throw a WriteConflictException in order to retry the operation in the * case of a conflict. Returns false if we should skip the document and keep going. */ @@ -158,19 +109,30 @@ const char* UpdateStage::kStageType = "UPDATE"; const UpdateStats UpdateStage::kEmptyUpdateStats; +// Public constructor. UpdateStage::UpdateStage(OperationContext* opCtx, const UpdateStageParams& params, WorkingSet* ws, Collection* collection, PlanStage* child) + : UpdateStage(opCtx, params, ws, collection) { + // We should never reach here if the request is an upsert. + invariant(!_params.request->isUpsert()); + _children.emplace_back(child); +} + +// Protected constructor. +UpdateStage::UpdateStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection) : RequiresMutableCollectionStage(kStageType, opCtx, collection), _params(params), _ws(ws), + _doc(params.driver->getDocument()), _idRetrying(WorkingSet::INVALID_ID), _idReturning(WorkingSet::INVALID_ID), - _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : NULL), - _doc(params.driver->getDocument()) { - _children.emplace_back(child); + _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : nullptr) { // Should the modifiers validate their embedded docs via storage_validation::storageValid()? // Only user updates should be checked. Any system or replication stuff should pass through. @@ -226,9 +188,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco if (getOpCtx()->writesAreReplicated() && !request->isFromMigration()) { if (metadata->isSharded() && (!OperationShardingState::isOperationVersioned(getOpCtx()) || !isFCV42)) { - auto& immutablePathsVector = metadata->getKeyPatternFields(); - immutablePaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(immutablePathsVector)); + immutablePaths.fillFrom(metadata->getKeyPatternFields()); } immutablePaths.keepShortest(&idFieldRef); } @@ -270,16 +230,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco // neither grow nor shrink). const auto createIdField = !collection()->isCapped(); - // Ensure if _id exists it is first - status = ensureIdFieldIsFirst(&_doc); - if (status.code() == ErrorCodes::InvalidIdField) { - // Create ObjectId _id field if we are doing that - if (createIdField) { - addObjectIDIdField(&_doc); - } - } else { - uassertStatusOK(status); - } + // Ensure _id is first if it exists, and generate a new OID if appropriate. 
+ _ensureIdFieldIsFirst(&_doc, createIdField); // See if the changes were applied in place const char* source = NULL; @@ -394,99 +346,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco return newObj; } -BSONObj UpdateStage::applyUpdateOpsForInsert(OperationContext* opCtx, - const CanonicalQuery* cq, - const BSONObj& query, - UpdateDriver* driver, - mutablebson::Document* doc, - bool isInternalRequest, - const NamespaceString& ns, - bool enforceOkForStorage, - UpdateStats* stats) { - // Since this is an insert (no docs found and upsert:true), we will be logging it - // as an insert in the oplog. We don't need the driver's help to build the - // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT. - // Some mods may only work in that context (e.g. $setOnInsert). - driver->setLogOp(false); - - auto* const css = CollectionShardingState::get(opCtx, ns); - auto metadata = css->getCurrentMetadata(); - - const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; - - FieldRefSet immutablePaths; - if (metadata->isSharded() && - (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42)) { - auto& immutablePathsVector = metadata->getKeyPatternFields(); - immutablePaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(immutablePathsVector)); - } - immutablePaths.keepShortest(&idFieldRef); - - if (cq) { - FieldRefSet requiredPaths; - if (metadata->isSharded()) { - const auto& shardKeyPathsVector = metadata->getKeyPatternFields(); - requiredPaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector)); - } - requiredPaths.keepShortest(&idFieldRef); - uassertStatusOK(driver->populateDocumentWithQueryFields(*cq, requiredPaths, *doc)); - } else { - fassert(17354, CanonicalQuery::isSimpleIdQuery(query)); - BSONElement idElt = query[idFieldName]; - fassert(17352, doc->root().appendElement(idElt)); - } - - // Apply the update modifications here. Do not validate for storage, since we will validate the - // entire document after the update. However, we ensure that no immutable fields are updated. - const bool validateForStorage = false; - const bool isInsert = true; - if (isInternalRequest) { - immutablePaths.clear(); - } - Status updateStatus = - driver->update(StringData(), doc, validateForStorage, immutablePaths, isInsert); - if (!updateStatus.isOK()) { - uasserted(16836, updateStatus.reason()); - } - - // Ensure _id exists and is first - auto idAndFirstStatus = ensureIdFieldIsFirst(doc); - if (idAndFirstStatus.code() == ErrorCodes::InvalidIdField) { // _id field is missing - addObjectIDIdField(doc); - } else { - uassertStatusOK(idAndFirstStatus); - } - - // Validate that the object replacement or modifiers resulted in a document - // that contains all the required keys and can be stored if it isn't coming - // from a migration or via replication. 
- if (!isInternalRequest) { - if (enforceOkForStorage) { - storage_validation::storageValid(*doc); - } - FieldRefSet requiredPaths; - if (metadata->isSharded()) { - const auto& shardKeyPathsVector = metadata->getKeyPatternFields(); - requiredPaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector)); - } - requiredPaths.keepShortest(&idFieldRef); - assertRequiredPathsPresent(*doc, requiredPaths); - } - - BSONObj newObj = doc->getObject(); - if (newObj.objsize() > BSONObjMaxUserSize) { - uasserted(17420, - str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize); - } - - return newObj; -} - bool UpdateStage::matchContainsOnlyAndedEqualityNodes(const MatchExpression& root) { if (root.matchType() == MatchExpression::EQ) { return true; @@ -563,131 +422,18 @@ bool UpdateStage::shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpd return true; } -void UpdateStage::doInsert() { - _specificStats.inserted = true; - - const UpdateRequest* request = _params.request; - bool isInternalRequest = !getOpCtx()->writesAreReplicated() || request->isFromMigration(); - - // Reset the document we will be writing to. - _doc.reset(); - - BSONObj newObj = applyUpdateOpsForInsert(getOpCtx(), - _params.canonicalQuery, - request->getQuery(), - _params.driver, - &_doc, - isInternalRequest, - request->getNamespaceString(), - _enforceOkForStorage, - &_specificStats); - - _specificStats.objInserted = newObj; - - // If this is an explain, bail out now without doing the insert. - if (request->isExplain()) { - return; - } - - // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to - // this shard. MongoS uses the query field to target a shard, and it is possible the shard key - // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case - // we need to throw so that MongoS can target the insert to the correct shard. - if (_shouldCheckForShardKeyUpdate) { - const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; - auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns()); - const auto& metadata = css->getCurrentMetadata(); - - if (isFCV42 && metadata->isSharded()) { - const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); - auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj); - - if (!metadata->keyBelongsToMe(newShardKey)) { - // An attempt to upsert a document with a shard key value that belongs on another - // shard must either be a retryable write or inside a transaction. - uassert(ErrorCodes::IllegalOperation, - "The upsert document could not be inserted onto the shard targeted by the " - "query, since its shard key belongs on a different shard. 
Cross-shard " - "upserts are only allowed when running in a transaction or with " - "retryWrites: true.", - getOpCtx()->getTxnNumber()); - uasserted( - WouldChangeOwningShardInfo(request->getQuery(), newObj, true /* upsert */), - "The document we are inserting belongs on a different shard"); - } - } - } - - if (MONGO_FAIL_POINT(hangBeforeUpsertPerformsInsert)) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert"); - } - - writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] { - WriteUnitOfWork wunit(getOpCtx()); - uassertStatusOK(collection()->insertDocument(getOpCtx(), - InsertStatement(request->getStmtId(), newObj), - _params.opDebug, - request->isFromMigration())); - - // Technically, we should save/restore state here, but since we are going to return - // immediately after, it would just be wasted work. - wunit.commit(); - }); -} - -bool UpdateStage::doneUpdating() { +bool UpdateStage::isEOF() { // We're done updating if either the child has no more results to give us, or we've // already gotten a result back and we're not a multi-update. return _idRetrying == WorkingSet::INVALID_ID && _idReturning == WorkingSet::INVALID_ID && (child()->isEOF() || (_specificStats.nMatched > 0 && !_params.request->isMulti())); } -bool UpdateStage::needInsert() { - // We need to insert if - // 1) we haven't inserted already, - // 2) the child stage returned zero matches, and - // 3) the user asked for an upsert. - return !_specificStats.inserted && _specificStats.nMatched == 0 && _params.request->isUpsert(); -} - -bool UpdateStage::isEOF() { - return doneUpdating() && !needInsert(); -} - PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { if (isEOF()) { return PlanStage::IS_EOF; } - if (doneUpdating()) { - // Even if we're done updating, we may have some inserting left to do. - if (needInsert()) { - - doInsert(); - - invariant(isEOF()); - if (_params.request->shouldReturnNewDocs()) { - // Want to return the document we just inserted, create it as a WorkingSetMember - // so that we can return it. - BSONObj newObj = _specificStats.objInserted; - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - member->obj = Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), - newObj.getOwned()); - member->transitionToOwnedObj(); - return PlanStage::ADVANCED; - } - } - - // At this point either we're done updating and there was no insert to do, - // or we're done updating and we're done inserting. Either way, we're EOF. - invariant(isEOF()); - return PlanStage::IS_EOF; - } - // It is possible that after an update was applied, a WriteConflictException // occurred and prevented us from returning ADVANCED with the requested version // of the document. @@ -834,9 +580,8 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } else if (PlanStage::IS_EOF == status) { - // The child is out of results, but we might not be done yet because we still might - // have to do an insert. - return PlanStage::NEED_TIME; + // The child is out of results, and therefore so are we. 
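+        // Upserts are handled by the UpsertStage subclass, so there is no insert phase left to
+        // perform once the child is exhausted.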
+ return PlanStage::IS_EOF; } else if (PlanStage::FAILURE == status) { *out = id; // If a stage fails, it may create a status WSM to indicate why it failed, in which case @@ -855,6 +600,40 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return status; } +void UpdateStage::_assertRequiredPathsPresent(const mb::Document& document, + const FieldRefSet& requiredPaths) { + for (const auto& path : requiredPaths) { + auto elem = document.root(); + for (size_t i = 0; i < (*path).numParts(); ++i) { + elem = elem[(*path).getPart(i)]; + uassert(ErrorCodes::NoSuchKey, + str::stream() << "After applying the update, the new document was missing the " + "required field '" + << (*path).dottedField() << "'", + elem.ok()); + uassert( + ErrorCodes::NotSingleValueField, + str::stream() << "After applying the update to the document, the required field '" + << (*path).dottedField() + << "' was found to be an array or array descendant.", + elem.getType() != BSONType::Array); + } + } +} + +void UpdateStage::_ensureIdFieldIsFirst(mb::Document* doc, bool generateOIDIfMissing) { + mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName); + + // If the document has no _id and the caller has requested that we generate one, do so. + if (!idElem.ok() && generateOIDIfMissing) { + addObjectIDIdField(doc); + } else if (idElem.ok() && idElem.leftSibling().ok()) { + // If the document does have an _id but it is not the first element, move it to the front. + uassertStatusOK(idElem.remove()); + uassertStatusOK(doc->root().pushFront(idElem)); + } +} + void UpdateStage::doRestoreStateRequiresCollection() { const UpdateRequest& request = *_params.request; const NamespaceString& nsString(request.getNamespaceString()); @@ -947,13 +726,11 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta return false; } - FieldRefSet shardKeyPaths; - const auto& shardKeyPathsVector = metadata->getKeyPatternFields(); - shardKeyPaths.fillFrom(transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector)); + FieldRefSet shardKeyPaths(metadata->getKeyPatternFields()); // Assert that the updated doc has all shard key fields and none are arrays or array // descendants. - assertRequiredPathsPresent(_doc, shardKeyPaths); + _assertRequiredPathsPresent(_doc, shardKeyPaths); // We do not allow modifying shard key value without specifying the full shard key in the query. // If the query is a simple equality match on _id, then '_params.canonicalQuery' will be null. @@ -968,7 +745,7 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta pathsupport::extractFullEqualityMatches( *(_params.canonicalQuery->root()), shardKeyPaths, &equalities) .isOK() && - equalities.size() == shardKeyPathsVector.size()); + equalities.size() == metadata->getKeyPatternFields().size()); // We do not allow updates to the shard key when 'multi' is true. uassert(ErrorCodes::InvalidOptions, diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h index 93936166544..f865fb9d162 100644 --- a/src/mongo/db/exec/update_stage.h +++ b/src/mongo/db/exec/update_stage.h @@ -69,14 +69,14 @@ private: }; /** - * Execution stage responsible for updates to documents and upserts. If the prior or - * newly-updated version of the document was requested to be returned, then ADVANCED is - * returned after updating or inserting a document. Otherwise, NEED_TIME is returned after - * updating or inserting a document. + * Execution stage responsible for updates to documents. 
If the prior or newly-updated version of + * the document was requested to be returned, then ADVANCED is returned after updating a document. + * Otherwise, NEED_TIME is returned after updating a document if further updates are pending, + * and IS_EOF is returned if no documents were found or all updates have been performed. * * Callers of doWork() must be holding a write lock. */ -class UpdateStage final : public RequiresMutableCollectionStage { +class UpdateStage : public RequiresMutableCollectionStage { UpdateStage(const UpdateStage&) = delete; UpdateStage& operator=(const UpdateStage&) = delete; @@ -87,8 +87,8 @@ public: Collection* collection, PlanStage* child); - bool isEOF() final; - StageState doWork(WorkingSetID* out) final; + bool isEOF() override; + StageState doWork(WorkingSetID* out) override; StageType stageType() const final { return STAGE_UPDATE; @@ -119,32 +119,6 @@ public: static UpdateResult makeUpdateResult(const UpdateStats* updateStats); /** - * Computes the document to insert if the upsert flag is set to true and no matching - * documents are found in the database. The document to upsert is computing using the - * query 'cq' and the update mods contained in 'driver'. - * - * If 'cq' is NULL, which can happen for the idhack update fast path, then 'query' is - * used to compute the doc to insert instead of 'cq'. - * - * 'doc' is the mutable BSON document which you would like the update driver to use - * when computing the document to insert. - * - * Set 'isInternalRequest' to true if the upsert was issued by the replication or - * sharding systems. - * - * Returns the document to insert. - */ - static BSONObj applyUpdateOpsForInsert(OperationContext* opCtx, - const CanonicalQuery* cq, - const BSONObj& query, - UpdateDriver* driver, - mutablebson::Document* doc, - bool isInternalRequest, - const NamespaceString& ns, - bool enforceOkForStorage, - UpdateStats* stats); - - /** * Returns true if an update failure due to a given DuplicateKey error is eligible for retry. * Requires that parsedUpdate.hasParsedQuery() is true. */ @@ -152,10 +126,38 @@ public: const DuplicateKeyErrorInfo& errorInfo); protected: + UpdateStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection); + void doSaveStateRequiresCollection() final {} void doRestoreStateRequiresCollection() final; + void _ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing); + + void _assertRequiredPathsPresent(const mutablebson::Document& document, + const FieldRefSet& requiredPaths); + + UpdateStageParams _params; + + // Not owned by us. + WorkingSet* _ws; + + // Stats + UpdateStats _specificStats; + + // True if the request should be checked for an update to the shard key. + bool _shouldCheckForShardKeyUpdate; + + // True if updated documents should be validated with storage_validation::storageValid(). + bool _enforceOkForStorage; + + // These get reused for each update. + mutablebson::Document& _doc; + mutablebson::DamageVector _damages; + private: static const UpdateStats kEmptyUpdateStats; @@ -173,24 +175,6 @@ private: BSONObj transformAndUpdate(const Snapshotted<BSONObj>& oldObj, RecordId& recordId); /** - * Computes the document to insert and inserts it into the collection. Used if the - * user requested an upsert and no matching documents were found. - */ - void doInsert(); - - /** - * Have we performed all necessary updates? Even if this is true, we might not be EOF, - * as we might still have to do an insert. 
- */ - bool doneUpdating(); - - /** - * Examines the stats / update request and returns whether there is still an insert left - * to do. If so then this stage is not EOF yet. - */ - bool needInsert(); - - /** * Stores 'idToRetry' in '_idRetrying' so the update can be retried during the next call to * doWork(). Always returns NEED_YIELD and sets 'out' to WorkingSet::INVALID_ID. */ @@ -210,26 +194,12 @@ private: bool checkUpdateChangesShardKeyFields(ScopedCollectionMetadata metadata, const Snapshotted<BSONObj>& oldObj); - UpdateStageParams _params; - - // Not owned by us. - WorkingSet* _ws; - // If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next. WorkingSetID _idRetrying; // If not WorkingSet::INVALID_ID, we return this member to our caller. WorkingSetID _idReturning; - // Stats - UpdateStats _specificStats; - - // True if updated documents should be validated with storage_validation::storageValid(). - bool _enforceOkForStorage; - - // True if the request should be checked for an update to the shard key. - bool _shouldCheckForShardKeyUpdate; - // If the update was in-place, we may see it again. This only matters if we're doing // a multi-update; if we're not doing a multi-update we stop after one update and we // won't see any more docs. @@ -244,10 +214,6 @@ private: // So, no matter what, we keep track of where the doc wound up. typedef stdx::unordered_set<RecordId, RecordId::Hasher> RecordIdSet; const std::unique_ptr<RecordIdSet> _updatedRecordIds; - - // These get reused for each update. - mutablebson::Document& _doc; - mutablebson::DamageVector _damages; }; } // namespace mongo diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp new file mode 100644 index 00000000000..dadbaf522f7 --- /dev/null +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -0,0 +1,301 @@ +/** + * Copyright (C) 2019 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/exec/upsert_stage.h" + +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/update/storage_validation.h" +#include "mongo/s/would_change_owning_shard_exception.h" + +namespace mongo { + +MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert); + +namespace mb = mutablebson; + +namespace { + +const char idFieldName[] = "_id"; +const FieldRef idFieldRef(idFieldName); + +/** + * Populates the given FieldRefSets with the fields that are required to be present in the document + * and the fields which cannot be changed, respectively. + */ +void getRequiredAndImmutablePaths(OperationContext* opCtx, + const ScopedCollectionMetadata& metadata, + bool isInternalRequest, + FieldRefSet* requiredPaths, + FieldRefSet* immutablePaths) { + // Each document has a set of required paths and a potentially different set of immutable paths. + // If the collection is sharded, add the shard key fields to the required paths vector. + if (metadata->isSharded()) { + requiredPaths->fillFrom(metadata->getKeyPatternFields()); + } + // Add the _id field, replacing any existing paths that are prefixed by _id if present. + requiredPaths->keepShortest(&idFieldRef); + + // If this is an internal request, no fields are immutable and we leave 'immutablePaths' empty. + if (!isInternalRequest) { + // An unversioned request cannot update the shard key, so all required fields are immutable. + // The shard key is also immutable if we have not yet upgraded to 4.2 feature compatibility. + const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; + if (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42) { + for (auto&& reqPath : *requiredPaths) { + immutablePaths->insert(reqPath); + } + } + // The _id field is always immutable to user requests, even if the shard key is mutable. + immutablePaths->keepShortest(&idFieldRef); + } +} +} // namespace + +UpsertStage::UpsertStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection, + PlanStage* child) + : UpdateStage(opCtx, params, ws, collection) { + // We should never create this stage for a non-upsert request. + invariant(_params.request->isUpsert()); + _children.emplace_back(child); +}; + +// We're done when updating is finished and we have either matched or inserted. +bool UpsertStage::isEOF() { + return UpdateStage::isEOF() && (_specificStats.nMatched > 0 || _specificStats.inserted); +} + +PlanStage::StageState UpsertStage::doWork(WorkingSetID* out) { + if (isEOF()) { + return StageState::IS_EOF; + } + + // First, attempt to perform the update on a matching document. + auto updateState = UpdateStage::doWork(out); + + // If the update returned anything other than EOF, just forward it along. There's a chance we + // still may find a document to update and will not have to insert anything. If it did return + // EOF and we do not need to insert a new document, return EOF immediately here. + if (updateState != PlanStage::IS_EOF || isEOF()) { + return updateState; + } + + // If the update resulted in EOF without matching anything, we must insert a new document. + invariant(updateState == PlanStage::IS_EOF && !isEOF()); + + // Since this is an insert, we will be logging it as such in the oplog. 
We don't need the + // driver's help to build the oplog record. We also set the 'inserted' stats flag here. + _params.driver->setLogOp(false); + _specificStats.inserted = true; + + // Determine whether this is a user-initiated or internal request. + const bool isInternalRequest = + !getOpCtx()->writesAreReplicated() || _params.request->isFromMigration(); + + // Generate the new document to be inserted. + _specificStats.objInserted = _produceNewDocumentForInsert(isInternalRequest); + + // If this is an explain, skip performing the actual insert. + if (!_params.request->isExplain()) { + _performInsert(_specificStats.objInserted); + } + + // We should always be EOF at this point. + invariant(isEOF()); + + // If we want to return the document we just inserted, create it as a WorkingSetMember. + if (_params.request->shouldReturnNewDocs()) { + BSONObj newObj = _specificStats.objInserted; + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = + Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), newObj.getOwned()); + member->transitionToOwnedObj(); + return PlanStage::ADVANCED; + } + + // If we don't need to return the inserted document, we're done. + return PlanStage::IS_EOF; +} + +void UpsertStage::_performInsert(BSONObj newDocument) { + // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to + // this shard. MongoS uses the query field to target a shard, and it is possible the shard key + // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case + // we need to throw so that MongoS can target the insert to the correct shard. + if (_shouldCheckForShardKeyUpdate) { + auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns()); + const auto& metadata = css->getCurrentMetadata(); + + const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; + + if (isFCV42 && metadata->isSharded()) { + const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); + auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument); + + if (!metadata->keyBelongsToMe(newShardKey)) { + // An attempt to upsert a document with a shard key value that belongs on another + // shard must either be a retryable write or inside a transaction. + uassert(ErrorCodes::IllegalOperation, + "The upsert document could not be inserted onto the shard targeted by the " + "query, since its shard key belongs on a different shard. 
Cross-shard " + "upserts are only allowed when running in a transaction or with " + "retryWrites: true.", + getOpCtx()->getTxnNumber()); + uasserted(WouldChangeOwningShardInfo( + _params.request->getQuery(), newDocument, true /* upsert */), + "The document we are inserting belongs on a different shard"); + } + } + } + + if (MONGO_unlikely(hangBeforeUpsertPerformsInsert.shouldFail())) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert"); + } + + writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] { + WriteUnitOfWork wunit(getOpCtx()); + uassertStatusOK( + collection()->insertDocument(getOpCtx(), + InsertStatement(_params.request->getStmtId(), newDocument), + _params.opDebug, + _params.request->isFromMigration())); + + // Technically, we should save/restore state here, but since we are going to return + // immediately after, it would just be wasted work. + wunit.commit(); + }); +} + +BSONObj UpsertStage::_produceNewDocumentForInsert(bool isInternalRequest) { + // Obtain the sharding metadata. This will be needed to compute the required paths. The metadata + // must remain in scope since it owns the pointers used by 'requiredPaths' and 'immutablePaths'. + auto* css = CollectionShardingState::get(getOpCtx(), _params.request->getNamespaceString()); + auto metadata = css->getCurrentMetadata(); + + // Each document has a set of required paths and a potentially different set of immutable paths. + FieldRefSet requiredPaths, immutablePaths; + getRequiredAndImmutablePaths( + getOpCtx(), metadata, isInternalRequest, &requiredPaths, &immutablePaths); + + // Reset the document into which we will be writing. + _doc.reset(); + + // First: populate the document's required paths with equality predicate values from the query, + // if available. This generates the pre-image document that we will run the update against. + if (auto* cq = _params.canonicalQuery) { + uassertStatusOK(_params.driver->populateDocumentWithQueryFields(*cq, requiredPaths, _doc)); + } else { + fassert(17354, CanonicalQuery::isSimpleIdQuery(_params.request->getQuery())); + fassert(17352, _doc.root().appendElement(_params.request->getQuery()[idFieldName])); + } + + // Second: run the appropriate document generation strategy over the document to generate the + // post-image. If the update operation modifies any of the immutable paths, this will throw. + if (_params.request->shouldUpsertSuppliedDocument()) { + _generateNewDocumentFromSuppliedDoc(immutablePaths); + } else { + _generateNewDocumentFromUpdateOp(immutablePaths); + } + + // Third: ensure _id is first if it exists, and generate a new OID otherwise. + _ensureIdFieldIsFirst(&_doc, true); + + // Fourth: assert that the finished document has all required fields and is valid for storage. + _assertDocumentToBeInsertedIsValid( + _doc, requiredPaths, isInternalRequest, _enforceOkForStorage); + + // Fifth: validate that the newly-produced document does not exceed the maximum BSON user size. + auto newDocument = _doc.getObject(); + uassert(17420, + str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize, + newDocument.objsize() <= BSONObjMaxUserSize); + + return newDocument; +} + +void UpsertStage::_generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths) { + // Use the UpdateModification from the original request to generate a new document by running + // the update over the empty (except for fields extracted from the query) document. 
We do not + // validate for storage until later, but we do ensure that no immutable fields are modified. + const bool validateForStorage = false; + const bool isInsert = true; + uassertStatusOK( + _params.driver->update({}, &_doc, validateForStorage, immutablePaths, isInsert)); +}; + +void UpsertStage::_generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths) { + // We should never call this method unless the request has a set of update constants. + invariant(_params.request->shouldUpsertSuppliedDocument()); + invariant(_params.request->getUpdateConstants()); + + // Extract the supplied document from the constants and validate that it is an object. + auto suppliedDocElt = _params.request->getUpdateConstants()->getField("new"_sd); + invariant(suppliedDocElt.type() == BSONType::Object); + auto suppliedDoc = suppliedDocElt.embeddedObject(); + + // The supplied doc is functionally a replacement update. We need a new driver to apply it. + UpdateDriver replacementDriver(nullptr); + + // Create a new replacement-style update from the supplied document. + replacementDriver.parse({suppliedDoc}, {}); + replacementDriver.setLogOp(false); + + // We do not validate for storage, as we will validate the full document before inserting. + // However, we ensure that no immutable fields are modified. + const bool validateForStorage = false; + const bool isInsert = true; + uassertStatusOK( + replacementDriver.update({}, &_doc, validateForStorage, immutablePaths, isInsert)); +} + +void UpsertStage::_assertDocumentToBeInsertedIsValid(const mb::Document& document, + const FieldRefSet& requiredPaths, + bool isInternalRequest, + bool enforceOkForStorage) { + // For a non-internal operation, we assert that the document contains all required paths, that + // no shard key fields have arrays at any point along their paths, and that the document is + // valid for storage. Skip all such checks for an internal operation. + if (!isInternalRequest) { + if (enforceOkForStorage) { + storage_validation::storageValid(document); + } + _assertRequiredPathsPresent(document, requiredPaths); + } +} +} // namespace mongo diff --git a/src/mongo/db/exec/upsert_stage.h b/src/mongo/db/exec/upsert_stage.h new file mode 100644 index 00000000000..fe6572f9d16 --- /dev/null +++ b/src/mongo/db/exec/upsert_stage.h @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2019 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/update_stage.h" + +namespace mongo { + +/** + * Execution stage for update requests with {upsert:true}. This is a specialized UpdateStage which, + * in the event that no documents match the update request's query, generates and inserts a new + * document into the collection. All logic related to the insertion phase is implemented by this + * class. + * + * If the prior or newly-updated version of the document was requested to be returned, then ADVANCED + * is returned after updating or inserting a document. Otherwise, NEED_TIME is returned after + * updating a document if further updates are pending, and IS_EOF is returned if all updates have + * been performed or if a document has been inserted. + * + * Callers of doWork() must be holding a write lock. + */ +class UpsertStage final : public UpdateStage { + UpsertStage(const UpsertStage&) = delete; + UpsertStage& operator=(const UpsertStage&) = delete; + +public: + UpsertStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection, + PlanStage* child); + + bool isEOF() final; + StageState doWork(WorkingSetID* out) final; + +private: + BSONObj _produceNewDocumentForInsert(bool isInternalRequest); + void _performInsert(BSONObj newDocument); + + void _generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths); + void _generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths); + + void _assertDocumentToBeInsertedIsValid(const mutablebson::Document& document, + const FieldRefSet& requiredPaths, + bool isInternalRequest, + bool enforceOkForStorage); +}; + +} // namespace mongo diff --git a/src/mongo/db/field_ref_set.cpp b/src/mongo/db/field_ref_set.cpp index c55a722b64b..3b98f049448 100644 --- a/src/mongo/db/field_ref_set.cpp +++ b/src/mongo/db/field_ref_set.cpp @@ -60,6 +60,14 @@ bool FieldRefSet::FieldRefPtrLessThan::operator()(const FieldRef* l, const Field FieldRefSet::FieldRefSet() {} +FieldRefSet::FieldRefSet(const std::vector<std::unique_ptr<FieldRef>>& paths) { + fillFrom(paths); +} + +FieldRefSet::FieldRefSet(const vector<const FieldRef*>& paths) { + _fieldSet.insert(paths.begin(), paths.end()); +} + FieldRefSet::FieldRefSet(const vector<FieldRef*>& paths) { fillFrom(paths); } @@ -105,6 +113,19 @@ void FieldRefSet::fillFrom(const std::vector<FieldRef*>& fields) { _fieldSet.insert(fields.begin(), fields.end()); } +void FieldRefSet::fillFrom(const std::vector<std::unique_ptr<FieldRef>>& fields) { + dassert(_fieldSet.empty()); + std::transform(fields.begin(), + fields.end(), + std::inserter(_fieldSet, _fieldSet.begin()), + [](const auto& field) { return field.get(); }); +} + +bool FieldRefSet::insertNoConflict(const FieldRef* toInsert) { + const FieldRef* conflict; + return insert(toInsert, &conflict); +} + bool FieldRefSet::insert(const FieldRef* toInsert, const FieldRef** conflict) { // We can determine if two fields conflict by checking their common prefix. 
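     // For example, 'a.b' conflicts with 'a' and with 'a.b.c', but not with 'a.c'.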
// diff --git a/src/mongo/db/field_ref_set.h b/src/mongo/db/field_ref_set.h index 63166acbc89..0019bf9b4ba 100644 --- a/src/mongo/db/field_ref_set.h +++ b/src/mongo/db/field_ref_set.h @@ -64,6 +64,8 @@ public: FieldRefSet(); + FieldRefSet(const std::vector<std::unique_ptr<FieldRef>>& paths); + FieldRefSet(const std::vector<const FieldRef*>& paths); FieldRefSet(const std::vector<FieldRef*>& paths); /** Returns 'true' if the set is empty */ @@ -89,9 +91,9 @@ public: } /** - * Returns true if the field 'toInsert' can be added in the set without - * conflicts. Otherwise returns false and fill in '*conflict' with the field 'toInsert' - * clashed with. + * Returns true if the field 'toInsert' was added to the set without conflicts. + * + * Otherwise, returns false and fills '*conflict' with the field 'toInsert' clashed with. * * There is no ownership transfer of 'toInsert'. The caller is responsible for * maintaining it alive for as long as the FieldRefSet is so. By the same token @@ -100,13 +102,25 @@ public: bool insert(const FieldRef* toInsert, const FieldRef** conflict); /** - * Fills the set with the supplied FieldRef*s + * Returns true if the field 'toInsert' was added to the set without conflicts. + */ + bool insertNoConflict(const FieldRef* toInsert); + + /** + * Fills the set with the supplied FieldRef pointers. * * Note that *no* conflict resolution occurs here. */ void fillFrom(const std::vector<FieldRef*>& fields); /** + * Fills the set with the supplied FieldRefs. Does not take ownership of the managed pointers. + * + * Note that *no* conflict resolution occurs here. + */ + void fillFrom(const std::vector<std::unique_ptr<FieldRef>>& fields); + + /** * Replace any existing conflicting FieldRef with the shortest (closest to root) one. */ void keepShortest(const FieldRef* toInsert); diff --git a/src/mongo/db/ops/parsed_update.cpp b/src/mongo/db/ops/parsed_update.cpp index 07a44c54bd8..aaab7037871 100644 --- a/src/mongo/db/ops/parsed_update.cpp +++ b/src/mongo/db/ops/parsed_update.cpp @@ -32,6 +32,7 @@ #include "mongo/db/ops/parsed_update.h" #include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/query_planner_common.h" @@ -53,6 +54,22 @@ Status ParsedUpdate::parseRequest() { // of a document during a multi-update. invariant(!(_request->shouldReturnAnyDocs() && _request->isMulti())); + // It is invalid to specify 'upsertSupplied:true' for a non-upsert operation, or if no upsert + // document was supplied with the request. + if (_request->shouldUpsertSuppliedDocument()) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "cannot specify '" + << write_ops::UpdateOpEntry::kUpsertSuppliedFieldName + << ": true' for a non-upsert operation", + _request->isUpsert()); + const auto& constants = _request->getUpdateConstants(); + uassert(ErrorCodes::FailedToParse, + str::stream() << "the parameter '" + << write_ops::UpdateOpEntry::kUpsertSuppliedFieldName + << "' is set to 'true', but no document was supplied", + constants && (*constants)["new"_sd].type() == BSONType::Object); + } + // It is invalid to request that a ProjectionStage be applied to the UpdateStage if the // UpdateStage would not return any document. 
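     // (A projection only makes sense for requests that return a document, e.g. findAndModify.)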
invariant(_request->getProj().isEmpty() || _request->shouldReturnAnyDocs()); diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index f946302c6b9..5fa0a561b7e 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -56,16 +56,7 @@ public: RETURN_NEW }; - inline UpdateRequest(const NamespaceString& nsString) - : _nsString(nsString), - _god(false), - _upsert(false), - _multi(false), - _fromMigration(false), - _fromOplogApplication(false), - _isExplain(false), - _returnDocs(ReturnDocOption::RETURN_NONE), - _yieldPolicy(PlanExecutor::NO_YIELD) {} + inline UpdateRequest(const NamespaceString& nsString) : _nsString(nsString) {} const NamespaceString& getNamespaceString() const { return _nsString; @@ -154,6 +145,14 @@ public: return _upsert; } + inline void setUpsertSuppliedDocument(bool value = true) { + _upsertSuppliedDocument = value; + } + + bool shouldUpsertSuppliedDocument() const { + return _upsertSuppliedDocument; + } + inline void setMulti(bool value = true) { _multi = value; } @@ -306,22 +305,26 @@ private: // God bypasses _id checking and index generation. It is only used on behalf of system // updates, never user updates. - bool _god; + bool _god = false; // True if this should insert if no matching document is found. - bool _upsert; + bool _upsert = false; + + // True if this upsert operation should insert the document supplied as 'c.new' if the query + // does not match any documents. + bool _upsertSuppliedDocument = false; // True if this update is allowed to affect more than one document. - bool _multi; + bool _multi = false; // True if this update is on behalf of a chunk migration. - bool _fromMigration; + bool _fromMigration = false; // True if this update was triggered by the application of an oplog entry. - bool _fromOplogApplication; + bool _fromOplogApplication = false; // Whether or not we are requesting an explained update. Explained updates are read-only. - bool _isExplain; + bool _isExplain = false; // Specifies which version of the documents to return, if any. // @@ -335,10 +338,10 @@ private: // // This allows findAndModify to execute an update and retrieve the resulting document // without another query before or after the update. - ReturnDocOption _returnDocs; + ReturnDocOption _returnDocs = ReturnDocOption::RETURN_NONE; // Whether or not the update should yield. Defaults to NO_YIELD. - PlanExecutor::YieldPolicy _yieldPolicy; + PlanExecutor::YieldPolicy _yieldPolicy = PlanExecutor::NO_YIELD; }; } // namespace mongo diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index 9409aa73041..42f86b1f481 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -127,6 +127,11 @@ structs: operation inserts only a single document." type: bool default: false + upsertSupplied: + description: "Only applicable when upsert is true. If set, and if no documents match + the query, the update subsystem will insert the document supplied as + 'c.new' rather than generating a new document from the update spec." + type: optionalBool collation: description: "Specifies the collation to use for the operation." 
type: object diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 1ecceb4caa8..3ae46dc6146 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -708,6 +708,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext* request.setArrayFilters(write_ops::arrayFiltersOf(op)); request.setMulti(op.getMulti()); request.setUpsert(op.getUpsert()); + request.setUpsertSuppliedDocument(op.getUpsertSupplied()); request.setHint(op.getHint()); request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 5d550eb7c8b..81c4567e48f 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -190,6 +190,13 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( } request.setMergeByPBRT(elem.Bool()); + } else if (fieldName == kUseNewUpsert) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kUseNewUpsert << " must be a boolean, not a " + << typeName(elem.type())}; + } + request.setUseNewUpsert(elem.boolean()); } else if (kAllowDiskUseName == fieldName) { if (storageGlobalParams.readOnly) { return {ErrorCodes::IllegalOperation, @@ -329,6 +336,7 @@ Document AggregationRequest::serializeToCommandObj() const { _writeConcern ? Value(_writeConcern->toBSON()) : Value()}, // Only serialize runtime constants if any were specified. {kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()}, + {kUseNewUpsert, _useNewUpsert ? Value(true) : Value()}, }; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 944423ba210..b15a06f898c 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -65,6 +65,7 @@ public: static constexpr StringData kCommentName = "comment"_sd; static constexpr StringData kExchangeName = "exchange"_sd; static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd; + static constexpr StringData kUseNewUpsert = "useNewUpsert"_sd; static constexpr long long kDefaultBatchSize = 101; @@ -238,6 +239,10 @@ public: return _runtimeConstants; } + bool getUseNewUpsert() const { + return _useNewUpsert; + } + // // Setters for optional fields. // @@ -310,6 +315,10 @@ public: _runtimeConstants = std::move(runtimeConstants); } + void setUseNewUpsert(bool useNewUpsert) { + _useNewUpsert = useNewUpsert; + } + private: // Required fields. const NamespaceString _nss; @@ -363,5 +372,9 @@ private: // A document containing runtime constants; i.e. values that do not change once computed (e.g. // $$NOW). boost::optional<RuntimeConstants> _runtimeConstants; + + // Indicates whether the aggregation may use the new 'upsertSupplied' mechanism when running + // $merge stages. Versions of mongoS from 4.2.2 onwards always set this flag. 
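+    // Requests which did not originate from a mongoS are always permitted to use the new
+    // mechanism, regardless of this flag.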
+ bool _useNewUpsert = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 23129c98f85..888387a6e45 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -58,6 +58,7 @@ using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>; using UpdateModification = write_ops::UpdateModification; +using UpsertType = MongoProcessInterface::UpsertType; constexpr auto kStageName = DocumentSourceMerge::kStageName; constexpr auto kDefaultWhenMatched = WhenMatched::kMerge; @@ -75,12 +76,15 @@ constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMa constexpr auto kPipelineFailMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kFail}; constexpr auto kPipelineDiscardMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kDiscard}; +const auto kDefaultPipelineLet = BSON("new" + << "$$ROOT"); + /** * Creates a merge strategy which uses update semantics to perform a merge operation. If * 'BatchTransform' function is provided, it will be called to transform batched objects before * passing them to the 'update'. */ -MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) { +MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) { return [upsert, transform]( const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { if (transform) { @@ -101,7 +105,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) { * error. If 'BatchTransform' function is provided, it will be called to transform batched objects * before passing them to the 'update'. 
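  * The 'upsert' argument is the UpsertType passed through to the underlying 'update' call.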
*/ -MergeStrategy makeStrictUpdateStrategy(bool upsert, BatchTransform transform) { +MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) { return [upsert, transform]( const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { if (transform) { @@ -168,44 +172,46 @@ const MergeStrategyDescriptorsMap& getDescriptors() { {kReplaceInsertMode, {kReplaceInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(true, {})}}, + makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}}, // whenMatched: replace, whenNotMatched: fail {kReplaceFailMode, - {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}}, + {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: replace, whenNotMatched: discard {kReplaceDiscardMode, - {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(false, {})}}, + {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: merge, whenNotMatched: insert {kMergeInsertMode, {kMergeInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(true, makeUpdateTransform("$set"))}}, + makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}}, // whenMatched: merge, whenNotMatched: fail {kMergeFailMode, {kMergeFailMode, {ActionType::update}, - makeStrictUpdateStrategy(false, makeUpdateTransform("$set"))}}, + makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}}, // whenMatched: merge, whenNotMatched: discard {kMergeDiscardMode, {kMergeDiscardMode, {ActionType::update}, - makeUpdateStrategy(false, makeUpdateTransform("$set"))}}, + makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}}, // whenMatched: keepExisting, whenNotMatched: insert {kKeepExistingInsertMode, {kKeepExistingInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}}, + makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}}, // whenMatched: [pipeline], whenNotMatched: insert {kPipelineInsertMode, {kPipelineInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(true, {})}}, + makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}}, // whenMatched: [pipeline], whenNotMatched: fail {kPipelineFailMode, - {kPipelineFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}}, + {kPipelineFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: [pipeline], whenNotMatched: discard {kPipelineDiscardMode, - {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(false, {})}}, + {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: fail, whenNotMatched: insert {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}}; return mergeStrategyDescriptors; @@ -376,11 +382,16 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create( !outputNs.isSpecial()); if (whenMatched == WhenMatched::kPipeline) { - if (!letVariables) { - // For custom pipeline-style updates, default the 'let' variables to {new: "$$ROOT"}, - // if the user has omitted the 'let' argument. - letVariables = BSON("new" - << "$$ROOT"); + // If unspecified, 'letVariables' defaults to {new: "$$ROOT"}. 
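+        // The default exposes the entire source document to the custom pipeline as '$$new'.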
+ letVariables = letVariables.value_or(kDefaultPipelineLet); + auto newElt = letVariables->getField("new"_sd); + uassert(51273, + "'let' may not define a value for the reserved 'new' variable other than '$$ROOT'", + !newElt || newElt.valueStringDataSafe() == "$$ROOT"_sd); + // If the 'new' variable is missing and this is a {whenNotMatched: "insert"} merge, then the + // new document *must* be serialized with the update request. Add it to the let variables. + if (!newElt && whenNotMatched == WhenNotMatched::kInsert) { + letVariables = letVariables->addField(kDefaultPipelineLet.firstElement()); } } else { // Ensure the 'let' argument cannot be used with any other merge modes. diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp index dbebf226ced..e97cbbdf51d 100644 --- a/src/mongo/db/pipeline/document_source_merge_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_test.cpp @@ -759,20 +759,23 @@ TEST_F(DocumentSourceMergeTest, LetVariablesCanOnlyBeUsedWithPipelineMode) { ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51199); } +// We always serialize the default let variables as {new: "$$ROOT"} if omitted. TEST_F(DocumentSourceMergeTest, SerializeDefaultLetVariable) { - auto spec = - BSON("$merge" << BSON("into" - << "target_collection" - << "whenMatched" << BSON_ARRAY(BSON("$project" << BSON("x" << 1))) - << "whenNotMatched" - << "insert")); - auto mergeStage = createMergeStage(spec); - auto serialized = mergeStage->serialize().getDocument(); - ASSERT_VALUE_EQ(serialized["$merge"]["let"], - Value(BSON("new" - << "$$ROOT"))); + for (auto&& whenNotMatched : {"insert", "fail", "discard"}) { + auto spec = + BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" << BSON_ARRAY(BSON("$project" << BSON("x" << 1))) + << "whenNotMatched" << whenNotMatched)); + auto mergeStage = createMergeStage(spec); + auto serialized = mergeStage->serialize().getDocument(); + ASSERT_VALUE_EQ(serialized["$merge"]["let"], + Value(BSON("new" + << "$$ROOT"))); + } } +// Test the behaviour of 'let' serialization for each whenNotMatched mode. 
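+// Only the 'insert' mode implicitly adds the 'new' variable; 'fail' and 'discard' leave the
+// user-specified 'let' untouched.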
TEST_F(DocumentSourceMergeTest, SerializeLetVariables) { auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" << "$$v1" @@ -780,50 +783,70 @@ TEST_F(DocumentSourceMergeTest, SerializeLetVariables) { << "$$v2" << "z" << "$$v3"))); - auto spec = BSON("$merge" << BSON("into" - << "target_collection" - << "let" - << BSON("v1" << 10 << "v2" - << "foo" - << "v3" - << BSON("x" << 1 << "y" - << BSON("z" - << "bar"))) - << "whenMatched" << pipeline << "whenNotMatched" - << "insert")); - auto mergeStage = createMergeStage(spec); - ASSERT(mergeStage); - auto serialized = mergeStage->serialize().getDocument(); - ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v1"], Value(BSON("$const" << 10))); - ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v2"], - Value(BSON("$const" - << "foo"))); - ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v3"], - Value(BSON("x" << BSON("$const" << 1) << "y" - << BSON("z" << BSON("$const" - << "bar"))))); - ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + + const auto createAndSerializeMergeStage = [this, &pipeline](StringData whenNotMatched) { + auto spec = BSON("$merge" << BSON("into" + << "target_collection" + << "let" + << BSON("v1" << 10 << "v2" + << "foo" + << "v3" + << BSON("x" << 1 << "y" + << BSON("z" + << "bar"))) + << "whenMatched" << pipeline << "whenNotMatched" + << whenNotMatched)); + auto mergeStage = createMergeStage(spec); + ASSERT(mergeStage); + + return mergeStage->serialize().getDocument(); + }; + + for (auto&& whenNotMatched : {"insert", "fail", "discard"}) { + const auto serialized = createAndSerializeMergeStage(whenNotMatched); + + // For {whenNotMatched:insert}, we always attach the 'new' document even if the user has + // already specified a set of variables. This is because a {whenNotMatched: insert} merge + // generates an upsert, and if no documents in the target collection match the query we must + // insert the original document. For other 'whenNotMatched' modes, we do not serialize the + // new document, since neither 'fail' nor 'discard' can result in an upsert. + ASSERT_VALUE_EQ(serialized["$merge"]["let"]["new"], + (whenNotMatched == "insert"_sd ? Value("$$ROOT"_sd) : Value())); + + // The user's variables should be serialized in all cases. 
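+        // Note that constants are wrapped by the expression parser, e.g. 10 becomes {$const: 10}.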
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v1"], Value(BSON("$const" << 10))); + ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v2"], + Value(BSON("$const" + << "foo"))); + ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v3"], + Value(BSON("x" << BSON("$const" << 1) << "y" + << BSON("z" << BSON("$const" + << "bar"))))); + ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + } } TEST_F(DocumentSourceMergeTest, SerializeLetArrayVariable) { - auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" - << "$$v1"))); - auto spec = - BSON("$merge" << BSON("into" - << "target_collection" - << "let" - << BSON("v1" << BSON_ARRAY(1 << "2" << BSON("x" << 1 << "y" << 2))) - << "whenMatched" << pipeline << "whenNotMatched" - << "insert")); - auto mergeStage = createMergeStage(spec); - ASSERT(mergeStage); - auto serialized = mergeStage->serialize().getDocument(); - ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v1"], - Value(BSON_ARRAY(BSON("$const" << 1) << BSON("$const" - << "2") - << BSON("x" << BSON("$const" << 1) << "y" - << BSON("$const" << 2))))); - ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + for (auto&& whenNotMatched : {"insert", "fail", "discard"}) { + auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" + << "$$v1"))); + auto spec = BSON( + "$merge" << BSON("into" + << "target_collection" + << "let" + << BSON("v1" << BSON_ARRAY(1 << "2" << BSON("x" << 1 << "y" << 2))) + << "whenMatched" << pipeline << "whenNotMatched" << whenNotMatched)); + auto mergeStage = createMergeStage(spec); + ASSERT(mergeStage); + auto serialized = mergeStage->serialize().getDocument(); + ASSERT_VALUE_EQ( + serialized["$merge"]["let"]["v1"], + Value(BSON_ARRAY(BSON("$const" << 1) + << BSON("$const" + << "2") + << BSON("x" << BSON("$const" << 1) << "y" << BSON("$const" << 2))))); + ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + } } // This test verifies that when the 'let' argument is specified as 'null', the default 'new' @@ -833,60 +856,66 @@ TEST_F(DocumentSourceMergeTest, SerializeLetArrayVariable) { // this test ensures that we're aware of this limitation. Once the limitation is addressed in // SERVER-41272, this test should be updated to accordingly. 
TEST_F(DocumentSourceMergeTest, SerializeNullLetVariablesAsDefault) { - auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" - << "1"))); - auto spec = - BSON("$merge" << BSON("into" - << "target_collection" - << "let" << BSONNULL << "whenMatched" << pipeline << "whenNotMatched" - << "insert")); - auto mergeStage = createMergeStage(spec); - ASSERT(mergeStage); - auto serialized = mergeStage->serialize().getDocument(); - ASSERT_VALUE_EQ(serialized["$merge"]["let"], - Value(BSON("new" - << "$$ROOT"))); - ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + for (auto&& whenNotMatched : {"insert", "fail", "discard"}) { + auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" + << "1"))); + auto spec = BSON("$merge" << BSON("into" + << "target_collection" + << "let" << BSONNULL << "whenMatched" << pipeline + << "whenNotMatched" << whenNotMatched)); + auto mergeStage = createMergeStage(spec); + ASSERT(mergeStage); + auto serialized = mergeStage->serialize().getDocument(); + ASSERT_VALUE_EQ(serialized["$merge"]["let"], + Value(BSON("new" + << "$$ROOT"))); + ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + } } TEST_F(DocumentSourceMergeTest, SerializeEmptyLetVariables) { - auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" - << "1"))); - auto spec = - BSON("$merge" << BSON("into" - << "target_collection" - << "let" << BSONObj() << "whenMatched" << pipeline << "whenNotMatched" - << "insert")); - auto mergeStage = createMergeStage(spec); - ASSERT(mergeStage); - auto serialized = mergeStage->serialize().getDocument(); - ASSERT_VALUE_EQ(serialized["$merge"]["let"], Value(BSONObj())); - ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + for (auto&& whenNotMatched : {"insert", "fail", "discard"}) { + auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" + << "1"))); + auto spec = BSON("$merge" << BSON("into" + << "target_collection" + << "let" << BSONObj() << "whenMatched" << pipeline + << "whenNotMatched" << whenNotMatched)); + auto mergeStage = createMergeStage(spec); + ASSERT(mergeStage); + auto serialized = mergeStage->serialize().getDocument(); + ASSERT_VALUE_EQ(serialized["$merge"]["let"], + (whenNotMatched == "insert"_sd ? 
Value(BSON("new" + << "$$ROOT")) + : Value(BSONObj()))); + ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline)); + } } TEST_F(DocumentSourceMergeTest, OnlyObjectCanBeUsedAsLetVariables) { - auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" - << "1"))); - auto spec = BSON("$merge" << BSON("into" - << "target_collection" - << "let" << 1 << "whenMatched" << pipeline << "whenNotMatched" - << "insert")); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$merge" << BSON("into" - << "target_collection" - << "let" - << "foo" - << "whenMatched" << pipeline << "whenNotMatched" - << "insert")); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$merge" << BSON("into" - << "target_collection" - << "let" << BSON_ARRAY(1 << "2") << "whenMatched" << pipeline - << "whenNotMatched" - << "insert")); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + for (auto&& whenNotMatched : {"insert", "fail", "discard"}) { + auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x" + << "1"))); + auto spec = BSON("$merge" << BSON("into" + << "target_collection" + << "let" << 1 << "whenMatched" << pipeline + << "whenNotMatched" << whenNotMatched)); + ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "let" + << "foo" + << "whenMatched" << pipeline << "whenNotMatched" + << whenNotMatched)); + ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "let" << BSON_ARRAY(1 << "2") << "whenMatched" << pipeline + << "whenNotMatched" << whenNotMatched)); + ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + } } } // namespace diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 9f65c12669f..781c0cc96c1 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -67,6 +67,10 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, } else { variables.setDefaultRuntimeConstants(opCtx); } + + // Any request which did not originate from a mongoS, or which did originate from a mongoS but + // has the 'useNewUpsert' flag set, can use the new upsertSupplied mechanism for $merge. + useNewUpsert = request.getUseNewUpsert() || !request.isFromMongos(); } ExpressionContext::ExpressionContext(OperationContext* opCtx, @@ -164,6 +168,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( expCtx->subPipelineDepth = subPipelineDepth; expCtx->tempDir = tempDir; + expCtx->useNewUpsert = useNewUpsert; expCtx->opCtx = opCtx; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 4ed86cf5efb..e55284315f0 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -247,6 +247,10 @@ public: boost::optional<ServerGlobalParams::FeatureCompatibility::Version> maxFeatureCompatibilityVersion; + // True if this context is associated with a pipeline which is permitted to use the new + // upsertSupplied mechanism for applicable $merge modes. 
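+    // The flag is propagated to any ExpressionContext created via copyWith(), so sub-pipelines
+    // inherit it.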
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 4ed86cf5efb..e55284315f0 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -247,6 +247,10 @@ public:
     boost::optional<ServerGlobalParams::FeatureCompatibility::Version>
         maxFeatureCompatibilityVersion;
 
+    // True if this context is associated with a pipeline which is permitted to use the new
+    // upsertSupplied mechanism for applicable $merge modes.
+    bool useNewUpsert = false;
+
 protected:
     static const int kInterruptCheckPeriod = 128;
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index be897699a6f..c3009543e0f 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -85,6 +85,12 @@ public:
         std::tuple<BSONObj, write_ops::UpdateModification, boost::optional<BSONObj>>;
     using BatchedObjects = std::vector<BatchObject>;
 
+    enum class UpsertType {
+        kNone,              // This operation is not an upsert.
+        kGenerateNewDoc,    // If no documents match, generate a new document using the update spec.
+        kInsertSuppliedDoc  // If no documents match, insert the document supplied in 'c.new' as-is.
+    };
+
     enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
     enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
     enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
@@ -167,7 +173,7 @@ public:
                                          const NamespaceString& ns,
                                          BatchedObjects&& batch,
                                          const WriteConcernOptions& wc,
-                                         bool upsert,
+                                         UpsertType upsert,
                                          bool multi,
                                          boost::optional<OID> targetEpoch) = 0;
 
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 0e5583a85a4..c63d8bd8982 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -113,7 +113,7 @@ public:
                                          const NamespaceString& ns,
                                          BatchedObjects&& batch,
                                          const WriteConcernOptions& wc,
-                                         bool upsert,
+                                         UpsertType upsert,
                                          bool multi,
                                          boost::optional<OID>) final {
         MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index 18a857d305b..132fbc272c2 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -135,7 +135,7 @@ StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceShardServer::updat
     const NamespaceString& ns,
     BatchedObjects&& batch,
     const WriteConcernOptions& wc,
-    bool upsert,
+    UpsertType upsert,
    bool multi,
    boost::optional<OID> targetEpoch) {
     BatchedCommandResponse response;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 4dbc21b6ca2..8b4666c9b55 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -78,7 +78,7 @@ public:
                                          const NamespaceString& ns,
                                          BatchedObjects&& batch,
                                          const WriteConcernOptions& wc,
-                                         bool upsert,
+                                         UpsertType upsert,
                                          bool multi,
                                          boost::optional<OID> targetEpoch) final;
 
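UpsertType replaces the old boolean so that callers can say not only whether to upsert but how the inserted document should be produced on a miss. The mapping from $merge modes is not part of these hunks, so the sketch below is an assumption based on the test changes earlier in the patch (only whenNotMatched: "insert" upserts at all, and only the pipeline-style whenMatched modes insert the supplied source document verbatim); the helper name is hypothetical:

using UpsertType = MongoProcessInterface::UpsertType;

UpsertType upsertTypeFor(bool whenNotMatchedIsInsert, bool whenMatchedIsPipeline) {
    if (!whenNotMatchedIsInsert) {
        return UpsertType::kNone;  // "fail" and "discard" never insert.
    }
    // Pipeline-style updates insert the caller-supplied document ('c.new') verbatim;
    // replacement-style updates let the update system generate the new document.
    return whenMatchedIsPipeline ? UpsertType::kInsertSuppliedDoc : UpsertType::kGenerateNewDoc;
}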
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index fed8b7076e8..7f2cf3e96f7 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -181,7 +181,7 @@ Update MongoInterfaceStandalone::buildUpdateOp(
     const boost::intrusive_ptr<ExpressionContext>& expCtx,
     const NamespaceString& nss,
     BatchedObjects&& batch,
-    bool upsert,
+    UpsertType upsert,
     bool multi) {
     Update updateOp(nss);
     updateOp.setUpdates([&] {
@@ -193,7 +193,9 @@ Update MongoInterfaceStandalone::buildUpdateOp(
             entry.setQ(std::move(q));
             entry.setU(std::move(u));
             entry.setC(std::move(c));
-            entry.setUpsert(upsert);
+            entry.setUpsert(upsert != UpsertType::kNone);
+            entry.setUpsertSupplied({{entry.getUpsert() && expCtx->useNewUpsert,
+                                      upsert == UpsertType::kInsertSuppliedDoc}});
             entry.setMulti(multi);
             return entry;
         }());
@@ -232,7 +234,7 @@ StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceStandalone::update(
     const NamespaceString& ns,
     BatchedObjects&& batch,
     const WriteConcernOptions& wc,
-    bool upsert,
+    UpsertType upsert,
     bool multi,
     boost::optional<OID> targetEpoch) {
     auto writeResults =
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index e306fca6436..91d3fe5d380 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -72,7 +72,7 @@ public:
                                          const NamespaceString& ns,
                                          BatchedObjects&& batch,
                                          const WriteConcernOptions& wc,
-                                         bool upsert,
+                                         UpsertType upsert,
                                          bool multi,
                                          boost::optional<OID> targetEpoch) override;
 
@@ -176,7 +176,7 @@ protected:
     Update buildUpdateOp(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                          const NamespaceString& nss,
                          BatchedObjects&& batch,
-                         bool upsert,
+                         UpsertType upsert,
                          bool multi);
 
 private:
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index e17dddc7e50..d9e10e6eecc 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -117,6 +117,10 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
             Value(static_cast<long long>(*opCtx->getTxnNumber()));
     }
 
+    // We set this flag to indicate that the shards should always use the new upsert mechanism when
+    // executing relevant $merge modes.
+    cmdForShards[AggregationRequest::kUseNewUpsert] = Value(true);
+
     // agg creates temp collection and should handle implicit create separately.
     return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), true);
 }
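With the buildUpdateOp() change above, each batched $merge write now expresses the upsert decision in two parts: 'upsert' controls whether a miss inserts at all, and 'upsertSupplied' (set only when the ExpressionContext permits the new mechanism) tells the server to insert the 'c.new' constant verbatim rather than generating a document from the update spec. A rough sketch of one resulting update entry for whenMatched: [pipeline], whenNotMatched: "insert" follows; all field values are illustrative, and fromjson is the usual helper from mongo/db/json.h:

// Rough wire shape of a single update entry produced under the new mechanism.
const auto exampleEntry = fromjson(R"({
    q: {_id: 1},                    // the 'on' fields extracted from the source document
    u: [{$addFields: {x: 2}}],      // the user's whenMatched pipeline
    c: {new: {_id: 1, a: 1}},       // constants; 'new' carries the original source document
    upsert: true,
    upsertSupplied: true,           // insert c.new verbatim when nothing matches q
    multi: false
})");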
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index e32048e0a80..6e7e17a9ff7 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -75,7 +75,7 @@ public:
                                          const NamespaceString& ns,
                                          BatchedObjects&& batch,
                                          const WriteConcernOptions& wc,
-                                         bool upsert,
+                                         UpsertType upsert,
                                          bool multi,
                                          boost::optional<OID>) final {
         MONGO_UNREACHABLE;
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 1c1f4f3387a..32d63cd41f4 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -53,6 +53,7 @@
 #include "mongo/db/exec/sort_key_generator.h"
 #include "mongo/db/exec/subplan.h"
 #include "mongo/db/exec/update_stage.h"
+#include "mongo/db/exec/upsert_stage.h"
 #include "mongo/db/index/index_descriptor.h"
 #include "mongo/db/index/wildcard_access_method.h"
 #include "mongo/db/index_names.h"
@@ -1084,8 +1085,11 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpdate(
     invariant(root);
     updateStageParams.canonicalQuery = cq.get();
 
-    root = stdx::make_unique<UpdateStage>(
-        opCtx, updateStageParams, ws.get(), collection, root.release());
+    const bool isUpsert = updateStageParams.request->isUpsert();
+    root = (isUpsert ? std::make_unique<UpsertStage>(
+                           opCtx, updateStageParams, ws.get(), collection, root.release())
+                     : std::make_unique<UpdateStage>(
+                           opCtx, updateStageParams, ws.get(), collection, root.release()));
 
     if (!request->getProj().isEmpty()) {
         invariant(request->shouldReturnAnyDocs());
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index f303d1169ba..e1b492c04f5 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/exec/idhack.h"
 #include "mongo/db/exec/index_scan.h"
 #include "mongo/db/exec/update_stage.h"
+#include "mongo/db/exec/upsert_stage.h"
 #include "mongo/db/query/get_executor.h"
 #include "mongo/stdx/memory.h"
 
@@ -166,9 +167,13 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::updateWith
     invariant(collection);
     auto ws = stdx::make_unique<WorkingSet>();
 
-    auto idHackStage = stdx::make_unique<IDHackStage>(opCtx, key, ws.get(), descriptor);
-    auto root =
-        stdx::make_unique<UpdateStage>(opCtx, params, ws.get(), collection, idHackStage.release());
+    auto idHackStage = std::make_unique<IDHackStage>(opCtx, key, ws.get(), descriptor);
+
+    const bool isUpsert = params.request->isUpsert();
+    auto root = (isUpsert ? std::make_unique<UpsertStage>(
+                                opCtx, params, ws.get(), collection, idHackStage.release())
+                          : std::make_unique<UpdateStage>(
+                                opCtx, params, ws.get(), collection, idHackStage.release()));
 
     auto executor =
         PlanExecutor::make(opCtx, std::move(ws), std::move(root), collection, yieldPolicy);
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
index c5780407243..13aed7d7550 100644
--- a/src/mongo/db/views/resolved_view.cpp
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -112,6 +112,7 @@ AggregationRequest ResolvedView::asExpandedViewAggregation(
     expandedRequest.setUnwrappedReadPref(request.getUnwrappedReadPref());
     expandedRequest.setBypassDocumentValidation(request.shouldBypassDocumentValidation());
     expandedRequest.setAllowDiskUse(request.shouldAllowDiskUse());
+    expandedRequest.setUseNewUpsert(request.getUseNewUpsert());
 
     // Operations on a view must always use the default collation of the view. We must have already
     // checked that if the user's request specifies a collation, it matches the collation of the
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp
index 715aa75d2ad..ea02b2a2359 100644
--- a/src/mongo/dbtests/query_stage_update.cpp
+++ b/src/mongo/dbtests/query_stage_update.cpp
@@ -43,6 +43,7 @@
 #include "mongo/db/exec/eof.h"
 #include "mongo/db/exec/queued_data_stage.h"
 #include "mongo/db/exec/update_stage.h"
+#include "mongo/db/exec/upsert_stage.h"
 #include "mongo/db/exec/working_set.h"
 #include "mongo/db/jsobj.h"
 #include "mongo/db/json.h"
@@ -229,7 +230,7 @@ public:
 
         auto eofStage = make_unique<EOFStage>(&_opCtx);
         auto updateStage =
-            make_unique<UpdateStage>(&_opCtx, params, ws.get(), collection, eofStage.release());
+            make_unique<UpsertStage>(&_opCtx, params, ws.get(), collection, eofStage.release());
 
         runUpdate(updateStage.get());
     }
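Both getExecutorUpdate() and InternalPlanner::updateWithIdHack() above now select the plan root with the same pattern: an upsert request gets the new dedicated UpsertStage, while everything else keeps using UpdateStage. Condensed into a single helper purely for illustration (the helper itself is hypothetical; the constructor arguments mirror the call sites above):

std::unique_ptr<PlanStage> makeUpdateRoot(OperationContext* opCtx,
                                          const UpdateStageParams& params,
                                          WorkingSet* ws,
                                          Collection* collection,
                                          PlanStage* child) {
    // UpsertStage owns the insert-if-no-match path; plain updates stay on UpdateStage.
    if (params.request->isUpsert()) {
        return std::make_unique<UpsertStage>(opCtx, params, ws, collection, child);
    }
    return std::make_unique<UpdateStage>(opCtx, params, ws, collection, child);
}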
diff --git a/src/mongo/idl/basic_types.h b/src/mongo/idl/basic_types.h
new file mode 100644
index 00000000000..8eb66c53629
--- /dev/null
+++ b/src/mongo/idl/basic_types.h
@@ -0,0 +1,107 @@
+/**
+ *    Copyright (C) 2019 MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/base/string_data.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+
+/**
+ * Wraps a boost::optional<bool> to provide consistent semantics. A standard boost::optional<bool>
+ * can introduce ambiguity because its 'operator bool' will resolve to 'true' if the optional is
+ * populated, even if it is populated with boolean 'false'. By contrast, an instance of this class
+ * always resolves to the populated value, or false if not yet populated. This class will also
+ * serialize to BSON via the IDL only if the value has been explicitly set.
+ */
+class OptionalBool {
+public:
+    static OptionalBool parseFromBSON(BSONElement element) {
+        uassert(ErrorCodes::BadValue,
+                str::stream() << "Field '" << element.fieldNameStringData()
+                              << "' should be a boolean value, but found: " << element.type(),
+                !element || element.type() == BSONType::Bool);
+        return element ? OptionalBool{element.boolean()} : OptionalBool{};
+    }
+
+    OptionalBool(boost::optional<bool> value) : _value(std::move(value)) {}
+    OptionalBool(bool value) : _value(value) {}
+
+    OptionalBool() : OptionalBool(boost::none) {}
+
+    /**
+     * Returns true only if _value is populated with a value of true.
+     */
+    operator bool() const {
+        return _value.value_or(false);
+    }
+
+    /**
+     * Returns true if the value has been populated, false otherwise.
+     */
+    bool has_value() const {
+        return _value.has_value();
+    }
+
+    /**
+     * Serialize this object as a field in a document. If _value is empty, omit the field.
+     */
+    void serializeToBSON(StringData fieldName, BSONObjBuilder* builder) const {
+        if (_value) {
+            builder->appendBool(fieldName, *_value);
+        }
+    }
+
+    /**
+     * Serialize this object as an element of a BSON array. If _value is empty, omit the entry.
+     */
+    void serializeToBSON(BSONArrayBuilder* builder) const {
+        if (_value) {
+            builder->append(*_value);
+        }
+    }
+
+    /**
+     * Returns a string representation of the current value.
+     */
+    std::string toString() const {
+        return *this ? "1" : "0";
+    }
+
+    operator std::string() const {
+        return toString();
+    }
+
+private:
+    boost::optional<bool> _value;
+};
+
+}  // namespace mongo
diff --git a/src/mongo/idl/basic_types.idl b/src/mongo/idl/basic_types.idl
index 7005fdc4bc3..31c665dd575 100644
--- a/src/mongo/idl/basic_types.idl
+++ b/src/mongo/idl/basic_types.idl
@@ -31,6 +31,7 @@ global:
     cpp_namespace: "mongo"
     cpp_includes:
         - "mongo/db/namespace_string.h"
+        - "mongo/idl/basic_types.h"
        - "mongo/util/uuid.h"
 
 types:
@@ -92,10 +93,21 @@ types:
         cpp_type: bool
         deserializer: "mongo::BSONElement::trueValue"
 
+    optionalBool:
+        bson_serialization_type: any
+        description: "An optional bool type that does not serialize unless explicitly set. Can be
+                      used in place of boost::optional<bool> to provide more intuitive semantics,
+                      since the standard optional will coerce to true if populated regardless of
+                      its internal value."
+        cpp_type: "mongo::OptionalBool"
+        default: "mongo::OptionalBool()"
+        deserializer: "mongo::OptionalBool::parseFromBSON"
+        serializer: "mongo::OptionalBool::serializeToBSON"
+
     bindata_generic:
         bson_serialization_type: bindata
         bindata_subtype: generic
-        description: "A BSON bindata of "
+        description: "A BSON bindata of generic sub type"
         cpp_type: "std::vector<std::uint8_t>"
         deserializer: "mongo::BSONElement::_binDataVector"