-rw-r--r--  jstests/aggregation/sources/merge/all_modes.js | 6
-rw-r--r--  jstests/aggregation/sources/merge/mode_pipeline_insert.js | 137
-rw-r--r--  jstests/core/update_with_pipeline.js | 58
-rw-r--r--  jstests/multiVersion/agg_merge_upsert_supplied_cluster.js | 226
-rw-r--r--  jstests/multiVersion/agg_merge_upsert_supplied_replset.js | 90
-rw-r--r--  jstests/sharding/upsert_sharded.js | 258
-rw-r--r--  src/mongo/db/SConscript | 1
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands.cpp | 1
-rw-r--r--  src/mongo/db/exec/update_stage.cpp | 339
-rw-r--r--  src/mongo/db/exec/update_stage.h | 104
-rw-r--r--  src/mongo/db/exec/upsert_stage.cpp | 301
-rw-r--r--  src/mongo/db/exec/upsert_stage.h | 76
-rw-r--r--  src/mongo/db/field_ref_set.cpp | 21
-rw-r--r--  src/mongo/db/field_ref_set.h | 22
-rw-r--r--  src/mongo/db/ops/parsed_update.cpp | 17
-rw-r--r--  src/mongo/db/ops/update_request.h | 39
-rw-r--r--  src/mongo/db/ops/write_ops.idl | 5
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp | 1
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.cpp | 8
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.h | 13
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.cpp | 45
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_test.cpp | 229
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp | 5
-rw-r--r--  src/mongo/db/pipeline/expression_context.h | 4
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h | 8
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h | 2
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.h | 2
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp | 8
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h | 4
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h | 2
-rw-r--r--  src/mongo/db/query/get_executor.cpp | 8
-rw-r--r--  src/mongo/db/query/internal_plans.cpp | 11
-rw-r--r--  src/mongo/db/views/resolved_view.cpp | 1
-rw-r--r--  src/mongo/dbtests/query_stage_update.cpp | 3
-rw-r--r--  src/mongo/idl/basic_types.h | 107
-rw-r--r--  src/mongo/idl/basic_types.idl | 14
38 files changed, 1571 insertions, 611 deletions
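
The change introduces an 'upsertSupplied' flag on the update command: when set alongside upsert:true, an upsert that finds no match inserts the document supplied in the constants field 'c.new' (populating _id from an exact-match query, if present) instead of generating a document from the update itself. A minimal sketch of the new request shape, lifted from the jstests below (collection name and field values are illustrative only):

// Pipeline-style update; with no matching document, {_id: "supplied_doc", suppliedDoc: true} is inserted verbatim.
db.runCommand({
    update: "coll",
    updates: [{
        q: {_id: "supplied_doc"},
        u: [{$set: {x: 1}}],           // applied only when a matching document exists
        upsert: true,
        upsertSupplied: true,          // requires upsert: true and a 'c.new' constant
        c: {new: {suppliedDoc: true}}  // the document to insert when no match is found
    }]
});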
diff --git a/jstests/aggregation/sources/merge/all_modes.js b/jstests/aggregation/sources/merge/all_modes.js
index 3854008072c..278d18de6c3 100644
--- a/jstests/aggregation/sources/merge/all_modes.js
+++ b/jstests/aggregation/sources/merge/all_modes.js
@@ -199,7 +199,7 @@ const target = db.all_modes_target;
})();
// Test 'whenMatched=[pipeline] whenNotMatched=insert' mode. This is an equivalent of a
-// pipeline-style update with upsert=true.
+// pipeline-style update with upsert=true and upsertSupplied=true.
(function testWhenMatchedPipelineUpdateWhenNotMatchedInsert() {
assert(target.drop());
assert.commandWorked(target.insert({_id: 1, b: 1}));
@@ -207,9 +207,11 @@ const target = db.all_modes_target;
$merge:
{into: target.getName(), whenMatched: [{$addFields: {x: 2}}], whenNotMatched: "insert"}
}]));
+ // We match {_id: 1} and apply the pipeline to add the field {x: 2}. The other source collection
+ // documents are copied directly into the target collection.
assertArrayEq({
actual: target.find().toArray(),
- expected: [{_id: 1, b: 1, x: 2}, {_id: 2, x: 2}, {_id: 3, x: 2}]
+ expected: [{_id: 1, b: 1, x: 2}, {_id: 2, a: 2, b: "b"}, {_id: 3, a: 3, b: "c"}]
});
})();
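
As the updated comments above note, this $merge mode is now equivalent to a pipeline-style update with upsert=true and upsertSupplied=true, so unmatched source documents are copied into the target as-is rather than being synthesized from the whenMatched pipeline. Conceptually, each source document translates into an update like the following sketch (the internal request shape is an approximation, not the literal code path):

// For the unmatched source document {_id: 2, a: 2, b: "b"}:
db.runCommand({
    update: "all_modes_target",
    updates: [{
        q: {_id: 2},
        u: [{$addFields: {x: 2}}],        // the whenMatched pipeline
        upsert: true,
        upsertSupplied: true,
        c: {new: {_id: 2, a: 2, b: "b"}}  // the original source document, inserted unchanged
    }]
});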
diff --git a/jstests/aggregation/sources/merge/mode_pipeline_insert.js b/jstests/aggregation/sources/merge/mode_pipeline_insert.js
index df3414e0950..e37f5467b81 100644
--- a/jstests/aggregation/sources/merge/mode_pipeline_insert.js
+++ b/jstests/aggregation/sources/merge/mode_pipeline_insert.js
@@ -37,19 +37,19 @@ target.drop();
assertArrayEq({
actual: target.find().toArray(),
expected: [
- {_id: 1, x: 1},
+ {_id: 1, a: 1, b: "a"},
]
});
})();
-// Test $merge inserts a document into an existing target collection if no matching document
-// is found.
+// Test $merge inserts the original source document into an existing target collection if no
+// matching document is found.
(function testMergeInsertsDocumentIfMatchNotFound() {
assert.commandWorked(target.deleteMany({}));
assert.doesNotThrow(
() => source.aggregate(makeMergePipeline(
{target: target.getName(), updatePipeline: [{$addFields: {x: 1, y: 2}}]})));
- assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 2}]});
+ assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, a: 1, b: "a"}]});
})();
// Test $merge updates an existing document in the target collection by applying a
@@ -57,9 +57,10 @@ target.drop();
(function testMergeUpdatesDocumentIfMatchFound() {
assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
target: target.getName(),
- updatePipeline: [{$project: {x: {$add: ["$x", 1]}, y: {$add: ["$y", 2]}}}]
+ updatePipeline:
+ [{$project: {x: {$add: ["$a", 1]}, y: {$sum: ["$y", 2]}, z: {$add: ["$y", 2]}}}]
})));
- assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2, y: 4}]});
+ assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2, y: 2, z: null}]});
})();
// Test $merge with various pipeline stages which are currently supported by the pipeline-style
@@ -76,7 +77,7 @@ target.drop();
() => source.aggregate(makeMergePipeline(
{target: target.getName(), updatePipeline: [{$addFields: {x: {$add: ["$b", 1]}}}]})));
assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, b: 1, x: 2}, {_id: 2, x: null}]});
+ {actual: target.find().toArray(), expected: [{_id: 1, b: 1, x: 2}, {_id: 2, a: 2}]});
// Test $project stage.
assert(target.drop());
@@ -84,7 +85,7 @@ target.drop();
assert.doesNotThrow(
() => source.aggregate(makeMergePipeline(
{target: target.getName(), updatePipeline: [{$project: {x: {$add: ["$b", 1]}}}]})));
- assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2}, {_id: 2, x: null}]});
+ assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2}, {_id: 2, a: 2}]});
// Test $replaceWith stage.
assert(target.drop());
@@ -106,8 +107,8 @@ target.drop();
{actual: target.find().toArray(), expected: [{_id: 1, x: {y: 1}}, {_id: 2, x: {y: 2}}]});
})();
-// Test $merge inserts a new document into the target collection if not matching document is
-// found by applying a pipeline-style update with upsert=true semantics.
+// Test $merge inserts a new document into the target collection if no matching document is
+// found by applying a pipeline-style update with upsert=true and upsertSupplied=true.
(function testMergeInsertDocumentIfMatchNotFound() {
assert(source.drop());
assert(target.drop());
@@ -115,7 +116,7 @@ target.drop();
assert.commandWorked(target.insert({_id: 2, a: 2}));
assert.doesNotThrow(() => source.aggregate(makeMergePipeline(
{target: target.getName(), updatePipeline: [{$addFields: {x: 1}}]})));
- assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 1}, {_id: 2, a: 2}]});
+ assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, a: 1}, {_id: 2, a: 2}]});
})();
// Test $merge doesn't modify the target collection if a document has been removed from the
@@ -129,7 +130,7 @@ target.drop();
assertArrayEq({
actual: target.find().toArray(),
expected: [
- {_id: 1, x: 1},
+ {_id: 1, a: 1},
{_id: 2, a: 2},
]
});
@@ -186,7 +187,7 @@ target.drop();
// differ from the _id field.
(function testMergeWithOnFields() {
if (FixtureHelpers.isSharded(source)) {
- // Skip this test if the collection sharded, because an implicitly created sharded
+ // Skip this test if the collection is sharded, because an implicitly created sharded
// key of {_id: 1} will not be covered by a unique index created in this test, which
// is not allowed.
return;
@@ -208,7 +209,7 @@ target.drop();
})));
assertArrayEq({
actual: target.find({}, {_id: 0}).toArray(),
- expected: [{a: 1, b: 1, z: 1}, {a: 2, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}]
+ expected: [{a: 1, b: 1, z: 1}, {a: 2}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}]
});
// The 'on' fields contains multiple document fields.
@@ -228,7 +229,7 @@ target.drop();
})));
assertArrayEq({
actual: target.find({}, {_id: 0}).toArray(),
- expected: [{a: 1, b: 1, z: 1}, {a: 2, b: 4, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}]
+ expected: [{a: 1, b: 1, z: 1}, {a: 2, b: 4}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}]
});
assert.commandWorked(source.dropIndex({a: 1, b: 1}));
assert.commandWorked(target.dropIndex({a: 1, b: 1}));
@@ -260,12 +261,8 @@ target.drop();
updatePipeline: [{$addFields: {z: 1}}]
})));
assertArrayEq({
- actual: target.find().toArray(),
- expected: [
- {_id: 1, a: {b: "b"}, z: 1},
- {_id: 2, a: {b: "c"}, c: "y", z: 1},
- {_id: 3, a: {b: 30}, z: 1}
- ]
+ actual: target.find({}, {_id: 0}).toArray(),
+ expected: [{a: {b: "b"}, c: "x"}, {a: {b: "c"}, c: "y", z: 1}, {a: {b: 30}, b: "c"}]
});
})();
@@ -326,9 +323,8 @@ target.drop();
assertArrayEq({
actual: target.find({}, {_id: 0}).toArray(),
// There is a matching document in the target with {_id: 1}, but since we cannot match
- // it (no _id in projection), we just insert two new documents from the source
- // collection by applying a pipeline-style update.
- expected: [{b: "c"}, {z: 1}, {z: 1}]
+ // it (no _id in projection), we insert the two documents from the source collection.
+ expected: [{b: "c"}, {a: 1, b: "a"}, {a: 2, b: "b"}]
});
pipeline = makeMergePipeline({
@@ -346,7 +342,7 @@ target.drop();
assert.doesNotThrow(() => source.aggregate(pipeline));
assertArrayEq({
actual: target.find({}, {_id: 0}).toArray(),
- expected: [{b: "c"}, {a: 1, z: 1}, {a: 2, z: 1}]
+ expected: [{b: "c"}, {a: 1, b: "a"}, {a: 2, b: "b"}]
});
assert.commandWorked(source.dropIndex({_id: 1, a: -1}));
assert.commandWorked(target.dropIndex({_id: 1, a: -1}));
@@ -358,6 +354,7 @@ target.drop();
assert(target.drop());
assert.commandWorked(db.createCollection(target.getName(), {validator: validator}));
assert.commandWorked(target.createIndex({a: 1}));
+ assert.commandWorked(target.insert([{_id: 1, z: 5}, {_id: 2, z: 5}]));
assert.doesNotThrow(() => source.aggregate(makeMergePipeline(
{target: target.getName(), updatePipeline: [{$addFields: {z: 1}}]})));
assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, z: 1}, {_id: 2, z: 1}]});
@@ -368,8 +365,7 @@ target.drop();
assert.eq(validator, listColl.cursor.firstBatch[0].options["validator"]);
})();
-// Test $merge implicitly creates a new database when the target collection's database doesn't
-// exist.
+// Test that $merge implicitly creates a new database when the target collection's db doesn't exist.
(function testMergeImplicitlyCreatesTargetDatabase() {
assert(source.drop());
assert.commandWorked(source.insert({_id: 1, a: 1, b: "a"}));
@@ -384,19 +380,19 @@ target.drop();
if (!FixtureHelpers.isMongos(db)) {
assert.doesNotThrow(() => source.aggregate(foreignPipeline));
- assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]});
+ assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, a: 1, b: "a"}]});
} else {
// Implicit database creation is prohibited in a cluster.
const error = assert.throws(() => source.aggregate(foreignPipeline));
assert.commandFailedWithCode(error, ErrorCodes.NamespaceNotFound);
- // Force a creation of the database and collection, then fall through the test
- // below.
- assert.commandWorked(foreignTarget.insert({_id: 1}));
+ // Force creation of the database and collection, then fall through the test below.
+ assert.commandWorked(foreignTarget.insert({_id: 1, a: 1, b: "a"}));
}
assert.doesNotThrow(() => source.aggregate(foreignPipeline));
- assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]});
+ assertArrayEq(
+ {actual: foreignTarget.find().toArray(), expected: [{_id: 1, a: 1, b: "a", z: 1}]});
assert.commandWorked(foreignDb.dropDatabase());
})();
@@ -414,27 +410,28 @@ target.drop();
updatePipeline: [{$set: {x: {$add: ["$$new.a", "$$new.b"]}}}]
})));
assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, x: 4}]});
+ {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, a: 2, b: 2}]});
})();
-// Test that the default 'let' variable 'new' is not available once the 'let' argument to the
-// $merge stage is specified explicitly.
+// Test that the default 'let' variable 'new' is always available even when the 'let' argument to
+// the $merge stage is specified explicitly.
(function testMergeCannotUseDefaultLetVariableIfLetIsSpecified() {
assert(source.drop());
assert(target.drop());
assert.commandWorked(source.insert([{_id: 1, a: 1, b: 1}, {_id: 2, a: 2, b: 2}]));
assert.commandWorked(target.insert({_id: 1, c: 1}));
- const error = assert.throws(() => source.aggregate(makeMergePipeline({
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
letVars: {foo: "bar"},
target: target.getName(),
updatePipeline: [{$project: {x: "$$new.a", y: "$$new.b"}}]
})));
- assert.commandFailedWithCode(error, 17276);
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 1}, {_id: 2, a: 2, b: 2}]});
})();
// Test that $merge can accept an empty object holding no variables and the default 'new'
-// variable is not available.
+// variable is still available.
(function testMergeWithEmptyLetVariables() {
assert(source.drop());
assert(target.drop());
@@ -447,16 +444,42 @@ target.drop();
{letVars: {}, target: target.getName(), updatePipeline: [{$set: {x: "foo"}}]})));
assertArrayEq({
actual: target.find().toArray(),
- expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, x: "foo"}]
+ expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, a: 2, b: 2}]
});
// No default variable 'new' is available.
- const error = assert.throws(() => source.aggregate(makeMergePipeline({
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
letVars: {},
target: target.getName(),
updatePipeline: [{$project: {x: "$$new.a", y: "$$new.b"}}]
})));
- assert.commandFailedWithCode(error, 17276);
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 1}, {_id: 2, x: 2, y: 2}]});
+})();
+
+// Test that $merge will reject a 'let' specification which attempts to redefine 'new'.
+(function testMergeRejectsLetVariablesWhichRedefineNew() {
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.insert([{_id: 1, a: 1, b: 1}, {_id: 2, a: 2, b: 2}]));
+ assert.commandWorked(target.insert({_id: 1, c: 1}));
+
+ // Cannot override 'new' with an arbitrary value.
+ const error = assert.throws(() => source.aggregate(makeMergePipeline({
+ letVars: {new: "$a"},
+ target: target.getName(),
+ updatePipeline: [{$set: {x: "foo"}}]
+ })));
+ assert.commandFailedWithCode(error, 51273);
+
+ // If the user's 'let' explicitly sets 'new' to "$$ROOT", we allow it.
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ letVars: {new: "$$ROOT"},
+ target: target.getName(),
+ updatePipeline: [{$project: {x: "$$new.a", y: "$$new.b"}}]
+ })));
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 1}, {_id: 2, a: 2, b: 2}]});
})();
// Test that $merge can accept a null value as the 'let' argument and the default variable 'new'
@@ -465,7 +488,7 @@ target.drop();
// cannot differentiate between an optional field specified explicitly as 'null', or not
// specified at all. In both cases it will treat the field like it wasn't specified. So, this
// test ensures that we're aware of this limitation. Once the limitation is addressed in
-// SERVER-41272, this test should be updated to accordingly.
+// SERVER-41272, this test should be updated accordingly.
(function testMergeWithNullLetVariables() {
assert(source.drop());
assert(target.drop());
@@ -478,7 +501,7 @@ target.drop();
{letVars: null, target: target.getName(), updatePipeline: [{$set: {x: "foo"}}]})));
assertArrayEq({
actual: target.find().toArray(),
- expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, x: "foo"}]
+ expected: [{_id: 1, c: 1, x: "foo"}, {_id: 2, a: 2, b: 2}]
});
// Can use the default 'new' variable.
@@ -507,7 +530,7 @@ target.drop();
})));
assertArrayEq({
actual: target.find().toArray(),
- expected: [{_id: 1, c: 1, x: 1, y: "foo", z: true}, {_id: 2, x: 1, y: "foo", z: true}]
+ expected: [{_id: 1, c: 1, x: 1, y: "foo", z: true}, {_id: 2, a: 2, b: 2}]
});
// Constant array.
@@ -520,7 +543,7 @@ target.drop();
updatePipeline: [{$set: {x: {$arrayElemAt: ["$$a", 1]}}}]
})));
assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, x: 2}]});
+ {actual: target.find().toArray(), expected: [{_id: 1, c: 1, x: 2}, {_id: 2, a: 2, b: 2}]});
})();
// Test that variables referencing the fields in the source document can be specified in the
@@ -538,7 +561,7 @@ target.drop();
updatePipeline: [{$set: {z: {$add: ["$$x", "$$y"]}}}]
})));
assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 2}, {_id: 2, z: 4}]});
+ {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 2}, {_id: 2, a: 2, b: 2}]});
// Array field with expressions in the pipeline.
assert(source.drop());
@@ -551,8 +574,10 @@ target.drop();
target: target.getName(),
updatePipeline: [{$set: {z: {$arrayElemAt: ["$$x", 1]}}}]
})));
- assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 2}, {_id: 2, z: 5}]});
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [{_id: 1, c: 1, z: 2}, {_id: 2, a: [4, 5, 6]}]
+ });
// Array field with expressions in the 'let' argument.
assert(target.drop());
@@ -563,8 +588,10 @@ target.drop();
target: target.getName(),
updatePipeline: [{$set: {z: "$$x"}}]
})));
- assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 3}, {_id: 2, z: 6}]});
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [{_id: 1, c: 1, z: 3}, {_id: 2, a: [4, 5, 6]}]
+ });
})();
// Test that variables using the dotted path can be specified in the 'let' argument and
@@ -580,8 +607,10 @@ target.drop();
target: target.getName(),
updatePipeline: [{$set: {z: {$pow: ["$$x", 2]}}}]
})));
- assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 4}, {_id: 2, z: 9}]});
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [{_id: 1, c: 1, z: 4}, {_id: 2, a: {b: {c: 3}}}]
+ });
})();
// Test that 'let' variables are referred to the computed document in the aggregation pipeline,
@@ -605,7 +634,7 @@ target.drop();
})));
assertArrayEq({
actual: target.find().toArray(),
- expected: [{_id: 1, c: 1, z: {_id: 1, a: 3}}, {_id: 2, z: {_id: 2, a: 3}}]
+ expected: [{_id: 1, c: 1, z: {_id: 1, a: 3}}, {_id: 2, a: 3}]
});
// Test custom 'let' variables.
@@ -622,6 +651,6 @@ target.drop();
updatePipeline: [{$set: {z: "$$x"}}]
})));
assertArrayEq(
- {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 49}, {_id: 2, z: 9}]});
+ {actual: target.find().toArray(), expected: [{_id: 1, c: 1, z: 49}, {_id: 2, a: 3}]});
})();
}());
diff --git a/jstests/core/update_with_pipeline.js b/jstests/core/update_with_pipeline.js
index 963d72b6592..4f044a19352 100644
--- a/jstests/core/update_with_pipeline.js
+++ b/jstests/core/update_with_pipeline.js
@@ -117,8 +117,62 @@ testUpsertDoesInsert({_id: 1, x: 1}, [{$project: {x: 1}}], {_id: 1, x: 1});
testUpsertDoesInsert({_id: 1, x: 1}, [{$project: {x: "foo"}}], {_id: 1, x: "foo"});
testUpsertDoesInsert({_id: 1, x: 1, y: 1}, [{$unset: ["x"]}], {_id: 1, y: 1});
-// Update fails when invalid stage is specified. This is a sanity check rather than an
-// exhaustive test of all stages.
+// Upsert with 'upsertSupplied' inserts the given document and populates _id from the query.
+assert.commandWorked(db.runCommand({
+ update: coll.getName(),
+ updates: [{
+ q: {_id: "supplied_doc"},
+ u: [{$set: {x: 1}}],
+ upsert: true,
+ upsertSupplied: true,
+ c: {new: {suppliedDoc: true}}
+ }]
+}));
+assert(coll.findOne({_id: "supplied_doc", suppliedDoc: true}));
+
+// Update with 'upsertSupplied:true' fails if 'upsert' is false.
+assert.commandFailedWithCode(db.runCommand({
+ update: coll.getName(),
+ updates: [{
+ q: {_id: "supplied_doc"},
+ u: [{$set: {x: 1}}],
+ upsert: false,
+ upsertSupplied: true,
+ c: {new: {suppliedDoc: true}}
+ }]
+}),
+ ErrorCodes.FailedToParse);
+
+// Upsert with 'upsertSupplied' fails if no constants are provided.
+assert.commandFailedWithCode(db.runCommand({
+ update: coll.getName(),
+ updates: [{q: {_id: "supplied_doc"}, u: [{$set: {x: 1}}], upsert: true, upsertSupplied: true}]
+}),
+ ErrorCodes.FailedToParse);
+
+// Upsert with 'upsertSupplied' fails if constants do not include a field called 'new'.
+assert.commandFailedWithCode(db.runCommand({
+ update: coll.getName(),
+ updates:
+ [{q: {_id: "supplied_doc"}, u: [{$set: {x: 1}}], upsert: true, upsertSupplied: true, c: {}}]
+}),
+ ErrorCodes.FailedToParse);
+
+// Upsert with 'upsertSupplied' fails if c.new is not an object.
+assert.commandFailedWithCode(db.runCommand({
+ update: coll.getName(),
+ updates: [{
+ q: {_id: "supplied_doc"},
+ u: [{$set: {x: 1}}],
+ upsert: true,
+ upsertSupplied: true,
+ c: {new: "string"}
+ }]
+}),
+ ErrorCodes.FailedToParse);
+
+// Update fails when invalid stage is specified. This is a sanity check rather than an exhaustive
+// test of all stages.
assert.commandFailedWithCode(coll.update({x: 1}, [{$match: {x: 1}}]), ErrorCodes.InvalidOptions);
assert.commandFailedWithCode(coll.update({x: 1}, [{$sort: {x: 1}}]), ErrorCodes.InvalidOptions);
assert.commandFailedWithCode(coll.update({x: 1}, [{$facet: {a: [{$match: {x: 1}}]}}]),
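
A point worth making explicit about the tests above: the supplied document only matters on the insert path. When the query matches an existing document, the pipeline in 'u' is applied to that document and 'c.new' goes unused. A small sketch of that behaviour (a reasonable reading of the semantics, not an assertion copied from the test file):

// {_id: "supplied_doc", suppliedDoc: true} already exists from the first test above, so the
// pipeline is applied to it and the supplied document {suppliedDoc: false} is ignored.
assert.commandWorked(db.runCommand({
    update: coll.getName(),
    updates: [{
        q: {_id: "supplied_doc"},
        u: [{$set: {x: 2}}],
        upsert: true,
        upsertSupplied: true,
        c: {new: {suppliedDoc: false}}
    }]
}));
assert.docEq(coll.findOne({_id: "supplied_doc"}), {_id: "supplied_doc", suppliedDoc: true, x: 2});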
diff --git a/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js
new file mode 100644
index 00000000000..25d4433b25c
--- /dev/null
+++ b/jstests/multiVersion/agg_merge_upsert_supplied_cluster.js
@@ -0,0 +1,226 @@
+/**
+ * Tests that $merge with {whenMatched: [], whenNotMatched: 'insert'} is handled correctly during
+ * upgrade from and downgrade to a pre-backport version of 4.2 on a sharded cluster.
+ */
+(function() {
+"use strict";
+
+load("jstests/multiVersion/libs/causal_consistency_helpers.js"); // supportsMajorityReadConcern
+load("jstests/multiVersion/libs/multi_cluster.js"); // upgradeCluster
+load("jstests/multiVersion/libs/multi_rs.js"); // upgradeSet
+
+// The UUID consistency check can hit NotMasterNoSlaveOk when it attempts to obtain a list of
+// collections from the shard Primaries through mongoS at the end of this test.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+}
+
+const preBackport42Version = "4.2.1";
+const latestVersion = "latest";
+
+const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ rs: {nodes: 3},
+ other: {
+ mongosOptions: {binVersion: preBackport42Version},
+ configOptions: {binVersion: preBackport42Version},
+ rsOptions: {binVersion: preBackport42Version},
+ }
+});
+
+// Obtain references to the test database, the source and target collections.
+let mongosDB = st.s.getDB(jsTestName());
+let sourceSharded = mongosDB.source_coll_sharded;
+let targetSharded = mongosDB.target_coll_sharded;
+let sourceUnsharded = mongosDB.source_coll_unsharded;
+let targetUnsharded = mongosDB.target_coll_unsharded;
+
+// Updates the specified cluster components and then refreshes our references to each of them.
+function refreshCluster(version, components, singleShard) {
+ // Default to only upgrading the explicitly specified components.
+ const defaultComponents = {upgradeMongos: false, upgradeShards: false, upgradeConfigs: false};
+ components = Object.assign(defaultComponents, components);
+
+ if (singleShard) {
+ singleShard.upgradeSet({binVersion: version});
+ } else {
+ st.upgradeCluster(version, components);
+ }
+
+ // Wait for the config server and shards to become available, and restart mongoS.
+ st.configRS.awaitSecondaryNodes();
+ st.rs0.awaitSecondaryNodes();
+ st.rs1.awaitSecondaryNodes();
+ st.restartMongoses();
+
+ // Having upgraded the cluster, reacquire references to each component.
+ mongosDB = st.s.getDB(jsTestName());
+ sourceSharded = mongosDB.source_coll_sharded;
+ targetSharded = mongosDB.target_coll_sharded;
+ sourceUnsharded = mongosDB.source_coll_unsharded;
+ targetUnsharded = mongosDB.target_coll_unsharded;
+}
+
+// Run the aggregation and swallow applicable exceptions for as long as we receive them, up to the
+// assert.soon timeout. This is necessary because there is a period after one shard's Primary steps
+// down during upgrade where a $merge on the other shard may still target the previous Primary.
+function tryWhileNotMaster(sourceColl, targetColl, pipeline, options) {
+ assert.soon(() => {
+ const aggCmdParams = Object.assign({pipeline: pipeline, cursor: {}}, options);
+ const cmdRes = sourceColl.runCommand("aggregate", aggCmdParams);
+ if (cmdRes.ok) {
+ return true;
+ }
+ // The only errors we are prepared to swallow are ErrorCodes.NotMaster and CursorNotFound.
+ // The latter can be thrown as a consequence of a NotMaster on one shard when the $merge
+ // stage is dispatched to a merging shard as part of the latter half of the pipeline.
+ const errorsToSwallow = [ErrorCodes.NotMaster, ErrorCodes.CursorNotFound];
+ assert(errorsToSwallow.includes(cmdRes.code), () => tojson(cmdRes));
+ // TODO SERVER-43851: this may be susceptible to zombie writes. Ditto for all other
+ // occurrences of remove({}) throughout this test.
+ assert.commandWorked(targetColl.remove({}));
+ return false;
+ });
+}
+
+// Enable sharding on the test database and ensure that the primary is shard0.
+assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+// Shard the source collection on {_id: 1}, split across the shards at {_id: 0}.
+st.shardColl(sourceSharded, {_id: 1}, {_id: 0}, {_id: 1});
+
+// Shard the target collection on {_id: "hashed"}, so that the target shard for each document will
+// not necessarily be the same as the source shard.
+st.shardColl(targetSharded, {_id: "hashed"}, false, false);
+
+// Insert an identical set of test data into both the sharded and unsharded source collections. In
+// the former case, the documents are spread across both shards.
+for (let i = -20; i < 20; ++i) {
+ assert.commandWorked(sourceSharded.insert({_id: i}));
+ assert.commandWorked(sourceUnsharded.insert({_id: i}));
+}
+
+// Define a series of test cases covering all $merge distributed planning scenarios.
+const testCases = [
+ // $merge from unsharded to unsharded, passthrough from mongoS and write locally.
+ {
+ sourceColl: () => sourceUnsharded,
+ targetColl: () => targetUnsharded,
+ preMergePipeline: [],
+ allowDiskUse: false,
+ disableExchange: false
+ },
+ // $merge from unsharded to sharded, passthrough from mongoS and write cross-shard.
+ {
+ sourceColl: () => sourceUnsharded,
+ targetColl: () => targetSharded,
+ preMergePipeline: [],
+ allowDiskUse: false,
+ disableExchange: false
+ },
+ // $merge from sharded to sharded, writes from shard to shard in parallel.
+ {
+ sourceColl: () => sourceSharded,
+ targetColl: () => targetSharded,
+ preMergePipeline: [],
+ allowDiskUse: false,
+ disableExchange: false
+ },
+ // $group with exchange, sends input documents to relevant shard and $merges locally.
+ {
+ sourceColl: () => sourceSharded,
+ targetColl: () => targetSharded,
+ preMergePipeline: [{$group: {_id: "$_id"}}],
+ allowDiskUse: false,
+ disableExchange: false
+ },
+ // $group, exchange prohibited, $merge is executed on mongoS.
+ {
+ sourceColl: () => sourceSharded,
+ targetColl: () => targetSharded,
+ preMergePipeline: [{$group: {_id: "$_id"}}],
+ allowDiskUse: false,
+ disableExchange: true
+ },
+ // $group, exchange prohibited, $merge sent to single shard and writes cross-shard.
+ {
+ sourceColl: () => sourceSharded,
+ targetColl: () => targetSharded,
+ preMergePipeline: [{$group: {_id: "$_id"}}],
+ allowDiskUse: true,
+ disableExchange: true
+ },
+];
+
+// The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in
+// effect, output documents will all have an _id field and the field added by this pipeline.
+const mergePipe = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}];
+
+// Generate the array of output documents we expect to see under the old upsert behaviour.
+const expectedOldBehaviourOutput = Array.from(sourceSharded.find().toArray(), (doc) => {
+ return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true};
+});
+
+for (let testCaseNum = 0; testCaseNum < testCases.length; ++testCaseNum) {
+ // Perform initial test-case setup. Disable the exchange optimization if appropriate.
+ const testCase = testCases[testCaseNum];
+ assert.commandWorked(mongosDB.adminCommand(
+ {setParameter: 1, internalQueryDisableExchange: testCase.disableExchange}));
+
+ // Construct the options object that will be supplied along with the pipeline.
+ const aggOptions = {allowDiskUse: testCase.allowDiskUse};
+
+ // Construct the final pipeline by appending $merge to the testCase's preMergePipeline.
+ const finalPipeline = testCase.preMergePipeline.concat([{
+ $merge: {
+ into: testCase.targetColl().getName(),
+ whenMatched: mergePipe,
+ whenNotMatched: "insert"
+ }
+ }]);
+
+ // Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output
+ // documents are produced using the old upsert behaviour.
+ tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
+ assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput);
+ assert.commandWorked(testCase.targetColl().remove({}));
+
+ // Upgrade a single shard to latest but leave the mongoS on 'preBackport42Version'. The upgraded
+ // shard continues to produce upsert requests that are compatible with the pre-backport shards.
+ refreshCluster(latestVersion, null, st.rs1);
+ tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
+ assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput);
+ assert.commandWorked(testCase.targetColl().remove({}));
+
+ // Upgrade the configs and the remaining shard to latest but leave mongoS on pre-backport 4.2.
+ // The shards continue to produce upsert requests that use the pre-backport behaviour. This is
+ // to ensure that the pipeline produces the same behaviour regardless of whether $merge is
+ // pushed down to the shards or run on the mongoS itself.
+ refreshCluster(latestVersion, {upgradeShards: true, upgradeConfigs: true});
+ tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
+ assert.sameMembers(testCase.targetColl().find().toArray(), expectedOldBehaviourOutput);
+ assert.commandWorked(testCase.targetColl().remove({}));
+
+ // Upgrade the mongoS to latest. We should now see that the $merge adopts the new behaviour, and
+ // inserts the exact source document rather than generating one from the whenMatched pipeline.
+ refreshCluster(latestVersion, {upgradeMongos: true});
+ tryWhileNotMaster(testCase.sourceColl(), testCase.targetColl(), finalPipeline, aggOptions);
+ assert.sameMembers(testCase.targetColl().find().toArray(),
+ testCase.sourceColl().find().toArray());
+ assert.commandWorked(testCase.targetColl().remove({}));
+
+ // Finally, downgrade the cluster to pre-backport 4.2 in preparation for the next test case. No
+ // need to do this after the final test, as it will simply extend the runtime for no reason.
+ if (testCaseNum < testCases.length - 1) {
+ refreshCluster(preBackport42Version,
+ {upgradeMongos: true, upgradeShards: true, upgradeConfigs: true});
+ }
+}
+
+st.stop();
+})();
\ No newline at end of file
diff --git a/jstests/multiVersion/agg_merge_upsert_supplied_replset.js b/jstests/multiVersion/agg_merge_upsert_supplied_replset.js
new file mode 100644
index 00000000000..2ca233e8e48
--- /dev/null
+++ b/jstests/multiVersion/agg_merge_upsert_supplied_replset.js
@@ -0,0 +1,90 @@
+/**
+ * Tests that $merge with {whenMatched: [], whenNotMatched: 'insert'} is handled correctly during
+ * upgrade from and downgrade to a pre-backport version of 4.2 on a single replica set.
+ */
+(function() {
+"use strict";
+
+load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
+load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+const preBackport42Version = "4.2.1";
+const latestVersion = "latest";
+
+const rst = new ReplSetTest({
+ nodes: 3,
+ nodeOptions: {binVersion: preBackport42Version},
+});
+if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+}
+rst.initiate();
+
+// Obtain references to the test database and create the test collection.
+let testDB = rst.getPrimary().getDB(jsTestName());
+let sourceColl = testDB.source_coll;
+let targetColl = testDB.target_coll;
+
+// Up- or downgrades the replset and then refreshes our references to the test collection.
+function refreshReplSet(version, secondariesOnly) {
+ // Upgrade the set and wait for it to become available again.
+ if (secondariesOnly) {
+ rst.upgradeSecondaries(rst.getPrimary(), {binVersion: version});
+ } else {
+ rst.upgradeSet({binVersion: version});
+ }
+ rst.awaitSecondaryNodes();
+
+ // Having upgraded the set, reacquire references to the db and collection.
+ testDB = rst.getPrimary().getDB(jsTestName());
+ sourceColl = testDB.source_coll;
+ targetColl = testDB.target_coll;
+}
+
+// Insert a set of test data.
+for (let i = -20; i < 20; ++i) {
+ assert.commandWorked(sourceColl.insert({_id: i}));
+}
+
+// The 'whenMatched' pipeline to apply as part of the $merge. When the old 4.2.1 behaviour is in
+// effect, output documents will all have an _id field and the field added by this pipeline.
+const mergePipe = [{$addFields: {docWasGeneratedFromWhenMatchedPipeline: true}}];
+
+// Generate the array of output documents we expect to see under the old upsert behaviour.
+const expectedOldBehaviourOutput = Array.from(sourceColl.find().toArray(), (doc) => {
+ return {_id: doc._id, docWasGeneratedFromWhenMatchedPipeline: true};
+});
+
+// The pipeline to run for each test. Results in different output depending on upsert mode used.
+const finalPipeline =
+ [{$merge: {into: targetColl.getName(), whenMatched: mergePipe, whenNotMatched: "insert"}}];
+
+// Run a $merge with the whole cluster on 'preBackport42Version' and confirm that the output
+// documents are produced using the old upsert behaviour.
+sourceColl.aggregate(finalPipeline);
+assert.sameMembers(targetColl.find().toArray(), expectedOldBehaviourOutput);
+assert.commandWorked(targetColl.remove({}));
+
+// Upgrade the Secondaries but leave the Primary on 'preBackport42Version'. The set continues to
+// produce output documents using the old upsert behaviour.
+refreshReplSet(latestVersion, true);
+sourceColl.aggregate(finalPipeline);
+assert.sameMembers(targetColl.find().toArray(), expectedOldBehaviourOutput);
+assert.commandWorked(targetColl.remove({}));
+
+// Since we cannot run $merge on a Secondary, we cannot end up in a situation where an upgraded
+// Secondary issues an 'upsertSupplied' request to the pre-backport Primary.
+assert.throws(
+ () => rst.getSecondaries()[0].getCollection(sourceColl.getFullName()).aggregate(finalPipeline));
+
+// Upgrade the Primary to latest. We should now see that the $merge adopts the new behaviour, and
+// inserts the exact source document rather than generating one from the whenMatched pipeline.
+refreshReplSet(latestVersion);
+sourceColl.aggregate(finalPipeline);
+assert.sameMembers(targetColl.find().toArray(), sourceColl.find().toArray());
+assert.commandWorked(targetColl.remove({}));
+
+rst.stopSet();
+})();
\ No newline at end of file
diff --git a/jstests/sharding/upsert_sharded.js b/jstests/sharding/upsert_sharded.js
index 32a59b9a586..41b873db084 100644
--- a/jstests/sharding/upsert_sharded.js
+++ b/jstests/sharding/upsert_sharded.js
@@ -5,33 +5,46 @@
(function() {
'use strict';
-var st = new ShardingTest({shards: 2, mongos: 1});
+const st = new ShardingTest({shards: 2, mongos: 1});
-var mongos = st.s0;
-var admin = mongos.getDB("admin");
-var coll = mongos.getCollection("foo.bar");
+const mongos = st.s0;
+const admin = mongos.getDB("admin");
+const coll = mongos.getCollection("foo.bar");
assert(admin.runCommand({enableSharding: coll.getDB() + ""}).ok);
st.ensurePrimaryShard(coll.getDB().getName(), st.shard1.shardName);
-var upsertedResult = function(query, expr) {
- coll.remove({});
- return coll.update(query, expr, {upsert: true});
+const upsertSuppliedResult = function(upsertColl, query, newDoc) {
+ assert.commandWorked(upsertColl.remove({}));
+ return coll.runCommand({
+ update: coll.getName(),
+ updates: [{
+ q: query,
+ u: [{$addFields: {unused: true}}],
+ c: {new: newDoc},
+ upsert: true,
+ upsertSupplied: true
+ }]
+ });
};
-var upsertedField = function(query, expr, fieldName) {
- assert.writeOK(upsertedResult(query, expr));
- return coll.findOne()[fieldName];
+const upsertedResult = function(upsertColl, query, expr) {
+ assert.commandWorked(upsertColl.remove({}));
+ return upsertColl.update(query, expr, {upsert: true});
};
-var upsertedId = function(query, expr) {
- return upsertedField(query, expr, "_id");
+const upsertedField = function(upsertColl, query, expr, fieldName) {
+ assert.commandWorked(upsertedResult(upsertColl, query, expr));
+ return upsertColl.findOne()[fieldName];
};
-var upsertedXVal = function(query, expr) {
- return upsertedField(query, expr, "x");
+const upsertedXVal = function(upsertColl, query, expr) {
+ return upsertedField(upsertColl, query, expr, "x");
};
+//
+// Tests for non-nested shard key.
+//
st.ensurePrimaryShard(coll.getDB() + "", st.shard0.shardName);
assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {x: 1}}));
assert.commandWorked(admin.runCommand({split: coll + "", middle: {x: 0}}));
@@ -40,44 +53,59 @@ assert.commandWorked(admin.runCommand(
st.printShardingStatus();
-// upserted update replacement would result in no shard key
-assert.writeError(upsertedResult({x: 1}, {}));
+// Upserted replacement fails if it would result in no shard key.
+assert.commandFailedWithCode(upsertedResult(coll, {x: -1}, {_id: 1}), ErrorCodes.ShardKeyNotFound);
+
+// Upserted with supplied document fails if it would result in no shard key.
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {x: 1}, {_id: 1}), ErrorCodes.NoSuchKey);
+
+// Upserted op style update will propagate shard key by default.
+assert.commandWorked(upsertedResult(coll, {x: -1}, {$set: {_id: 1}}));
+assert.docEq(coll.findOne({}), {_id: 1, x: -1});
-// updates with upsert must contain shard key in query when $op style
-assert.eq(1, upsertedXVal({x: 1}, {$set: {a: 1}}));
-assert.eq(1, upsertedXVal({x: {$eq: 1}}, {$set: {a: 1}}));
-assert.eq(1, upsertedXVal({x: {$all: [1]}}, {$set: {a: 1}}));
-assert.eq(1, upsertedXVal({x: {$in: [1]}}, {$set: {a: 1}}));
-assert.eq(1, upsertedXVal({$and: [{x: {$eq: 1}}]}, {$set: {a: 1}}));
-assert.eq(1, upsertedXVal({$or: [{x: {$eq: 1}}]}, {$set: {a: 1}}));
+// Updates with upsert must contain shard key in query when $op style
+assert.eq(1, upsertedXVal(coll, {x: 1}, {$set: {a: 1}}));
+assert.eq(1, upsertedXVal(coll, {x: {$eq: 1}}, {$set: {a: 1}}));
+assert.eq(1, upsertedXVal(coll, {x: {$all: [1]}}, {$set: {a: 1}}));
+assert.eq(1, upsertedXVal(coll, {x: {$in: [1]}}, {$set: {a: 1}}));
+assert.eq(1, upsertedXVal(coll, {$and: [{x: {$eq: 1}}]}, {$set: {a: 1}}));
+assert.eq(1, upsertedXVal(coll, {$or: [{x: {$eq: 1}}]}, {$set: {a: 1}}));
// Missing shard key in query.
-assert.commandFailedWithCode(upsertedResult({}, {$set: {a: 1, x: 1}}), ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {}, {$set: {a: 1, x: 1}}),
+ ErrorCodes.ShardKeyNotFound);
// Missing equality match on shard key in query.
-assert.commandFailedWithCode(upsertedResult({x: {$gt: 10}}, {$set: {a: 1, x: 5}}),
+assert.commandFailedWithCode(upsertedResult(coll, {x: {$gt: 10}}, {$set: {a: 1, x: 5}}),
ErrorCodes.ShardKeyNotFound);
// Regex shard key value in query is ambiguous and cannot be extracted for an equality match.
-assert.commandFailedWithCode(upsertedResult({x: {$eq: /abc*/}}, {$set: {a: 1, x: "regexValue"}}),
- ErrorCodes.ShardKeyNotFound);
-assert.commandFailedWithCode(upsertedResult({x: {$eq: /abc/}}, {$set: {a: 1, x: /abc/}}),
+assert.commandFailedWithCode(
+ upsertedResult(coll, {x: {$eq: /abc*/}}, {$set: {a: 1, x: "regexValue"}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {x: {$eq: /abc/}}, {$set: {a: 1, x: /abc/}}),
ErrorCodes.ShardKeyNotFound);
// Shard key in query not extractable.
-assert.commandFailedWithCode(upsertedResult({x: undefined}, {$set: {a: 1}}), ErrorCodes.BadValue);
-assert.commandFailedWithCode(upsertedResult({x: [1, 2]}, {$set: {a: 1}}),
+assert.commandFailedWithCode(upsertedResult(coll, {x: undefined}, {$set: {a: 1}}),
+ ErrorCodes.BadValue);
+assert.commandFailedWithCode(upsertedResult(coll, {x: [1, 2]}, {$set: {a: 1}}),
ErrorCodes.ShardKeyNotFound);
-assert.commandFailedWithCode(upsertedResult({x: {$eq: {$gt: 5}}}, {$set: {a: 1}}),
+assert.commandFailedWithCode(upsertedResult(coll, {x: {$eq: {$gt: 5}}}, {$set: {a: 1}}),
ErrorCodes.ShardKeyNotFound);
-// nested field extraction always fails with non-nested key - like _id, we require setting the
+// Nested field extraction always fails with non-nested key - like _id, we require setting the
// elements directly
-assert.writeError(upsertedResult({"x.x": 1}, {$set: {a: 1}}));
-assert.writeError(upsertedResult({"x.x": {$eq: 1}}, {$set: {a: 1}}));
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": 1}, {$set: {a: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": {$eq: 1}}, {$set: {a: 1}}),
+ ErrorCodes.ShardKeyNotFound);
coll.drop();
+//
+// Tests for nested shard key.
+//
st.ensurePrimaryShard(coll.getDB() + "", st.shard0.shardName);
assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {'x.x': 1}}));
assert.commandWorked(admin.runCommand({split: coll + "", middle: {'x.x': 0}}));
@@ -86,24 +114,160 @@ assert.commandWorked(admin.runCommand(
st.printShardingStatus();
-// nested field extraction with nested shard key
-assert.docEq({x: 1}, upsertedXVal({"x.x": 1}, {$set: {a: 1}}));
-assert.docEq({x: 1}, upsertedXVal({"x.x": {$eq: 1}}, {$set: {a: 1}}));
-assert.docEq({x: 1}, upsertedXVal({"x.x": {$all: [1]}}, {$set: {a: 1}}));
-assert.docEq({x: 1}, upsertedXVal({$and: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}}));
-assert.docEq({x: 1}, upsertedXVal({$or: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}}));
+// Upserted replacement update fails if it would result in no shard key with a nested shard key.
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {_id: 1}),
+ ErrorCodes.ShardKeyNotFound);
+
+// Upserted with supplied document fails if it would result in no shard key with a nested shard key.
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {"x.x": -1}, {_id: 1}),
+ ErrorCodes.NoSuchKey);
+
+// Upserted op style update will propagate shard key by default with nested shard key.
+assert.commandWorked(upsertedResult(coll, {"x.x": -1}, {$set: {_id: 1}}));
+assert.docEq(coll.findOne({}), {_id: 1, x: {x: -1}});
+
+// Nested field extraction with nested shard key
+assert.docEq({x: 1}, upsertedXVal(coll, {"x.x": 1}, {$set: {a: 1}}));
+assert.docEq({x: 1}, upsertedXVal(coll, {"x.x": {$eq: 1}}, {$set: {a: 1}}));
+assert.docEq({x: 1}, upsertedXVal(coll, {"x.x": {$all: [1]}}, {$set: {a: 1}}));
+assert.docEq({x: 1}, upsertedXVal(coll, {$and: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}}));
+assert.docEq({x: 1}, upsertedXVal(coll, {$or: [{"x.x": {$eq: 1}}]}, {$set: {a: 1}}));
// Can specify siblings of nested shard keys
-assert.docEq({x: 1, y: 1}, upsertedXVal({"x.x": 1, "x.y": 1}, {$set: {a: 1}}));
-assert.docEq({x: 1, y: {z: 1}}, upsertedXVal({"x.x": 1, "x.y.z": 1}, {$set: {a: 1}}));
+assert.docEq({x: 1, y: 1}, upsertedXVal(coll, {"x.x": 1, "x.y": 1}, {$set: {a: 1}}));
+assert.docEq({x: 1, y: {z: 1}}, upsertedXVal(coll, {"x.x": 1, "x.y.z": 1}, {$set: {a: 1}}));
+
+// No arrays at any level for targeting.
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": []}, {$set: {a: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {x: {x: []}}, {$set: {a: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {x: [{x: 1}]}, {$set: {a: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+
+// No arrays at any level for document insertion for replacement, supplied, and op updates.
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {$set: {x: {x: []}}}),
+ ErrorCodes.NotSingleValueField);
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {$set: {x: [{x: 1}]}}),
+ ErrorCodes.NotSingleValueField);
+
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {"x.x": -1}, {x: {x: []}}),
+ ErrorCodes.NotSingleValueField);
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {"x.x": -1}, {x: [{x: 1}]}),
+ ErrorCodes.NotSingleValueField);
-// No arrays at any level
-assert.writeError(upsertedResult({"x.x": []}, {$set: {a: 1}}));
-assert.writeError(upsertedResult({x: {x: []}}, {$set: {a: 1}}));
-assert.writeError(upsertedResult({x: [{x: 1}]}, {$set: {a: 1}}));
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {x: {x: []}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x": -1}, {x: [{x: 1}]}),
+ ErrorCodes.ShardKeyNotFound);
// Can't set sub-fields of nested key
-assert.writeError(upsertedResult({"x.x.x": {$eq: 1}}, {$set: {a: 1}}));
+assert.commandFailedWithCode(upsertedResult(coll, {"x.x.x": {$eq: 1}}, {$set: {a: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+
+coll.drop();
+
+//
+// Tests for nested _id shard key.
+//
+st.ensurePrimaryShard(coll.getDB() + "", st.shard0.shardName);
+assert.commandWorked(admin.runCommand({shardCollection: coll + "", key: {'_id.x': 1}}));
+assert.commandWorked(admin.runCommand({split: coll + "", middle: {'_id.x': 0}}));
+assert.commandWorked(admin.runCommand(
+ {moveChunk: coll + "", find: {'_id.x': 0}, to: st.shard1.shardName, _waitForDelete: true}));
+
+st.printShardingStatus();
+
+// No upsert type can result in a missing shard key for nested _id key.
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {}), ErrorCodes.ShardKeyNotFound);
+
+assert.commandWorked(upsertSuppliedResult(coll, {_id: {x: -1}}, {}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}});
+
+assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {$set: {y: 1}}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1});
+
+assert.commandFailedWithCode(
+ upsertedResult(coll, {_id: {x: -1}}, {$set: {y: 1}, $unset: {"_id.x": 1}}),
+ ErrorCodes.ImmutableField);
+
+// All update types can re-state shard key for nested _id key.
+assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {_id: {x: -1}, y: 1}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1});
+
+assert.commandWorked(upsertSuppliedResult(coll, {_id: {x: -1}}, {_id: {x: -1}, y: 1}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1});
+
+assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {$set: {_id: {x: -1}, y: 1}}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1});
+
+assert.commandWorked(upsertedResult(coll, {_id: {x: -1}}, {$set: {"_id.x": -1, y: 1}}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}, y: 1});
+
+// No upsert type can modify shard key for nested _id key.
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {_id: {x: -2}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: -1}}, {_id: {x: -2}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {$set: {_id: {x: -2}}}),
+ ErrorCodes.ImmutableField);
+
+// No upsert type can add new _id subfield for nested _id key.
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1}}, {_id: {x: -1, y: -1}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: -1}}, {_id: {x: -1, y: -1}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandFailedWithCode(
+ upsertedResult(coll, {_id: {x: -1}}, {$set: {"_id.x": -1, "_id.y": -1}}),
+ ErrorCodes.ImmutableField);
+
+// No upsert type can remove non-shardkey _id subfield for nested _id key.
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1, y: -1}}, {_id: {x: -1}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: -1, y: -1}}, {_id: {x: -1}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: -1, y: -1}}, {$unset: {"_id.y": 1}}),
+ ErrorCodes.ImmutableField);
+
+// No upsert type can set array element for nested _id key.
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: [1]}}, {}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": [1]}, {}), ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {_id: [{x: 1}]}, {}),
+ ErrorCodes.ShardKeyNotFound);
+
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: {x: [1]}}, {}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {"_id.x": [1]}, {}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertSuppliedResult(coll, {_id: [{x: 1}]}, {}),
+ ErrorCodes.ShardKeyNotFound);
+
+assert.commandFailedWithCode(upsertedResult(coll, {_id: {x: [1]}}, {$set: {y: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": [1]}, {$set: {y: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+assert.commandFailedWithCode(upsertedResult(coll, {_id: [{x: 1}]}, {$set: {y: 1}}),
+ ErrorCodes.ShardKeyNotFound);
+
+// Replacement and op-style {$set _id} fail when using dotted-path query on nested _id key.
+assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": -1}, {_id: {x: -1}}),
+ ErrorCodes.NotExactValueField);
+
+assert.commandWorked(upsertSuppliedResult(coll, {"_id.x": -1}, {_id: {x: -1}}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}});
+
+assert.commandFailedWithCode(upsertedResult(coll, {"_id.x": -1}, {$set: {_id: {x: -1}}}),
+ ErrorCodes.ImmutableField);
+
+assert.commandWorked(upsertedResult(coll, {"_id.x": -1}, {$set: {"_id.x": -1}}));
+assert.docEq(coll.findOne({}), {_id: {x: -1}});
st.stop();
})();
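
A note on the sharded targeting behaviour exercised above: with upsertSupplied, the document that will actually be inserted is 'c.new', so the shard key must be recoverable from that supplied document (or, for _id-based keys, from an exact _id match in the query); when it is not, the upsert fails with NoSuchKey rather than ShardKeyNotFound. A short sketch using the upsertSuppliedResult helper defined in this test, assuming the collection is sharded on {x: 1} as in the first block of tests (the success case is extrapolated from the nested-_id cases, not an assertion present in the file):

// Fails: the supplied document carries no value for the shard key 'x'.
assert.commandFailedWithCode(upsertSuppliedResult(coll, {x: 1}, {_id: 1}), ErrorCodes.NoSuchKey);

// Succeeds: the supplied document restates the shard key, so it can be targeted and inserted verbatim.
assert.commandWorked(upsertSuppliedResult(coll, {x: 1}, {_id: 1, x: 1}));
assert.docEq(coll.findOne({}), {_id: 1, x: 1});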
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index c473d43c3d5..1bb1d5ff863 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1239,6 +1239,7 @@ env.Library(
'exec/text_or.cpp',
'exec/trial_stage.cpp',
'exec/update_stage.cpp',
+ 'exec/upsert_stage.cpp',
'exec/working_set_common.cpp',
'exec/write_stage_common.cpp',
'ops/parsed_delete.cpp',
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 422b19f6be0..7743d73d695 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -368,6 +368,7 @@ private:
updateRequest.setArrayFilters(write_ops::arrayFiltersOf(_batch.getUpdates()[0]));
updateRequest.setMulti(_batch.getUpdates()[0].getMulti());
updateRequest.setUpsert(_batch.getUpdates()[0].getUpsert());
+ updateRequest.setUpsertSuppliedDocument(_batch.getUpdates()[0].getUpsertSupplied());
updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
updateRequest.setHint(_batch.getUpdates()[0].getHint());
updateRequest.setExplain();
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index a7b307bf49c..10696c6cb65 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -39,7 +39,6 @@
#include "mongo/bson/bson_comparator_interface_base.h"
#include "mongo/bson/mutable/algorithm.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/exec/write_stage_common.h"
@@ -57,11 +56,9 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
-#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
namespace mongo {
-MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert);
MONGO_FAIL_POINT_DEFINE(hangBeforeThrowWouldChangeOwningShard);
using std::string;
@@ -76,59 +73,13 @@ namespace {
const char idFieldName[] = "_id";
const FieldRef idFieldRef(idFieldName);
-Status ensureIdFieldIsFirst(mb::Document* doc) {
- mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName);
-
- if (!idElem.ok()) {
- return {ErrorCodes::InvalidIdField, "_id field is missing"};
- }
-
- if (idElem.leftSibling().ok()) {
- // Move '_id' to be the first element
- Status s = idElem.remove();
- if (!s.isOK())
- return s;
- s = doc->root().pushFront(idElem);
- if (!s.isOK())
- return s;
- }
-
- return Status::OK();
-}
-
void addObjectIDIdField(mb::Document* doc) {
const auto idElem = doc->makeElementNewOID(idFieldName);
- if (!idElem.ok())
- uasserted(17268, "Could not create new ObjectId '_id' field.");
-
+ uassert(17268, "Could not create new ObjectId '_id' field.", idElem.ok());
uassertStatusOK(doc->root().pushFront(idElem));
}
/**
- * Uasserts if any of the paths in 'requiredPaths' are not present in 'document', or if they are
- * arrays or array descendants.
- */
-void assertRequiredPathsPresent(const mb::Document& document, const FieldRefSet& requiredPaths) {
- for (const auto& path : requiredPaths) {
- auto elem = document.root();
- for (size_t i = 0; i < (*path).numParts(); ++i) {
- elem = elem[(*path).getPart(i)];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "After applying the update, the new document was missing the "
- "required field '"
- << (*path).dottedField() << "'",
- elem.ok());
- uassert(
- ErrorCodes::NotSingleValueField,
- str::stream() << "After applying the update to the document, the required field '"
- << (*path).dottedField()
- << "' was found to be an array or array descendant.",
- elem.getType() != BSONType::Array);
- }
- }
-}
-
-/**
* Returns true if we should throw a WriteConflictException in order to retry the operation in the
* case of a conflict. Returns false if we should skip the document and keep going.
*/
@@ -158,19 +109,30 @@ const char* UpdateStage::kStageType = "UPDATE";
const UpdateStats UpdateStage::kEmptyUpdateStats;
+// Public constructor.
UpdateStage::UpdateStage(OperationContext* opCtx,
const UpdateStageParams& params,
WorkingSet* ws,
Collection* collection,
PlanStage* child)
+ : UpdateStage(opCtx, params, ws, collection) {
+ // We should never reach here if the request is an upsert.
+ invariant(!_params.request->isUpsert());
+ _children.emplace_back(child);
+}
+
+// Protected constructor.
+UpdateStage::UpdateStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection)
: RequiresMutableCollectionStage(kStageType, opCtx, collection),
_params(params),
_ws(ws),
+ _doc(params.driver->getDocument()),
_idRetrying(WorkingSet::INVALID_ID),
_idReturning(WorkingSet::INVALID_ID),
- _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : NULL),
- _doc(params.driver->getDocument()) {
- _children.emplace_back(child);
+ _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : nullptr) {
// Should the modifiers validate their embedded docs via storage_validation::storageValid()?
// Only user updates should be checked. Any system or replication stuff should pass through.
@@ -226,9 +188,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
if (getOpCtx()->writesAreReplicated() && !request->isFromMigration()) {
if (metadata->isSharded() &&
(!OperationShardingState::isOperationVersioned(getOpCtx()) || !isFCV42)) {
- auto& immutablePathsVector = metadata->getKeyPatternFields();
- immutablePaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(immutablePathsVector));
+ immutablePaths.fillFrom(metadata->getKeyPatternFields());
}
immutablePaths.keepShortest(&idFieldRef);
}
@@ -270,16 +230,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
// neither grow nor shrink).
const auto createIdField = !collection()->isCapped();
- // Ensure if _id exists it is first
- status = ensureIdFieldIsFirst(&_doc);
- if (status.code() == ErrorCodes::InvalidIdField) {
- // Create ObjectId _id field if we are doing that
- if (createIdField) {
- addObjectIDIdField(&_doc);
- }
- } else {
- uassertStatusOK(status);
- }
+ // Ensure _id is first if it exists, and generate a new OID if appropriate.
+ _ensureIdFieldIsFirst(&_doc, createIdField);
// See if the changes were applied in place
const char* source = NULL;
@@ -394,99 +346,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
return newObj;
}
-BSONObj UpdateStage::applyUpdateOpsForInsert(OperationContext* opCtx,
- const CanonicalQuery* cq,
- const BSONObj& query,
- UpdateDriver* driver,
- mutablebson::Document* doc,
- bool isInternalRequest,
- const NamespaceString& ns,
- bool enforceOkForStorage,
- UpdateStats* stats) {
- // Since this is an insert (no docs found and upsert:true), we will be logging it
- // as an insert in the oplog. We don't need the driver's help to build the
- // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT.
- // Some mods may only work in that context (e.g. $setOnInsert).
- driver->setLogOp(false);
-
- auto* const css = CollectionShardingState::get(opCtx, ns);
- auto metadata = css->getCurrentMetadata();
-
- const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
-
- FieldRefSet immutablePaths;
- if (metadata->isSharded() &&
- (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42)) {
- auto& immutablePathsVector = metadata->getKeyPatternFields();
- immutablePaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(immutablePathsVector));
- }
- immutablePaths.keepShortest(&idFieldRef);
-
- if (cq) {
- FieldRefSet requiredPaths;
- if (metadata->isSharded()) {
- const auto& shardKeyPathsVector = metadata->getKeyPatternFields();
- requiredPaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector));
- }
- requiredPaths.keepShortest(&idFieldRef);
- uassertStatusOK(driver->populateDocumentWithQueryFields(*cq, requiredPaths, *doc));
- } else {
- fassert(17354, CanonicalQuery::isSimpleIdQuery(query));
- BSONElement idElt = query[idFieldName];
- fassert(17352, doc->root().appendElement(idElt));
- }
-
- // Apply the update modifications here. Do not validate for storage, since we will validate the
- // entire document after the update. However, we ensure that no immutable fields are updated.
- const bool validateForStorage = false;
- const bool isInsert = true;
- if (isInternalRequest) {
- immutablePaths.clear();
- }
- Status updateStatus =
- driver->update(StringData(), doc, validateForStorage, immutablePaths, isInsert);
- if (!updateStatus.isOK()) {
- uasserted(16836, updateStatus.reason());
- }
-
- // Ensure _id exists and is first
- auto idAndFirstStatus = ensureIdFieldIsFirst(doc);
- if (idAndFirstStatus.code() == ErrorCodes::InvalidIdField) { // _id field is missing
- addObjectIDIdField(doc);
- } else {
- uassertStatusOK(idAndFirstStatus);
- }
-
- // Validate that the object replacement or modifiers resulted in a document
- // that contains all the required keys and can be stored if it isn't coming
- // from a migration or via replication.
- if (!isInternalRequest) {
- if (enforceOkForStorage) {
- storage_validation::storageValid(*doc);
- }
- FieldRefSet requiredPaths;
- if (metadata->isSharded()) {
- const auto& shardKeyPathsVector = metadata->getKeyPatternFields();
- requiredPaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector));
- }
- requiredPaths.keepShortest(&idFieldRef);
- assertRequiredPathsPresent(*doc, requiredPaths);
- }
-
- BSONObj newObj = doc->getObject();
- if (newObj.objsize() > BSONObjMaxUserSize) {
- uasserted(17420,
- str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize);
- }
-
- return newObj;
-}
-
bool UpdateStage::matchContainsOnlyAndedEqualityNodes(const MatchExpression& root) {
if (root.matchType() == MatchExpression::EQ) {
return true;
@@ -563,131 +422,18 @@ bool UpdateStage::shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpd
return true;
}
-void UpdateStage::doInsert() {
- _specificStats.inserted = true;
-
- const UpdateRequest* request = _params.request;
- bool isInternalRequest = !getOpCtx()->writesAreReplicated() || request->isFromMigration();
-
- // Reset the document we will be writing to.
- _doc.reset();
-
- BSONObj newObj = applyUpdateOpsForInsert(getOpCtx(),
- _params.canonicalQuery,
- request->getQuery(),
- _params.driver,
- &_doc,
- isInternalRequest,
- request->getNamespaceString(),
- _enforceOkForStorage,
- &_specificStats);
-
- _specificStats.objInserted = newObj;
-
- // If this is an explain, bail out now without doing the insert.
- if (request->isExplain()) {
- return;
- }
-
- // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to
- // this shard. MongoS uses the query field to target a shard, and it is possible the shard key
- // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case
- // we need to throw so that MongoS can target the insert to the correct shard.
- if (_shouldCheckForShardKeyUpdate) {
- const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
- auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns());
- const auto& metadata = css->getCurrentMetadata();
-
- if (isFCV42 && metadata->isSharded()) {
- const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
- auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj);
-
- if (!metadata->keyBelongsToMe(newShardKey)) {
- // An attempt to upsert a document with a shard key value that belongs on another
- // shard must either be a retryable write or inside a transaction.
- uassert(ErrorCodes::IllegalOperation,
- "The upsert document could not be inserted onto the shard targeted by the "
- "query, since its shard key belongs on a different shard. Cross-shard "
- "upserts are only allowed when running in a transaction or with "
- "retryWrites: true.",
- getOpCtx()->getTxnNumber());
- uasserted(
- WouldChangeOwningShardInfo(request->getQuery(), newObj, true /* upsert */),
- "The document we are inserting belongs on a different shard");
- }
- }
- }
-
- if (MONGO_FAIL_POINT(hangBeforeUpsertPerformsInsert)) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert");
- }
-
- writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] {
- WriteUnitOfWork wunit(getOpCtx());
- uassertStatusOK(collection()->insertDocument(getOpCtx(),
- InsertStatement(request->getStmtId(), newObj),
- _params.opDebug,
- request->isFromMigration()));
-
- // Technically, we should save/restore state here, but since we are going to return
- // immediately after, it would just be wasted work.
- wunit.commit();
- });
-}
-
-bool UpdateStage::doneUpdating() {
+bool UpdateStage::isEOF() {
// We're done updating if either the child has no more results to give us, or we've
// already gotten a result back and we're not a multi-update.
return _idRetrying == WorkingSet::INVALID_ID && _idReturning == WorkingSet::INVALID_ID &&
(child()->isEOF() || (_specificStats.nMatched > 0 && !_params.request->isMulti()));
}
-bool UpdateStage::needInsert() {
- // We need to insert if
- // 1) we haven't inserted already,
- // 2) the child stage returned zero matches, and
- // 3) the user asked for an upsert.
- return !_specificStats.inserted && _specificStats.nMatched == 0 && _params.request->isUpsert();
-}
-
-bool UpdateStage::isEOF() {
- return doneUpdating() && !needInsert();
-}
-
PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
if (isEOF()) {
return PlanStage::IS_EOF;
}
- if (doneUpdating()) {
- // Even if we're done updating, we may have some inserting left to do.
- if (needInsert()) {
-
- doInsert();
-
- invariant(isEOF());
- if (_params.request->shouldReturnNewDocs()) {
- // Want to return the document we just inserted, create it as a WorkingSetMember
- // so that we can return it.
- BSONObj newObj = _specificStats.objInserted;
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- member->obj = Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(),
- newObj.getOwned());
- member->transitionToOwnedObj();
- return PlanStage::ADVANCED;
- }
- }
-
- // At this point either we're done updating and there was no insert to do,
- // or we're done updating and we're done inserting. Either way, we're EOF.
- invariant(isEOF());
- return PlanStage::IS_EOF;
- }
-
// It is possible that after an update was applied, a WriteConflictException
// occurred and prevented us from returning ADVANCED with the requested version
// of the document.
@@ -834,9 +580,8 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
} else if (PlanStage::IS_EOF == status) {
- // The child is out of results, but we might not be done yet because we still might
- // have to do an insert.
- return PlanStage::NEED_TIME;
+ // The child is out of results, and therefore so are we.
+ return PlanStage::IS_EOF;
} else if (PlanStage::FAILURE == status) {
*out = id;
// If a stage fails, it may create a status WSM to indicate why it failed, in which case
@@ -855,6 +600,40 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
return status;
}
+void UpdateStage::_assertRequiredPathsPresent(const mb::Document& document,
+ const FieldRefSet& requiredPaths) {
+ for (const auto& path : requiredPaths) {
+ auto elem = document.root();
+ for (size_t i = 0; i < (*path).numParts(); ++i) {
+ elem = elem[(*path).getPart(i)];
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream() << "After applying the update, the new document was missing the "
+ "required field '"
+ << (*path).dottedField() << "'",
+ elem.ok());
+ uassert(
+ ErrorCodes::NotSingleValueField,
+ str::stream() << "After applying the update to the document, the required field '"
+ << (*path).dottedField()
+ << "' was found to be an array or array descendant.",
+ elem.getType() != BSONType::Array);
+ }
+ }
+}
+
+void UpdateStage::_ensureIdFieldIsFirst(mb::Document* doc, bool generateOIDIfMissing) {
+ mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName);
+
+ // If the document has no _id and the caller has requested that we generate one, do so.
+ if (!idElem.ok() && generateOIDIfMissing) {
+ addObjectIDIdField(doc);
+ } else if (idElem.ok() && idElem.leftSibling().ok()) {
+ // If the document does have an _id but it is not the first element, move it to the front.
+ uassertStatusOK(idElem.remove());
+ uassertStatusOK(doc->root().pushFront(idElem));
+ }
+}
+
void UpdateStage::doRestoreStateRequiresCollection() {
const UpdateRequest& request = *_params.request;
const NamespaceString& nsString(request.getNamespaceString());
@@ -947,13 +726,11 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta
return false;
}
- FieldRefSet shardKeyPaths;
- const auto& shardKeyPathsVector = metadata->getKeyPatternFields();
- shardKeyPaths.fillFrom(transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector));
+ FieldRefSet shardKeyPaths(metadata->getKeyPatternFields());
// Assert that the updated doc has all shard key fields and none are arrays or array
// descendants.
- assertRequiredPathsPresent(_doc, shardKeyPaths);
+ _assertRequiredPathsPresent(_doc, shardKeyPaths);
// We do not allow modifying shard key value without specifying the full shard key in the query.
// If the query is a simple equality match on _id, then '_params.canonicalQuery' will be null.
@@ -968,7 +745,7 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta
pathsupport::extractFullEqualityMatches(
*(_params.canonicalQuery->root()), shardKeyPaths, &equalities)
.isOK() &&
- equalities.size() == shardKeyPathsVector.size());
+ equalities.size() == metadata->getKeyPatternFields().size());
// We do not allow updates to the shard key when 'multi' is true.
uassert(ErrorCodes::InvalidOptions,
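
The new UpdateStage::_ensureIdFieldIsFirst() above folds the old ensureIdFieldIsFirst()/addObjectIDIdField() pair into one helper: find '_id', generate one if it is missing and generation was requested, and otherwise move it to the front of the document. Below is a minimal standalone sketch of that control flow; it models the document as an ordered list of (name, value) pairs instead of a mutablebson::Document, and the placeholder OID string is illustrative only.

#include <algorithm>
#include <cassert>
#include <string>
#include <utility>
#include <vector>

using Field = std::pair<std::string, std::string>;
using Doc = std::vector<Field>;

// Move "_id" to the front if present; otherwise optionally prepend a generated id.
void ensureIdFieldIsFirst(Doc* doc, bool generateIdIfMissing) {
    auto it = std::find_if(
        doc->begin(), doc->end(), [](const Field& f) { return f.first == "_id"; });
    if (it == doc->end()) {
        if (generateIdIfMissing) {
            doc->insert(doc->begin(), {"_id", "ObjectId(...)"});  // stand-in for a real OID
        }
        return;
    }
    if (it != doc->begin()) {
        Field id = *it;                 // copy the existing _id element,
        doc->erase(it);                 // detach it from its current position,
        doc->insert(doc->begin(), id);  // and push it to the front.
    }
}

int main() {
    Doc doc{{"a", "1"}, {"_id", "5"}, {"b", "2"}};
    ensureIdFieldIsFirst(&doc, true);
    assert(doc.front().first == "_id");

    Doc noId{{"a", "1"}};
    ensureIdFieldIsFirst(&noId, true);
    assert(noId.front().first == "_id");
    return 0;
}
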
diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h
index 93936166544..f865fb9d162 100644
--- a/src/mongo/db/exec/update_stage.h
+++ b/src/mongo/db/exec/update_stage.h
@@ -69,14 +69,14 @@ private:
};
/**
- * Execution stage responsible for updates to documents and upserts. If the prior or
- * newly-updated version of the document was requested to be returned, then ADVANCED is
- * returned after updating or inserting a document. Otherwise, NEED_TIME is returned after
- * updating or inserting a document.
+ * Execution stage responsible for updates to documents. If the prior or newly-updated version of
+ * the document was requested to be returned, then ADVANCED is returned after updating a document.
+ * Otherwise, NEED_TIME is returned after updating a document if further updates are pending,
+ * and IS_EOF is returned if no documents were found or all updates have been performed.
*
* Callers of doWork() must be holding a write lock.
*/
-class UpdateStage final : public RequiresMutableCollectionStage {
+class UpdateStage : public RequiresMutableCollectionStage {
UpdateStage(const UpdateStage&) = delete;
UpdateStage& operator=(const UpdateStage&) = delete;
@@ -87,8 +87,8 @@ public:
Collection* collection,
PlanStage* child);
- bool isEOF() final;
- StageState doWork(WorkingSetID* out) final;
+ bool isEOF() override;
+ StageState doWork(WorkingSetID* out) override;
StageType stageType() const final {
return STAGE_UPDATE;
@@ -119,32 +119,6 @@ public:
static UpdateResult makeUpdateResult(const UpdateStats* updateStats);
/**
- * Computes the document to insert if the upsert flag is set to true and no matching
- * documents are found in the database. The document to upsert is computing using the
- * query 'cq' and the update mods contained in 'driver'.
- *
- * If 'cq' is NULL, which can happen for the idhack update fast path, then 'query' is
- * used to compute the doc to insert instead of 'cq'.
- *
- * 'doc' is the mutable BSON document which you would like the update driver to use
- * when computing the document to insert.
- *
- * Set 'isInternalRequest' to true if the upsert was issued by the replication or
- * sharding systems.
- *
- * Returns the document to insert.
- */
- static BSONObj applyUpdateOpsForInsert(OperationContext* opCtx,
- const CanonicalQuery* cq,
- const BSONObj& query,
- UpdateDriver* driver,
- mutablebson::Document* doc,
- bool isInternalRequest,
- const NamespaceString& ns,
- bool enforceOkForStorage,
- UpdateStats* stats);
-
- /**
* Returns true if an update failure due to a given DuplicateKey error is eligible for retry.
* Requires that parsedUpdate.hasParsedQuery() is true.
*/
@@ -152,10 +126,38 @@ public:
const DuplicateKeyErrorInfo& errorInfo);
protected:
+ UpdateStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection);
+
void doSaveStateRequiresCollection() final {}
void doRestoreStateRequiresCollection() final;
+ void _ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing);
+
+ void _assertRequiredPathsPresent(const mutablebson::Document& document,
+ const FieldRefSet& requiredPaths);
+
+ UpdateStageParams _params;
+
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // Stats
+ UpdateStats _specificStats;
+
+ // True if the request should be checked for an update to the shard key.
+ bool _shouldCheckForShardKeyUpdate;
+
+ // True if updated documents should be validated with storage_validation::storageValid().
+ bool _enforceOkForStorage;
+
+ // These get reused for each update.
+ mutablebson::Document& _doc;
+ mutablebson::DamageVector _damages;
+
private:
static const UpdateStats kEmptyUpdateStats;
@@ -173,24 +175,6 @@ private:
BSONObj transformAndUpdate(const Snapshotted<BSONObj>& oldObj, RecordId& recordId);
/**
- * Computes the document to insert and inserts it into the collection. Used if the
- * user requested an upsert and no matching documents were found.
- */
- void doInsert();
-
- /**
- * Have we performed all necessary updates? Even if this is true, we might not be EOF,
- * as we might still have to do an insert.
- */
- bool doneUpdating();
-
- /**
- * Examines the stats / update request and returns whether there is still an insert left
- * to do. If so then this stage is not EOF yet.
- */
- bool needInsert();
-
- /**
* Stores 'idToRetry' in '_idRetrying' so the update can be retried during the next call to
* doWork(). Always returns NEED_YIELD and sets 'out' to WorkingSet::INVALID_ID.
*/
@@ -210,26 +194,12 @@ private:
bool checkUpdateChangesShardKeyFields(ScopedCollectionMetadata metadata,
const Snapshotted<BSONObj>& oldObj);
- UpdateStageParams _params;
-
- // Not owned by us.
- WorkingSet* _ws;
-
// If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next.
WorkingSetID _idRetrying;
// If not WorkingSet::INVALID_ID, we return this member to our caller.
WorkingSetID _idReturning;
- // Stats
- UpdateStats _specificStats;
-
- // True if updated documents should be validated with storage_validation::storageValid().
- bool _enforceOkForStorage;
-
- // True if the request should be checked for an update to the shard key.
- bool _shouldCheckForShardKeyUpdate;
-
// If the update was in-place, we may see it again. This only matters if we're doing
// a multi-update; if we're not doing a multi-update we stop after one update and we
// won't see any more docs.
@@ -244,10 +214,6 @@ private:
// So, no matter what, we keep track of where the doc wound up.
typedef stdx::unordered_set<RecordId, RecordId::Hasher> RecordIdSet;
const std::unique_ptr<RecordIdSet> _updatedRecordIds;
-
- // These get reused for each update.
- mutablebson::Document& _doc;
- mutablebson::DamageVector _damages;
};
} // namespace mongo
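
This header change makes UpdateStage a non-final base class: the new protected constructor performs the shared initialization without attaching a child stage, while the public constructor (plain updates) and the UpsertStage subclass each attach the child and assert which kind of request they accept. The compile-and-run sketch below shows that construction pattern in isolation; the names Stage, Request, UpdateLike and UpsertLike are invented stand-ins for the real PlanStage/UpdateRequest hierarchy.

#include <cassert>
#include <memory>
#include <vector>

struct Request { bool upsert = false; };
struct Stage { virtual ~Stage() = default; };

class UpdateLike : public Stage {
public:
    // Public constructor: plain updates only, never upserts.
    UpdateLike(const Request& req, Stage* child) : UpdateLike(req) {
        assert(!_req.upsert);
        _children.emplace_back(child);
    }

protected:
    // Protected constructor: shared initialization, reused by the upsert subclass.
    explicit UpdateLike(const Request& req) : _req(req) {}

    Request _req;
    std::vector<std::unique_ptr<Stage>> _children;
};

class UpsertLike final : public UpdateLike {
public:
    UpsertLike(const Request& req, Stage* child) : UpdateLike(req) {
        assert(_req.upsert);  // this stage only ever handles upserts
        _children.emplace_back(child);
    }
};

int main() {
    UpdateLike update(Request{false}, new Stage());
    UpsertLike upsert(Request{true}, new Stage());
    return 0;
}
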
diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp
new file mode 100644
index 00000000000..dadbaf522f7
--- /dev/null
+++ b/src/mongo/db/exec/upsert_stage.cpp
@@ -0,0 +1,301 @@
+/**
+ * Copyright (C) 2019 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/upsert_stage.h"
+
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_failpoint_helpers.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/update/storage_validation.h"
+#include "mongo/s/would_change_owning_shard_exception.h"
+
+namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert);
+
+namespace mb = mutablebson;
+
+namespace {
+
+const char idFieldName[] = "_id";
+const FieldRef idFieldRef(idFieldName);
+
+/**
+ * Populates the given FieldRefSets with the fields that are required to be present in the document
+ * and the fields which cannot be changed, respectively.
+ */
+void getRequiredAndImmutablePaths(OperationContext* opCtx,
+ const ScopedCollectionMetadata& metadata,
+ bool isInternalRequest,
+ FieldRefSet* requiredPaths,
+ FieldRefSet* immutablePaths) {
+ // Each upserted document has a set of required paths and a potentially different set of
+ // immutable paths. If the collection is sharded, add the shard key fields to the set of
+ // required paths.
+ if (metadata->isSharded()) {
+ requiredPaths->fillFrom(metadata->getKeyPatternFields());
+ }
+ // Add the _id field, replacing any existing paths that are prefixed by _id if present.
+ requiredPaths->keepShortest(&idFieldRef);
+
+ // If this is an internal request, no fields are immutable and we leave 'immutablePaths' empty.
+ if (!isInternalRequest) {
+ // An unversioned request cannot update the shard key, so all required fields are immutable.
+ // The shard key is also immutable if we have not yet upgraded to 4.2 feature compatibility.
+ const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
+ if (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42) {
+ for (auto&& reqPath : *requiredPaths) {
+ immutablePaths->insert(reqPath);
+ }
+ }
+ // The _id field is always immutable to user requests, even if the shard key is mutable.
+ immutablePaths->keepShortest(&idFieldRef);
+ }
+}
+} // namespace
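
getRequiredAndImmutablePaths() above captures the policy for upserted documents: the shard key fields (if sharded) plus '_id' are always required, and for user-initiated requests the required fields also become immutable unless the operation is versioned and the node is fully upgraded to FCV 4.2, with '_id' immutable in every user case. The standalone sketch below restates that policy over plain string sets; the parameter names and example shard key fields are illustrative, not taken from the source.

#include <cassert>
#include <set>
#include <string>

using PathSet = std::set<std::string>;

// Simplified restatement of the required/immutable path policy described above.
void computeRequiredAndImmutablePaths(bool isSharded,
                                      const PathSet& shardKeyPaths,
                                      bool isInternalRequest,
                                      bool operationIsVersioned,
                                      bool isFCV42,
                                      PathSet* requiredPaths,
                                      PathSet* immutablePaths) {
    if (isSharded)
        *requiredPaths = shardKeyPaths;
    requiredPaths->insert("_id");

    // Internal (replication/migration) requests have no immutable paths.
    if (!isInternalRequest) {
        if (!operationIsVersioned || !isFCV42)
            *immutablePaths = *requiredPaths;
        immutablePaths->insert("_id");  // '_id' is always immutable to user requests.
    }
}

int main() {
    PathSet required, immutable;
    computeRequiredAndImmutablePaths(
        true, {"region", "userId"}, false, false, true, &required, &immutable);
    assert(required.count("_id") && required.count("region"));
    assert(immutable == required);  // unversioned request: the whole shard key is immutable

    PathSet required2, immutable2;
    computeRequiredAndImmutablePaths(true, {"region"}, false, true, true, &required2, &immutable2);
    assert(immutable2 == PathSet{"_id"});  // versioned + FCV 4.2: only '_id' is immutable
    return 0;
}
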
+
+UpsertStage::UpsertStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection,
+ PlanStage* child)
+ : UpdateStage(opCtx, params, ws, collection) {
+ // We should never create this stage for a non-upsert request.
+ invariant(_params.request->isUpsert());
+ _children.emplace_back(child);
+}
+
+// We're done when updating is finished and we have either matched or inserted.
+bool UpsertStage::isEOF() {
+ return UpdateStage::isEOF() && (_specificStats.nMatched > 0 || _specificStats.inserted);
+}
+
+PlanStage::StageState UpsertStage::doWork(WorkingSetID* out) {
+ if (isEOF()) {
+ return StageState::IS_EOF;
+ }
+
+ // First, attempt to perform the update on a matching document.
+ auto updateState = UpdateStage::doWork(out);
+
+ // If the update returned anything other than EOF, just forward it along. There's a chance we
+ // still may find a document to update and will not have to insert anything. If it did return
+ // EOF and we do not need to insert a new document, return EOF immediately here.
+ if (updateState != PlanStage::IS_EOF || isEOF()) {
+ return updateState;
+ }
+
+ // If the update resulted in EOF without matching anything, we must insert a new document.
+ invariant(updateState == PlanStage::IS_EOF && !isEOF());
+
+ // Since this is an insert, we will be logging it as such in the oplog. We don't need the
+ // driver's help to build the oplog record. We also set the 'inserted' stats flag here.
+ _params.driver->setLogOp(false);
+ _specificStats.inserted = true;
+
+ // Determine whether this is a user-initiated or internal request.
+ const bool isInternalRequest =
+ !getOpCtx()->writesAreReplicated() || _params.request->isFromMigration();
+
+ // Generate the new document to be inserted.
+ _specificStats.objInserted = _produceNewDocumentForInsert(isInternalRequest);
+
+ // If this is an explain, skip performing the actual insert.
+ if (!_params.request->isExplain()) {
+ _performInsert(_specificStats.objInserted);
+ }
+
+ // We should always be EOF at this point.
+ invariant(isEOF());
+
+ // If we want to return the document we just inserted, create it as a WorkingSetMember.
+ if (_params.request->shouldReturnNewDocs()) {
+ BSONObj newObj = _specificStats.objInserted;
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj =
+ Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), newObj.getOwned());
+ member->transitionToOwnedObj();
+ return PlanStage::ADVANCED;
+ }
+
+ // If we don't need to return the inserted document, we're done.
+ return PlanStage::IS_EOF;
+}
+
+void UpsertStage::_performInsert(BSONObj newDocument) {
+ // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to
+ // this shard. MongoS uses the query field to target a shard, and it is possible the shard key
+ // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case
+ // we need to throw so that MongoS can target the insert to the correct shard.
+ if (_shouldCheckForShardKeyUpdate) {
+ auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns());
+ const auto& metadata = css->getCurrentMetadata();
+
+ const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
+
+ if (isFCV42 && metadata->isSharded()) {
+ const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
+ auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument);
+
+ if (!metadata->keyBelongsToMe(newShardKey)) {
+ // An attempt to upsert a document with a shard key value that belongs on another
+ // shard must either be a retryable write or inside a transaction.
+ uassert(ErrorCodes::IllegalOperation,
+ "The upsert document could not be inserted onto the shard targeted by the "
+ "query, since its shard key belongs on a different shard. Cross-shard "
+ "upserts are only allowed when running in a transaction or with "
+ "retryWrites: true.",
+ getOpCtx()->getTxnNumber());
+ uasserted(WouldChangeOwningShardInfo(
+ _params.request->getQuery(), newDocument, true /* upsert */),
+ "The document we are inserting belongs on a different shard");
+ }
+ }
+ }
+
+ if (MONGO_unlikely(hangBeforeUpsertPerformsInsert.shouldFail())) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert");
+ }
+
+ writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] {
+ WriteUnitOfWork wunit(getOpCtx());
+ uassertStatusOK(
+ collection()->insertDocument(getOpCtx(),
+ InsertStatement(_params.request->getStmtId(), newDocument),
+ _params.opDebug,
+ _params.request->isFromMigration()));
+
+ // Technically, we should save/restore state here, but since we are going to return
+ // immediately after, it would just be wasted work.
+ wunit.commit();
+ });
+}
+
+BSONObj UpsertStage::_produceNewDocumentForInsert(bool isInternalRequest) {
+ // Obtain the sharding metadata. This will be needed to compute the required paths. The metadata
+ // must remain in scope since it owns the pointers used by 'requiredPaths' and 'immutablePaths'.
+ auto* css = CollectionShardingState::get(getOpCtx(), _params.request->getNamespaceString());
+ auto metadata = css->getCurrentMetadata();
+
+ // Each document has a set of required paths and a potentially different set of immutable paths.
+ FieldRefSet requiredPaths, immutablePaths;
+ getRequiredAndImmutablePaths(
+ getOpCtx(), metadata, isInternalRequest, &requiredPaths, &immutablePaths);
+
+ // Reset the document into which we will be writing.
+ _doc.reset();
+
+ // First: populate the document's required paths with equality predicate values from the query,
+ // if available. This generates the pre-image document that we will run the update against.
+ if (auto* cq = _params.canonicalQuery) {
+ uassertStatusOK(_params.driver->populateDocumentWithQueryFields(*cq, requiredPaths, _doc));
+ } else {
+ fassert(17354, CanonicalQuery::isSimpleIdQuery(_params.request->getQuery()));
+ fassert(17352, _doc.root().appendElement(_params.request->getQuery()[idFieldName]));
+ }
+
+ // Second: run the appropriate document generation strategy over the document to generate the
+ // post-image. If the update operation modifies any of the immutable paths, this will throw.
+ if (_params.request->shouldUpsertSuppliedDocument()) {
+ _generateNewDocumentFromSuppliedDoc(immutablePaths);
+ } else {
+ _generateNewDocumentFromUpdateOp(immutablePaths);
+ }
+
+ // Third: ensure _id is first if it exists, and generate a new OID otherwise.
+ _ensureIdFieldIsFirst(&_doc, true);
+
+ // Fourth: assert that the finished document has all required fields and is valid for storage.
+ _assertDocumentToBeInsertedIsValid(
+ _doc, requiredPaths, isInternalRequest, _enforceOkForStorage);
+
+ // Fifth: validate that the newly-produced document does not exceed the maximum BSON user size.
+ auto newDocument = _doc.getObject();
+ uassert(17420,
+ str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize,
+ newDocument.objsize() <= BSONObjMaxUserSize);
+
+ return newDocument;
+}
+
+void UpsertStage::_generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths) {
+ // Use the UpdateModification from the original request to generate a new document by running
+ // the update over a document that is empty apart from the fields extracted from the query. We
+ // do not validate for storage until later, but we do ensure that no immutable fields are
+ // modified.
+ const bool validateForStorage = false;
+ const bool isInsert = true;
+ uassertStatusOK(
+ _params.driver->update({}, &_doc, validateForStorage, immutablePaths, isInsert));
+}
+
+void UpsertStage::_generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths) {
+ // We should never call this method unless the request has a set of update constants.
+ invariant(_params.request->shouldUpsertSuppliedDocument());
+ invariant(_params.request->getUpdateConstants());
+
+ // Extract the supplied document from the constants and validate that it is an object.
+ auto suppliedDocElt = _params.request->getUpdateConstants()->getField("new"_sd);
+ invariant(suppliedDocElt.type() == BSONType::Object);
+ auto suppliedDoc = suppliedDocElt.embeddedObject();
+
+ // The supplied doc is functionally a replacement update. We need a new driver to apply it.
+ UpdateDriver replacementDriver(nullptr);
+
+ // Create a new replacement-style update from the supplied document.
+ replacementDriver.parse({suppliedDoc}, {});
+ replacementDriver.setLogOp(false);
+
+ // We do not validate for storage, as we will validate the full document before inserting.
+ // However, we ensure that no immutable fields are modified.
+ const bool validateForStorage = false;
+ const bool isInsert = true;
+ uassertStatusOK(
+ replacementDriver.update({}, &_doc, validateForStorage, immutablePaths, isInsert));
+}
+
+void UpsertStage::_assertDocumentToBeInsertedIsValid(const mb::Document& document,
+ const FieldRefSet& requiredPaths,
+ bool isInternalRequest,
+ bool enforceOkForStorage) {
+ // For a non-internal operation, we assert that the document contains all required paths, that
+ // no shard key fields have arrays at any point along their paths, and that the document is
+ // valid for storage. Skip all such checks for an internal operation.
+ if (!isInternalRequest) {
+ if (enforceOkForStorage) {
+ storage_validation::storageValid(document);
+ }
+ _assertRequiredPathsPresent(document, requiredPaths);
+ }
+}
+} // namespace mongo
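
_produceNewDocumentForInsert() builds the document to insert in five steps: seed it with the query's equality fields, apply either the update ops or the supplied document, fix up '_id', check the required paths and storage validity, and enforce the size limit. The deliberately simplified standalone sketch below mirrors that sequence; it models the document as a std::map, treats 'shardKey' as the only required path and a field count as the size limit, and glosses over the internal-request and _id-ordering details, so it illustrates the flow rather than reproducing the server's implementation.

#include <cassert>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>

using Doc = std::map<std::string, std::string>;

Doc produceNewDocumentForInsert(const Doc& queryEqualities,
                                bool upsertSupplied,
                                const Doc& suppliedDoc,
                                const std::function<void(Doc&)>& applyUpdateOps) {
    // First: start from the equality predicates extracted from the query.
    Doc doc = queryEqualities;

    // Second: either take the caller-supplied document (functionally a replacement update) or
    // run the update ops over the seeded document.
    if (upsertSupplied) {
        doc = suppliedDoc;
    } else {
        applyUpdateOps(doc);
    }

    // Third: make sure '_id' exists (the real code generates an ObjectId and moves it first).
    doc.emplace("_id", "generated-id");

    // Fourth: validate that all required fields are present.
    if (doc.count("shardKey") == 0)
        throw std::runtime_error("missing required field 'shardKey'");

    // Fifth: enforce a size limit (a stand-in for BSONObjMaxUserSize).
    if (doc.size() > 1000)
        throw std::runtime_error("document to upsert is too large");
    return doc;
}

int main() {
    Doc query{{"shardKey", "A"}};
    auto setX = [](Doc& d) { d["x"] = "2"; };

    Doc fromOps = produceNewDocumentForInsert(query, false, {}, setX);
    assert(fromOps.count("x") == 1 && fromOps.count("_id") == 1);

    Doc fromSupplied = produceNewDocumentForInsert(query, true, {{"shardKey", "A"}, {"y", "3"}}, setX);
    assert(fromSupplied.count("y") == 1 && fromSupplied.count("x") == 0);
    return 0;
}
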
diff --git a/src/mongo/db/exec/upsert_stage.h b/src/mongo/db/exec/upsert_stage.h
new file mode 100644
index 00000000000..fe6572f9d16
--- /dev/null
+++ b/src/mongo/db/exec/upsert_stage.h
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2019 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/update_stage.h"
+
+namespace mongo {
+
+/**
+ * Execution stage for update requests with {upsert:true}. This is a specialized UpdateStage which,
+ * in the event that no documents match the update request's query, generates and inserts a new
+ * document into the collection. All logic related to the insertion phase is implemented by this
+ * class.
+ *
+ * If the prior or newly-updated version of the document was requested to be returned, then ADVANCED
+ * is returned after updating or inserting a document. Otherwise, NEED_TIME is returned after
+ * updating a document if further updates are pending, and IS_EOF is returned if all updates have
+ * been performed or if a document has been inserted.
+ *
+ * Callers of doWork() must be holding a write lock.
+ */
+class UpsertStage final : public UpdateStage {
+ UpsertStage(const UpsertStage&) = delete;
+ UpsertStage& operator=(const UpsertStage&) = delete;
+
+public:
+ UpsertStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection,
+ PlanStage* child);
+
+ bool isEOF() final;
+ StageState doWork(WorkingSetID* out) final;
+
+private:
+ BSONObj _produceNewDocumentForInsert(bool isInternalRequest);
+ void _performInsert(BSONObj newDocument);
+
+ void _generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths);
+ void _generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths);
+
+ void _assertDocumentToBeInsertedIsValid(const mutablebson::Document& document,
+ const FieldRefSet& requiredPaths,
+ bool isInternalRequest,
+ bool enforceOkForStorage);
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/field_ref_set.cpp b/src/mongo/db/field_ref_set.cpp
index c55a722b64b..3b98f049448 100644
--- a/src/mongo/db/field_ref_set.cpp
+++ b/src/mongo/db/field_ref_set.cpp
@@ -60,6 +60,14 @@ bool FieldRefSet::FieldRefPtrLessThan::operator()(const FieldRef* l, const Field
FieldRefSet::FieldRefSet() {}
+FieldRefSet::FieldRefSet(const std::vector<std::unique_ptr<FieldRef>>& paths) {
+ fillFrom(paths);
+}
+
+FieldRefSet::FieldRefSet(const vector<const FieldRef*>& paths) {
+ _fieldSet.insert(paths.begin(), paths.end());
+}
+
FieldRefSet::FieldRefSet(const vector<FieldRef*>& paths) {
fillFrom(paths);
}
@@ -105,6 +113,19 @@ void FieldRefSet::fillFrom(const std::vector<FieldRef*>& fields) {
_fieldSet.insert(fields.begin(), fields.end());
}
+void FieldRefSet::fillFrom(const std::vector<std::unique_ptr<FieldRef>>& fields) {
+ dassert(_fieldSet.empty());
+ std::transform(fields.begin(),
+ fields.end(),
+ std::inserter(_fieldSet, _fieldSet.begin()),
+ [](const auto& field) { return field.get(); });
+}
+
+bool FieldRefSet::insertNoConflict(const FieldRef* toInsert) {
+ const FieldRef* conflict;
+ return insert(toInsert, &conflict);
+}
+
bool FieldRefSet::insert(const FieldRef* toInsert, const FieldRef** conflict) {
// We can determine if two fields conflict by checking their common prefix.
//
diff --git a/src/mongo/db/field_ref_set.h b/src/mongo/db/field_ref_set.h
index 63166acbc89..0019bf9b4ba 100644
--- a/src/mongo/db/field_ref_set.h
+++ b/src/mongo/db/field_ref_set.h
@@ -64,6 +64,8 @@ public:
FieldRefSet();
+ FieldRefSet(const std::vector<std::unique_ptr<FieldRef>>& paths);
+ FieldRefSet(const std::vector<const FieldRef*>& paths);
FieldRefSet(const std::vector<FieldRef*>& paths);
/** Returns 'true' if the set is empty */
@@ -89,9 +91,9 @@ public:
}
/**
- * Returns true if the field 'toInsert' can be added in the set without
- * conflicts. Otherwise returns false and fill in '*conflict' with the field 'toInsert'
- * clashed with.
+ * Returns true if the field 'toInsert' was added to the set without conflicts.
+ *
+ * Otherwise, returns false and fills '*conflict' with the field that 'toInsert' clashed with.
*
* There is no ownership transfer of 'toInsert'. The caller is responsible for
* maintaining it alive for as long as the FieldRefSet is so. By the same token
@@ -100,13 +102,25 @@ public:
bool insert(const FieldRef* toInsert, const FieldRef** conflict);
/**
- * Fills the set with the supplied FieldRef*s
+ * Returns true if the field 'toInsert' was added to the set without conflicts, and false if it
+ * clashed with a field already in the set.
+ */
+ bool insertNoConflict(const FieldRef* toInsert);
+
+ /**
+ * Fills the set with the supplied FieldRef pointers.
*
* Note that *no* conflict resolution occurs here.
*/
void fillFrom(const std::vector<FieldRef*>& fields);
/**
+ * Fills the set with the supplied FieldRefs. Does not take ownership of the managed pointers.
+ *
+ * Note that *no* conflict resolution occurs here.
+ */
+ void fillFrom(const std::vector<std::unique_ptr<FieldRef>>& fields);
+
+ /**
* Replace any existing conflicting FieldRef with the shortest (closest to root) one.
*/
void keepShortest(const FieldRef* toInsert);
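
The new constructor and fillFrom() overload taking std::vector<std::unique_ptr<FieldRef>> let callers pass the result of getKeyPatternFields() directly, collecting raw pointers into the set without taking ownership. A standalone sketch of the same std::transform/std::inserter pattern, with std::string standing in for FieldRef:

#include <algorithm>
#include <cassert>
#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <vector>

int main() {
    // Owning storage, analogous to the vector returned by getKeyPatternFields().
    std::vector<std::unique_ptr<std::string>> owned;
    owned.push_back(std::make_unique<std::string>("a.b"));
    owned.push_back(std::make_unique<std::string>("c"));

    // Non-owning set of raw pointers, analogous to FieldRefSet::fillFrom().
    std::set<const std::string*> fieldSet;
    std::transform(owned.begin(),
                   owned.end(),
                   std::inserter(fieldSet, fieldSet.begin()),
                   [](const auto& field) { return field.get(); });

    assert(fieldSet.size() == 2);
    // 'owned' must outlive 'fieldSet', just as the sharding metadata must stay in scope while
    // any FieldRefSet built from it is in use.
    return 0;
}
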
diff --git a/src/mongo/db/ops/parsed_update.cpp b/src/mongo/db/ops/parsed_update.cpp
index 07a44c54bd8..aaab7037871 100644
--- a/src/mongo/db/ops/parsed_update.cpp
+++ b/src/mongo/db/ops/parsed_update.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/query_planner_common.h"
@@ -53,6 +54,22 @@ Status ParsedUpdate::parseRequest() {
// of a document during a multi-update.
invariant(!(_request->shouldReturnAnyDocs() && _request->isMulti()));
+ // It is invalid to specify 'upsertSupplied:true' for a non-upsert operation, or if no upsert
+ // document was supplied with the request.
+ if (_request->shouldUpsertSuppliedDocument()) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "cannot specify '"
+ << write_ops::UpdateOpEntry::kUpsertSuppliedFieldName
+ << ": true' for a non-upsert operation",
+ _request->isUpsert());
+ const auto& constants = _request->getUpdateConstants();
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "the parameter '"
+ << write_ops::UpdateOpEntry::kUpsertSuppliedFieldName
+ << "' is set to 'true', but no document was supplied",
+ constants && (*constants)["new"_sd].type() == BSONType::Object);
+ }
+
// It is invalid to request that a ProjectionStage be applied to the UpdateStage if the
// UpdateStage would not return any document.
invariant(_request->getProj().isEmpty() || _request->shouldReturnAnyDocs());
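
These checks encode a simple rule: 'upsertSupplied: true' is only legal on an upsert, and the request's constants must then carry an object under 'new'. The standalone function below restates that rule for illustration; the Constants type and the error-string return convention are invented here, while the 'new' field name follows the 'c.new' convention described in write_ops.idl.

#include <cassert>
#include <map>
#include <optional>
#include <string>

enum class FieldType { Object, Other };
using Constants = std::map<std::string, FieldType>;

// Returns an error message for an invalid combination, or nullopt if the request is acceptable.
std::optional<std::string> validateUpsertSupplied(bool isUpsert,
                                                  bool upsertSupplied,
                                                  const std::optional<Constants>& constants) {
    if (!upsertSupplied)
        return std::nullopt;
    if (!isUpsert)
        return "cannot specify 'upsertSupplied: true' for a non-upsert operation";
    if (!constants || !constants->count("new") || constants->at("new") != FieldType::Object)
        return "'upsertSupplied' is set to 'true', but no document was supplied";
    return std::nullopt;
}

int main() {
    assert(!validateUpsertSupplied(true, true, Constants{{"new", FieldType::Object}}));
    assert(validateUpsertSupplied(false, true, std::nullopt).has_value());
    assert(validateUpsertSupplied(true, true, std::nullopt).has_value());
    assert(!validateUpsertSupplied(true, false, std::nullopt));  // flag unset: nothing to check
    return 0;
}
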
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h
index f946302c6b9..5fa0a561b7e 100644
--- a/src/mongo/db/ops/update_request.h
+++ b/src/mongo/db/ops/update_request.h
@@ -56,16 +56,7 @@ public:
RETURN_NEW
};
- inline UpdateRequest(const NamespaceString& nsString)
- : _nsString(nsString),
- _god(false),
- _upsert(false),
- _multi(false),
- _fromMigration(false),
- _fromOplogApplication(false),
- _isExplain(false),
- _returnDocs(ReturnDocOption::RETURN_NONE),
- _yieldPolicy(PlanExecutor::NO_YIELD) {}
+ inline UpdateRequest(const NamespaceString& nsString) : _nsString(nsString) {}
const NamespaceString& getNamespaceString() const {
return _nsString;
@@ -154,6 +145,14 @@ public:
return _upsert;
}
+ inline void setUpsertSuppliedDocument(bool value = true) {
+ _upsertSuppliedDocument = value;
+ }
+
+ bool shouldUpsertSuppliedDocument() const {
+ return _upsertSuppliedDocument;
+ }
+
inline void setMulti(bool value = true) {
_multi = value;
}
@@ -306,22 +305,26 @@ private:
// God bypasses _id checking and index generation. It is only used on behalf of system
// updates, never user updates.
- bool _god;
+ bool _god = false;
// True if this should insert if no matching document is found.
- bool _upsert;
+ bool _upsert = false;
+
+ // True if this upsert operation should insert the document supplied as 'c.new' if the query
+ // does not match any documents.
+ bool _upsertSuppliedDocument = false;
// True if this update is allowed to affect more than one document.
- bool _multi;
+ bool _multi = false;
// True if this update is on behalf of a chunk migration.
- bool _fromMigration;
+ bool _fromMigration = false;
// True if this update was triggered by the application of an oplog entry.
- bool _fromOplogApplication;
+ bool _fromOplogApplication = false;
// Whether or not we are requesting an explained update. Explained updates are read-only.
- bool _isExplain;
+ bool _isExplain = false;
// Specifies which version of the documents to return, if any.
//
@@ -335,10 +338,10 @@ private:
//
// This allows findAndModify to execute an update and retrieve the resulting document
// without another query before or after the update.
- ReturnDocOption _returnDocs;
+ ReturnDocOption _returnDocs = ReturnDocOption::RETURN_NONE;
// Whether or not the update should yield. Defaults to NO_YIELD.
- PlanExecutor::YieldPolicy _yieldPolicy;
+ PlanExecutor::YieldPolicy _yieldPolicy = PlanExecutor::NO_YIELD;
};
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl
index 9409aa73041..42f86b1f481 100644
--- a/src/mongo/db/ops/write_ops.idl
+++ b/src/mongo/db/ops/write_ops.idl
@@ -127,6 +127,11 @@ structs:
operation inserts only a single document."
type: bool
default: false
+ upsertSupplied:
+ description: "Only applicable when upsert is true. If set, and if no documents match
+ the query, the update subsystem will insert the document supplied as
+ 'c.new' rather than generating a new document from the update spec."
+ type: optionalBool
collation:
description: "Specifies the collation to use for the operation."
type: object
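
With this flag, a single update statement can say "if nothing matches, insert the document I supplied" instead of having the server synthesize one from the update spec; this is exactly what a {whenMatched: [pipeline], whenNotMatched: insert} $merge relies on. The snippet below is a hedged sketch of the shape of such a statement, written with the server's BSON builder macros; it assumes the in-tree BSON headers, and the collection name, query, pipeline and document contents are placeholders chosen for illustration.

// One update statement carrying a pipeline-style 'u', upsert: true, upsertSupplied: true, and
// the document to insert under the constants field 'c.new'.
BSONObj updateCmd =
    BSON("update" << "target_collection"
                  << "updates"
                  << BSON_ARRAY(BSON(
                         "q" << BSON("_id" << 1)
                             << "u" << BSON_ARRAY(BSON("$addFields" << BSON("x" << 2)))
                             << "upsert" << true
                             << "upsertSupplied" << true
                             << "c" << BSON("new" << BSON("_id" << 1 << "a" << 1)))));
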
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 1ecceb4caa8..3ae46dc6146 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -708,6 +708,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext*
request.setArrayFilters(write_ops::arrayFiltersOf(op));
request.setMulti(op.getMulti());
request.setUpsert(op.getUpsert());
+ request.setUpsertSuppliedDocument(op.getUpsertSupplied());
request.setHint(op.getHint());
request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 5d550eb7c8b..81c4567e48f 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -190,6 +190,13 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
}
request.setMergeByPBRT(elem.Bool());
+ } else if (fieldName == kUseNewUpsert) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kUseNewUpsert << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+ request.setUseNewUpsert(elem.boolean());
} else if (kAllowDiskUseName == fieldName) {
if (storageGlobalParams.readOnly) {
return {ErrorCodes::IllegalOperation,
@@ -329,6 +336,7 @@ Document AggregationRequest::serializeToCommandObj() const {
_writeConcern ? Value(_writeConcern->toBSON()) : Value()},
// Only serialize runtime constants if any were specified.
{kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()},
+ {kUseNewUpsert, _useNewUpsert ? Value(true) : Value()},
};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 944423ba210..b15a06f898c 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -65,6 +65,7 @@ public:
static constexpr StringData kCommentName = "comment"_sd;
static constexpr StringData kExchangeName = "exchange"_sd;
static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd;
+ static constexpr StringData kUseNewUpsert = "useNewUpsert"_sd;
static constexpr long long kDefaultBatchSize = 101;
@@ -238,6 +239,10 @@ public:
return _runtimeConstants;
}
+ bool getUseNewUpsert() const {
+ return _useNewUpsert;
+ }
+
//
// Setters for optional fields.
//
@@ -310,6 +315,10 @@ public:
_runtimeConstants = std::move(runtimeConstants);
}
+ void setUseNewUpsert(bool useNewUpsert) {
+ _useNewUpsert = useNewUpsert;
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -363,5 +372,9 @@ private:
// A document containing runtime constants; i.e. values that do not change once computed (e.g.
// $$NOW).
boost::optional<RuntimeConstants> _runtimeConstants;
+
+ // Indicates whether the aggregation may use the new 'upsertSupplied' mechanism when running
+ // $merge stages. Versions of mongoS from 4.2.2 onwards always set this flag.
+ bool _useNewUpsert = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 23129c98f85..888387a6e45 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -58,6 +58,7 @@ using WhenMatched = MergeStrategyDescriptor::WhenMatched;
using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>;
using UpdateModification = write_ops::UpdateModification;
+using UpsertType = MongoProcessInterface::UpsertType;
constexpr auto kStageName = DocumentSourceMerge::kStageName;
constexpr auto kDefaultWhenMatched = WhenMatched::kMerge;
@@ -75,12 +76,15 @@ constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMa
constexpr auto kPipelineFailMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kFail};
constexpr auto kPipelineDiscardMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kDiscard};
+const auto kDefaultPipelineLet = BSON("new"
+ << "$$ROOT");
+
/**
* Creates a merge strategy which uses update semantics to perform a merge operation. If
* 'BatchTransform' function is provided, it will be called to transform batched objects before
* passing them to the 'update'.
*/
-MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
+MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
return [upsert, transform](
const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
if (transform) {
@@ -101,7 +105,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
* error. If 'BatchTransform' function is provided, it will be called to transform batched objects
* before passing them to the 'update'.
*/
-MergeStrategy makeStrictUpdateStrategy(bool upsert, BatchTransform transform) {
+MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) {
return [upsert, transform](
const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
if (transform) {
@@ -168,44 +172,46 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
{kReplaceInsertMode,
{kReplaceInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(true, {})}},
+ makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}},
// whenMatched: replace, whenNotMatched: fail
{kReplaceFailMode,
- {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}},
+ {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: replace, whenNotMatched: discard
{kReplaceDiscardMode,
- {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(false, {})}},
+ {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: merge, whenNotMatched: insert
{kMergeInsertMode,
{kMergeInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(true, makeUpdateTransform("$set"))}},
+ makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}},
// whenMatched: merge, whenNotMatched: fail
{kMergeFailMode,
{kMergeFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(false, makeUpdateTransform("$set"))}},
+ makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
// whenMatched: merge, whenNotMatched: discard
{kMergeDiscardMode,
{kMergeDiscardMode,
{ActionType::update},
- makeUpdateStrategy(false, makeUpdateTransform("$set"))}},
+ makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
// whenMatched: keepExisting, whenNotMatched: insert
{kKeepExistingInsertMode,
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}},
+ makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}},
// whenMatched: [pipeline], whenNotMatched: insert
{kPipelineInsertMode,
{kPipelineInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(true, {})}},
+ makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}},
// whenMatched: [pipeline], whenNotMatched: fail
{kPipelineFailMode,
- {kPipelineFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}},
+ {kPipelineFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: [pipeline], whenNotMatched: discard
{kPipelineDiscardMode,
- {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(false, {})}},
+ {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: fail, whenNotMatched: insert
{kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
return mergeStrategyDescriptors;
@@ -376,11 +382,16 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
!outputNs.isSpecial());
if (whenMatched == WhenMatched::kPipeline) {
- if (!letVariables) {
- // For custom pipeline-style updates, default the 'let' variables to {new: "$$ROOT"},
- // if the user has omitted the 'let' argument.
- letVariables = BSON("new"
- << "$$ROOT");
+ // If unspecified, 'letVariables' defaults to {new: "$$ROOT"}.
+ letVariables = letVariables.value_or(kDefaultPipelineLet);
+ auto newElt = letVariables->getField("new"_sd);
+ uassert(51273,
+ "'let' may not define a value for the reserved 'new' variable other than '$$ROOT'",
+ !newElt || newElt.valueStringDataSafe() == "$$ROOT"_sd);
+ // If the 'new' variable is missing and this is a {whenNotMatched: "insert"} merge, then the
+ // new document *must* be serialized with the update request. Add it to the let variables.
+ if (!newElt && whenNotMatched == WhenNotMatched::kInsert) {
+ letVariables = letVariables->addField(kDefaultPipelineLet.firstElement());
}
} else {
// Ensure the 'let' argument cannot be used with any other merge modes.
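
The 'let' handling above does three things for pipeline-style merges: default an omitted 'let' to {new: "$$ROOT"}, reject any user-defined 'new' whose value is not "$$ROOT", and add 'new' back in when whenNotMatched is "insert", since the resulting upsert must be able to insert the original source document. A standalone sketch of that decision, modelling the variables as a map of expression strings rather than BSON (a simplification for illustration):

#include <cassert>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

using LetVars = std::map<std::string, std::string>;

// Mirrors the pipeline-mode 'let' resolution described above.
LetVars resolveLetVariables(std::optional<LetVars> letVariables, bool whenNotMatchedIsInsert) {
    // If unspecified, 'let' defaults to {new: "$$ROOT"}.
    LetVars result = letVariables.value_or(LetVars{{"new", "$$ROOT"}});

    // The reserved 'new' variable may only be bound to "$$ROOT".
    auto newIt = result.find("new");
    if (newIt != result.end() && newIt->second != "$$ROOT")
        throw std::invalid_argument(
            "'let' may not define a value for the reserved 'new' variable other than '$$ROOT'");

    // For {whenNotMatched: insert}, the new document must always travel with the request.
    if (newIt == result.end() && whenNotMatchedIsInsert)
        result["new"] = "$$ROOT";
    return result;
}

int main() {
    // An omitted 'let' defaults to {new: "$$ROOT"} for every whenNotMatched mode.
    assert(resolveLetVariables(std::nullopt, false).at("new") == "$$ROOT");

    // A user-supplied 'let' gains 'new' only when whenNotMatched is "insert".
    assert(resolveLetVariables(LetVars{{"v1", "10"}}, true).count("new") == 1);
    assert(resolveLetVariables(LetVars{{"v1", "10"}}, false).count("new") == 0);
    return 0;
}
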
diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp
index dbebf226ced..e97cbbdf51d 100644
--- a/src/mongo/db/pipeline/document_source_merge_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_test.cpp
@@ -759,20 +759,23 @@ TEST_F(DocumentSourceMergeTest, LetVariablesCanOnlyBeUsedWithPipelineMode) {
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51199);
}
+// If the user omits 'let', the default {new: "$$ROOT"} variable is always serialized, regardless
+// of the whenNotMatched mode.
TEST_F(DocumentSourceMergeTest, SerializeDefaultLetVariable) {
- auto spec =
- BSON("$merge" << BSON("into"
- << "target_collection"
- << "whenMatched" << BSON_ARRAY(BSON("$project" << BSON("x" << 1)))
- << "whenNotMatched"
- << "insert"));
- auto mergeStage = createMergeStage(spec);
- auto serialized = mergeStage->serialize().getDocument();
- ASSERT_VALUE_EQ(serialized["$merge"]["let"],
- Value(BSON("new"
- << "$$ROOT")));
+ for (auto&& whenNotMatched : {"insert", "fail", "discard"}) {
+ auto spec =
+ BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched" << BSON_ARRAY(BSON("$project" << BSON("x" << 1)))
+ << "whenNotMatched" << whenNotMatched));
+ auto mergeStage = createMergeStage(spec);
+ auto serialized = mergeStage->serialize().getDocument();
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"],
+ Value(BSON("new"
+ << "$$ROOT")));
+ }
}
+// Test the behaviour of 'let' serialization for each whenNotMatched mode.
TEST_F(DocumentSourceMergeTest, SerializeLetVariables) {
auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
<< "$$v1"
@@ -780,50 +783,70 @@ TEST_F(DocumentSourceMergeTest, SerializeLetVariables) {
<< "$$v2"
<< "z"
<< "$$v3")));
- auto spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "let"
- << BSON("v1" << 10 << "v2"
- << "foo"
- << "v3"
- << BSON("x" << 1 << "y"
- << BSON("z"
- << "bar")))
- << "whenMatched" << pipeline << "whenNotMatched"
- << "insert"));
- auto mergeStage = createMergeStage(spec);
- ASSERT(mergeStage);
- auto serialized = mergeStage->serialize().getDocument();
- ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v1"], Value(BSON("$const" << 10)));
- ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v2"],
- Value(BSON("$const"
- << "foo")));
- ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v3"],
- Value(BSON("x" << BSON("$const" << 1) << "y"
- << BSON("z" << BSON("$const"
- << "bar")))));
- ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+
+ const auto createAndSerializeMergeStage = [this, &pipeline](StringData whenNotMatched) {
+ auto spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "let"
+ << BSON("v1" << 10 << "v2"
+ << "foo"
+ << "v3"
+ << BSON("x" << 1 << "y"
+ << BSON("z"
+ << "bar")))
+ << "whenMatched" << pipeline << "whenNotMatched"
+ << whenNotMatched));
+ auto mergeStage = createMergeStage(spec);
+ ASSERT(mergeStage);
+
+ return mergeStage->serialize().getDocument();
+ };
+
+ for (auto&& whenNotMatched : {"insert", "fail", "discard"}) {
+ const auto serialized = createAndSerializeMergeStage(whenNotMatched);
+
+ // For {whenNotMatched: insert}, we always attach the 'new' document even if the user has
+ // already specified a set of variables. This is because a {whenNotMatched: insert} merge
+ // generates an upsert, and if no documents in the target collection match the query we must
+ // insert the original document. For other 'whenNotMatched' modes, we do not serialize the
+ // new document, since neither 'fail' nor 'discard' can result in an upsert.
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"]["new"],
+ (whenNotMatched == "insert"_sd ? Value("$$ROOT"_sd) : Value()));
+
+ // The user's variables should be serialized in all cases.
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v1"], Value(BSON("$const" << 10)));
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v2"],
+ Value(BSON("$const"
+ << "foo")));
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v3"],
+ Value(BSON("x" << BSON("$const" << 1) << "y"
+ << BSON("z" << BSON("$const"
+ << "bar")))));
+ ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ }
}
TEST_F(DocumentSourceMergeTest, SerializeLetArrayVariable) {
- auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
- << "$$v1")));
- auto spec =
- BSON("$merge" << BSON("into"
- << "target_collection"
- << "let"
- << BSON("v1" << BSON_ARRAY(1 << "2" << BSON("x" << 1 << "y" << 2)))
- << "whenMatched" << pipeline << "whenNotMatched"
- << "insert"));
- auto mergeStage = createMergeStage(spec);
- ASSERT(mergeStage);
- auto serialized = mergeStage->serialize().getDocument();
- ASSERT_VALUE_EQ(serialized["$merge"]["let"]["v1"],
- Value(BSON_ARRAY(BSON("$const" << 1) << BSON("$const"
- << "2")
- << BSON("x" << BSON("$const" << 1) << "y"
- << BSON("$const" << 2)))));
- ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ for (auto&& whenNotMatched : {"insert", "fail", "discard"}) {
+ auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
+ << "$$v1")));
+ auto spec = BSON(
+ "$merge" << BSON("into"
+ << "target_collection"
+ << "let"
+ << BSON("v1" << BSON_ARRAY(1 << "2" << BSON("x" << 1 << "y" << 2)))
+ << "whenMatched" << pipeline << "whenNotMatched" << whenNotMatched));
+ auto mergeStage = createMergeStage(spec);
+ ASSERT(mergeStage);
+ auto serialized = mergeStage->serialize().getDocument();
+ ASSERT_VALUE_EQ(
+ serialized["$merge"]["let"]["v1"],
+ Value(BSON_ARRAY(BSON("$const" << 1)
+ << BSON("$const"
+ << "2")
+ << BSON("x" << BSON("$const" << 1) << "y" << BSON("$const" << 2)))));
+ ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ }
}
// This test verifies that when the 'let' argument is specified as 'null', the default 'new'
@@ -833,60 +856,66 @@ TEST_F(DocumentSourceMergeTest, SerializeLetArrayVariable) {
// this test ensures that we're aware of this limitation. Once the limitation is addressed in
// SERVER-41272, this test should be updated accordingly.
TEST_F(DocumentSourceMergeTest, SerializeNullLetVariablesAsDefault) {
- auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
- << "1")));
- auto spec =
- BSON("$merge" << BSON("into"
- << "target_collection"
- << "let" << BSONNULL << "whenMatched" << pipeline << "whenNotMatched"
- << "insert"));
- auto mergeStage = createMergeStage(spec);
- ASSERT(mergeStage);
- auto serialized = mergeStage->serialize().getDocument();
- ASSERT_VALUE_EQ(serialized["$merge"]["let"],
- Value(BSON("new"
- << "$$ROOT")));
- ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ for (auto&& whenNotMatched : {"insert", "fail", "discard"}) {
+ auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
+ << "1")));
+ auto spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "let" << BSONNULL << "whenMatched" << pipeline
+ << "whenNotMatched" << whenNotMatched));
+ auto mergeStage = createMergeStage(spec);
+ ASSERT(mergeStage);
+ auto serialized = mergeStage->serialize().getDocument();
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"],
+ Value(BSON("new"
+ << "$$ROOT")));
+ ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ }
}
TEST_F(DocumentSourceMergeTest, SerializeEmptyLetVariables) {
- auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
- << "1")));
- auto spec =
- BSON("$merge" << BSON("into"
- << "target_collection"
- << "let" << BSONObj() << "whenMatched" << pipeline << "whenNotMatched"
- << "insert"));
- auto mergeStage = createMergeStage(spec);
- ASSERT(mergeStage);
- auto serialized = mergeStage->serialize().getDocument();
- ASSERT_VALUE_EQ(serialized["$merge"]["let"], Value(BSONObj()));
- ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ for (auto&& whenNotMatched : {"insert", "fail", "discard"}) {
+ auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
+ << "1")));
+ auto spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "let" << BSONObj() << "whenMatched" << pipeline
+ << "whenNotMatched" << whenNotMatched));
+ auto mergeStage = createMergeStage(spec);
+ ASSERT(mergeStage);
+ auto serialized = mergeStage->serialize().getDocument();
+ ASSERT_VALUE_EQ(serialized["$merge"]["let"],
+ (whenNotMatched == "insert"_sd ? Value(BSON("new"
+ << "$$ROOT"))
+ : Value(BSONObj())));
+ ASSERT_VALUE_EQ(serialized["$merge"]["whenMatched"], Value(pipeline));
+ }
}
TEST_F(DocumentSourceMergeTest, OnlyObjectCanBeUsedAsLetVariables) {
- auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
- << "1")));
- auto spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "let" << 1 << "whenMatched" << pipeline << "whenNotMatched"
- << "insert"));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "let"
- << "foo"
- << "whenMatched" << pipeline << "whenNotMatched"
- << "insert"));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "let" << BSON_ARRAY(1 << "2") << "whenMatched" << pipeline
- << "whenNotMatched"
- << "insert"));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ for (auto&& whenNotMatched : {"insert", "fail", "discard"}) {
+ auto pipeline = BSON_ARRAY(BSON("$project" << BSON("x"
+ << "1")));
+ auto spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "let" << 1 << "whenMatched" << pipeline
+ << "whenNotMatched" << whenNotMatched));
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "let"
+ << "foo"
+ << "whenMatched" << pipeline << "whenNotMatched"
+ << whenNotMatched));
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "let" << BSON_ARRAY(1 << "2") << "whenMatched" << pipeline
+ << "whenNotMatched" << whenNotMatched));
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ }
}
} // namespace
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 9f65c12669f..781c0cc96c1 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -67,6 +67,10 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
} else {
variables.setDefaultRuntimeConstants(opCtx);
}
+
+ // Any request which did not originate from a mongoS, or which did originate from a mongoS but
+ // has the 'useNewUpsert' flag set, can use the new upsertSupplied mechanism for $merge.
+ useNewUpsert = request.getUseNewUpsert() || !request.isFromMongos();
}
ExpressionContext::ExpressionContext(OperationContext* opCtx,
@@ -164,6 +168,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->subPipelineDepth = subPipelineDepth;
expCtx->tempDir = tempDir;
+ expCtx->useNewUpsert = useNewUpsert;
expCtx->opCtx = opCtx;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 4ed86cf5efb..e55284315f0 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -247,6 +247,10 @@ public:
boost::optional<ServerGlobalParams::FeatureCompatibility::Version>
maxFeatureCompatibilityVersion;
+ // True if this context is associated with a pipeline which is permitted to use the new
+ // upsertSupplied mechanism for applicable $merge modes.
+ bool useNewUpsert = false;
+
protected:
static const int kInterruptCheckPeriod = 128;
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index be897699a6f..c3009543e0f 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -85,6 +85,12 @@ public:
std::tuple<BSONObj, write_ops::UpdateModification, boost::optional<BSONObj>>;
using BatchedObjects = std::vector<BatchObject>;
+ enum class UpsertType {
+ kNone, // This operation is not an upsert.
+ kGenerateNewDoc, // If no documents match, generate a new document using the update spec.
+ kInsertSuppliedDoc // If no documents match, insert the document supplied in 'c.new' as-is.
+ };
+
enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
@@ -167,7 +173,7 @@ public:
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) = 0;
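
To make the three modes concrete, a small self-contained sketch follows; the enum is copied locally so it compiles on its own, and the chooser function and its parameters are assumptions for illustration rather than part of this interface.

    // Local copy of MongoProcessInterface::UpsertType, reproduced so the sketch is standalone.
    enum class UpsertType { kNone, kGenerateNewDoc, kInsertSuppliedDoc };

    // Assumed illustration of how a writer such as $merge might pick a mode: operations that
    // must never insert use kNone; an upsert whose update spec can produce the full document
    // uses kGenerateNewDoc; an upsert that must instead insert a caller-supplied document
    // (passed through the update's 'c.new' constant) uses kInsertSuppliedDoc.
    UpsertType chooseUpsertType(bool mayInsert, bool insertSuppliedDocument) {
        if (!mayInsert) {
            return UpsertType::kNone;
        }
        return insertSuppliedDocument ? UpsertType::kInsertSuppliedDoc
                                      : UpsertType::kGenerateNewDoc;
    }
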
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 0e5583a85a4..c63d8bd8982 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -113,7 +113,7 @@ public:
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID>) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index 18a857d305b..132fbc272c2 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -135,7 +135,7 @@ StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceShardServer::updat
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 4dbc21b6ca2..8b4666c9b55 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -78,7 +78,7 @@ public:
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) final;
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index fed8b7076e8..7f2cf3e96f7 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -181,7 +181,7 @@ Update MongoInterfaceStandalone::buildUpdateOp(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
BatchedObjects&& batch,
- bool upsert,
+ UpsertType upsert,
bool multi) {
Update updateOp(nss);
updateOp.setUpdates([&] {
@@ -193,7 +193,9 @@ Update MongoInterfaceStandalone::buildUpdateOp(
entry.setQ(std::move(q));
entry.setU(std::move(u));
entry.setC(std::move(c));
- entry.setUpsert(upsert);
+ entry.setUpsert(upsert != UpsertType::kNone);
+ entry.setUpsertSupplied({{entry.getUpsert() && expCtx->useNewUpsert,
+ upsert == UpsertType::kInsertSuppliedDoc}});
entry.setMulti(multi);
return entry;
}());
@@ -232,7 +234,7 @@ StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceStandalone::update
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
auto writeResults =
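
The flag derivation in buildUpdateOp above can be summarized with this stand-alone sketch; the struct and helper are illustrative assumptions that mirror how 'upsert' and 'upsertSupplied' are computed from the UpsertType and the expression context's useNewUpsert flag.

    #include <boost/optional.hpp>

    enum class UpsertType { kNone, kGenerateNewDoc, kInsertSuppliedDoc };

    struct UpdateEntryFlags {
        bool upsert = false;
        // Disengaged means the field is omitted from the serialized update entry entirely.
        boost::optional<bool> upsertSupplied;
    };

    UpdateEntryFlags deriveFlags(UpsertType upsert, bool useNewUpsert) {
        UpdateEntryFlags flags;
        flags.upsert = (upsert != UpsertType::kNone);
        // Attach upsertSupplied only when this is an upsert and the new mechanism is allowed;
        // its value records whether the supplied document should be inserted verbatim.
        if (flags.upsert && useNewUpsert) {
            flags.upsertSupplied = (upsert == UpsertType::kInsertSuppliedDoc);
        }
        return flags;
    }
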
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index e306fca6436..91d3fe5d380 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -72,7 +72,7 @@ public:
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) override;
@@ -176,7 +176,7 @@ protected:
Update buildUpdateOp(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
BatchedObjects&& batch,
- bool upsert,
+ UpsertType upsert,
bool multi);
private:
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index e17dddc7e50..d9e10e6eecc 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -117,6 +117,10 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
Value(static_cast<long long>(*opCtx->getTxnNumber()));
}
+ // We set this flag to indicate that the shards should always use the new upsert mechanism when
+ // executing relevant $merge modes.
+ cmdForShards[AggregationRequest::kUseNewUpsert] = Value(true);
+
// agg creates temp collection and should handle implicit create separately.
return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), true);
}
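
Combined with the ExpressionContext change earlier in the patch, the opt-in logic reduces to the following sketch; the helper and main() are illustrative and not part of the patch.

    #include <cassert>

    // A node may use the new upsert mechanism if the request did not come from a mongoS,
    // or if the mongoS explicitly opted in by attaching 'useNewUpsert' to the command.
    bool canUseNewUpsert(bool fromMongos, bool useNewUpsertFlag) {
        return useNewUpsertFlag || !fromMongos;
    }

    int main() {
        assert(canUseNewUpsert(false, false));  // standalone or replica-set request
        assert(canUseNewUpsert(true, true));    // shard request from a mongoS that set the flag
        assert(!canUseNewUpsert(true, false));  // shard request from a mongoS that did not
        return 0;
    }
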
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index e32048e0a80..6e7e17a9ff7 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -75,7 +75,7 @@ public:
const NamespaceString& ns,
BatchedObjects&& batch,
const WriteConcernOptions& wc,
- bool upsert,
+ UpsertType upsert,
bool multi,
boost::optional<OID>) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 1c1f4f3387a..32d63cd41f4 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/exec/sort_key_generator.h"
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/update_stage.h"
+#include "mongo/db/exec/upsert_stage.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/index/wildcard_access_method.h"
#include "mongo/db/index_names.h"
@@ -1084,8 +1085,11 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpdate(
invariant(root);
updateStageParams.canonicalQuery = cq.get();
- root = stdx::make_unique<UpdateStage>(
- opCtx, updateStageParams, ws.get(), collection, root.release());
+ const bool isUpsert = updateStageParams.request->isUpsert();
+ root = (isUpsert ? std::make_unique<UpsertStage>(
+ opCtx, updateStageParams, ws.get(), collection, root.release())
+ : std::make_unique<UpdateStage>(
+ opCtx, updateStageParams, ws.get(), collection, root.release()));
if (!request->getProj().isEmpty()) {
invariant(request->shouldReturnAnyDocs());
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index f303d1169ba..e1b492c04f5 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/exec/idhack.h"
#include "mongo/db/exec/index_scan.h"
#include "mongo/db/exec/update_stage.h"
+#include "mongo/db/exec/upsert_stage.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/stdx/memory.h"
@@ -166,9 +167,13 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::updateWith
invariant(collection);
auto ws = stdx::make_unique<WorkingSet>();
- auto idHackStage = stdx::make_unique<IDHackStage>(opCtx, key, ws.get(), descriptor);
- auto root =
- stdx::make_unique<UpdateStage>(opCtx, params, ws.get(), collection, idHackStage.release());
+ auto idHackStage = std::make_unique<IDHackStage>(opCtx, key, ws.get(), descriptor);
+
+ const bool isUpsert = params.request->isUpsert();
+ auto root = (isUpsert ? std::make_unique<UpsertStage>(
+ opCtx, params, ws.get(), collection, idHackStage.release())
+ : std::make_unique<UpdateStage>(
+ opCtx, params, ws.get(), collection, idHackStage.release()));
auto executor =
PlanExecutor::make(opCtx, std::move(ws), std::move(root), collection, yieldPolicy);
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
index c5780407243..13aed7d7550 100644
--- a/src/mongo/db/views/resolved_view.cpp
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -112,6 +112,7 @@ AggregationRequest ResolvedView::asExpandedViewAggregation(
expandedRequest.setUnwrappedReadPref(request.getUnwrappedReadPref());
expandedRequest.setBypassDocumentValidation(request.shouldBypassDocumentValidation());
expandedRequest.setAllowDiskUse(request.shouldAllowDiskUse());
+ expandedRequest.setUseNewUpsert(request.getUseNewUpsert());
// Operations on a view must always use the default collation of the view. We must have already
// checked that if the user's request specifies a collation, it matches the collation of the
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp
index 715aa75d2ad..ea02b2a2359 100644
--- a/src/mongo/dbtests/query_stage_update.cpp
+++ b/src/mongo/dbtests/query_stage_update.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/exec/eof.h"
#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/exec/update_stage.h"
+#include "mongo/db/exec/upsert_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/json.h"
@@ -229,7 +230,7 @@ public:
auto eofStage = make_unique<EOFStage>(&_opCtx);
auto updateStage =
- make_unique<UpdateStage>(&_opCtx, params, ws.get(), collection, eofStage.release());
+ make_unique<UpsertStage>(&_opCtx, params, ws.get(), collection, eofStage.release());
runUpdate(updateStage.get());
}
diff --git a/src/mongo/idl/basic_types.h b/src/mongo/idl/basic_types.h
new file mode 100644
index 00000000000..8eb66c53629
--- /dev/null
+++ b/src/mongo/idl/basic_types.h
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2019 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/base/string_data.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+
+/**
+ * Wraps a boost::optional<bool> to provide consistent semantics. A standard boost::optional<bool>
+ * can introduce ambiguity because its 'operator bool' will resolve to 'true' if the optional is
+ * populated, even if it is populated with boolean 'false'. By contrast, an instance of this class
+ * always resolves to the populated value, or false if not yet populated. This class will also
+ * serialize to BSON via the IDL only if the value has been explicitly set.
+ */
+class OptionalBool {
+public:
+ static OptionalBool parseFromBSON(BSONElement element) {
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "Field '" << element.fieldNameStringData()
+ << "' should be a boolean value, but found: " << element.type(),
+ !element || element.type() == BSONType::Bool);
+ return element ? OptionalBool{element.boolean()} : OptionalBool{};
+ }
+
+ OptionalBool(boost::optional<bool> value) : _value(std::move(value)) {}
+ OptionalBool(bool value) : _value(value) {}
+
+ OptionalBool() : OptionalBool(boost::none) {}
+
+ /**
+ * Returns true only if _value is populated with a value of true.
+ */
+ operator bool() const {
+ return _value.value_or(false);
+ }
+
+ /**
+ * Returns true if the value has been populated, false otherwise.
+ */
+ bool has_value() const {
+ return _value.has_value();
+ }
+
+ /**
+ * Serialize this object as a field in a document. If _value is empty, omit the field.
+ */
+ void serializeToBSON(StringData fieldName, BSONObjBuilder* builder) const {
+ if (_value) {
+ builder->appendBool(fieldName, *_value);
+ }
+ }
+
+ /**
+ * Serialize this object as an element of a BSON array. If _value is empty, omit the entry.
+ */
+ void serializeToBSON(BSONArrayBuilder* builder) const {
+ if (_value) {
+ builder->append(*_value);
+ }
+ }
+
+ /**
+ * Returns a string representation of the current value.
+ */
+ std::string toString() const {
+ return *this ? "1" : "0";
+ }
+ operator std::string() const {
+ return toString();
+ }
+
+private:
+ boost::optional<bool> _value;
+};
+
+} // namespace mongo
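
A brief usage sketch of the class above; it assumes the new header is available on the include path, and the function name is purely illustrative.

    #include "mongo/idl/basic_types.h"

    void optionalBoolExample() {
        mongo::OptionalBool unset;                   // never populated
        mongo::OptionalBool explicitlyFalse{false};  // populated with false

        // Unlike a raw boost::optional<bool>, an explicitly-false value still converts to false.
        bool a = static_cast<bool>(unset);            // false: unpopulated defaults to false
        bool b = static_cast<bool>(explicitlyFalse);  // false: resolves to the stored value

        // has_value() distinguishes "never set" from "set to false".
        bool c = unset.has_value();            // false
        bool d = explicitlyFalse.has_value();  // true

        // serializeToBSON() omits the field for 'unset' and appends {field: false} for
        // 'explicitlyFalse', which is what lets 'upsertSupplied' stay off the wire by default.
        (void)a; (void)b; (void)c; (void)d;
    }
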
diff --git a/src/mongo/idl/basic_types.idl b/src/mongo/idl/basic_types.idl
index 7005fdc4bc3..31c665dd575 100644
--- a/src/mongo/idl/basic_types.idl
+++ b/src/mongo/idl/basic_types.idl
@@ -31,6 +31,7 @@ global:
cpp_namespace: "mongo"
cpp_includes:
- "mongo/db/namespace_string.h"
+ - "mongo/idl/basic_types.h"
- "mongo/util/uuid.h"
types:
@@ -92,10 +93,21 @@ types:
cpp_type: bool
deserializer: "mongo::BSONElement::trueValue"
+ optionalBool:
+ bson_serialization_type: any
+ description: "An optional bool type that does not serialize unless explicitly set. Can be
+ used in place of boost::optional<bool> to provide more intuitive semantics,
+ since the standard optional will coerce to true if populated regardless of
+ its internal value."
+ cpp_type: "mongo::OptionalBool"
+ default: "mongo::OptionalBool()"
+ deserializer: "mongo::OptionalBool::parseFromBSON"
+ serializer: "mongo::OptionalBool::serializeToBSON"
+
bindata_generic:
bson_serialization_type: bindata
bindata_subtype: generic
- description: "A BSON bindata of "
+ description: "A BSON bindata of generic sub type"
cpp_type: "std::vector<std::uint8_t>"
deserializer: "mongo::BSONElement::_binDataVector"