author     Anton Korshunov <anton.korshunov@mongodb.com>  2019-05-06 20:37:22 +0100
committer  Anton Korshunov <anton.korshunov@mongodb.com>  2019-05-13 21:40:17 +0100
commit     dde091c07989ffaefc57705859abf6517beeeace (patch)
tree       c5639b56f03fa24f27aeb9e3422617907be6dc77
parent     e4b13ae68a4eef9393357038f09f14bfd8102050 (diff)
download   mongo-dde091c07989ffaefc57705859abf6517beeeace.tar.gz
SERVER-40431 Add merge support for whenMatched: pipeline
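
With this change, the 'whenMatched' field accepts a custom aggregation pipeline in addition
to the existing mode strings. For example (collection names here are illustrative):

    db.source.aggregate([{
        $merge: {
            into: "target",
            on: "_id",
            whenMatched: [{$addFields: {x: 1}}],
            whenNotMatched: "insert"
        }
    }]);

Each matched document in the target collection is replaced with the result of running the
pipeline over it; when no match is found, the inserted document is computed by applying the
same pipeline to a document holding only the 'on' field values (see the new tests below).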
-rw-r--r--  jstests/aggregation/sources/merge/mode_pipeline_insert.js | 397
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.cpp                    |   3
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.h                      |   2
-rw-r--r--  src/mongo/db/pipeline/SConscript                          |   1
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.cpp           |  48
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.h             |  21
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.idl           |  28
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_modes.idl     |  58
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_spec.cpp      |  55
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_spec.h        |  36
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_test.cpp      |  40
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp             |   2
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h           |   3
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h          |   2
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.cpp      |   2
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.h        |   2
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp    |   5
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h      |   4
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h      |   2
19 files changed, 643 insertions(+), 68 deletions(-)
diff --git a/jstests/aggregation/sources/merge/mode_pipeline_insert.js b/jstests/aggregation/sources/merge/mode_pipeline_insert.js
new file mode 100644
index 00000000000..ee8c63d19ce
--- /dev/null
+++ b/jstests/aggregation/sources/merge/mode_pipeline_insert.js
@@ -0,0 +1,397 @@
+// Tests the behaviour of the $merge stage with whenMatched=[<pipeline>] and whenNotMatched=insert.
+//
+// Cannot implicitly shard accessed collections, because a collection can be implicitly created
+// and exist when none is expected.
+// @tags: [assumes_no_implicit_collection_creation_after_drop]
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertArrayEq.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos.
+
+ // Asserts that two arrays are equal - that is, that their sizes are equal and each element in
+ // the 'actual' array has a matching element in the 'expected' array, without regard to
+ // element order.
+ function assertArrayEq({actual = [], expected = []} = {}) {
+ assert(arrayEq(actual, expected), `actual=${tojson(actual)}, expected=${tojson(expected)}`);
+ }
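+ // For example, assertArrayEq({actual: [{a: 1}, {b: 2}], expected: [{b: 2}, {a: 1}]})
+ // succeeds, since element order is not significant.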
+
+ // A helper function to create a pipeline with a $merge stage using a custom 'updatePipeline'
+ // for the whenMatched mode. If the 'initialStages' array is specified, the $merge stage is
+ // appended to this array and the result is returned to the caller; otherwise an array with a
+ // single $merge stage is returned. The output collection for the $merge stage is specified
+ // in the 'target' parameter, and the $merge stage 'on' fields in the 'on' parameter.
+ function makeMergePipeline(
+ {target = "", initialStages = [], updatePipeline = [], on = "_id"} = {}) {
+ return initialStages.concat([{
+ $merge:
+ {into: target, on: on, whenMatched: updatePipeline, whenNotMatched: "insert"}
+ }]);
+ }
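+ // For example, makeMergePipeline({target: "out", updatePipeline: [{$addFields: {x: 1}}]}),
+ // where "out" is an illustrative collection name, returns:
+ //   [{$merge: {into: "out", on: "_id", whenMatched: [{$addFields: {x: 1}}],
+ //              whenNotMatched: "insert"}}]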
+
+ const source = db[`${jsTest.name()}_source`];
+ source.drop();
+ const target = db[`${jsTest.name()}_target`];
+ target.drop();
+
+ (function testMergeIntoNonExistentCollection() {
+ assert.commandWorked(source.insert({_id: 1, a: 1, b: "a"}));
+ assert.doesNotThrow(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$addFields: {x: 1}}]})));
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [
+ {_id: 1, x: 1},
+ ]
+ });
+ })();
+
+ // Test $merge inserts a document into an existing target collection if no matching document
+ // is found.
+ (function testMergeInsertsDocumentIfMatchNotFound() {
+ assert.commandWorked(target.deleteMany({}));
+ assert.doesNotThrow(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$addFields: {x: 1, y: 2}}]})));
+ assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 2}]});
+ })();
+
+ // Test $merge updates an existing document in the target collection by applying a
+ // pipeline-style update.
+ (function testMergeUpdatesDocumentIfMatchFound() {
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ target: target.getName(),
+ updatePipeline: [{$project: {x: {$add: ["$x", 1]}, y: {$add: ["$y", 2]}}}]
+ })));
+ assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2, y: 4}]});
+ })();
+
+ // Test $merge with various pipeline stages which are currently supported by the pipeline-style
+ // update.
+ (function testMergeWithSupportedUpdatePipelineStages() {
+ assert(source.drop());
+ assert(target.drop());
+
+ assert.commandWorked(source.insert([{_id: 1, a: 1}, {_id: 2, a: 2}]));
+ assert.commandWorked(target.insert({_id: 1, b: 1}));
+
+ // Test $addFields stage.
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ target: target.getName(),
+ updatePipeline: [{$addFields: {x: {$add: ["$b", 1]}}}]
+ })));
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, b: 1, x: 2}, {_id: 2, x: null}]});
+
+ // Test $project stage.
+ assert(target.drop());
+ assert.commandWorked(target.insert({_id: 1, b: 1}));
+ assert.doesNotThrow(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$project: {x: {$add: ["$b", 1]}}}]})));
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, x: 2}, {_id: 2, x: null}]});
+
+ // Test $replaceRoot stage.
+ assert(target.drop());
+ assert.commandWorked(
+ target.insert([{_id: 1, b: 1, c: {x: {y: 1}}}, {_id: 2, b: 2, c: {x: {y: 2}}}]));
+ assert.doesNotThrow(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$replaceRoot: {newRoot: "$c"}}]})));
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [{_id: 1, x: {y: 1}}, {_id: 2, x: {y: 2}}]
+ });
+ })();
+
+ // Test $merge inserts a new document into the target collection if no matching document is
+ // found, by applying a pipeline-style update with upsert=true semantics.
+ (function testMergeInsertDocumentIfMatchNotFound() {
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.insert({_id: 1, a: 1}));
+ assert.commandWorked(target.insert({_id: 2, a: 2}));
+ assert.doesNotThrow(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$addFields: {x: 1}}]})));
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, x: 1}, {_id: 2, a: 2}]});
+ })();
+
+ // Test $merge doesn't modify the target collection if a document has been removed from the
+ // source collection.
+ (function testMergeDoesNotUpdateDeletedDocument() {
+ assert.commandWorked(source.deleteOne({_id: 1}));
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ target: target.getName(),
+ updatePipeline: [{$project: {x: {$add: ["$x", 1]}, a: 1}}]
+ })));
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [
+ {_id: 1, x: 1},
+ {_id: 2, a: 2},
+ ]
+ });
+ })();
+
+ // Test $merge fails if a unique index constraint in the target collection is violated.
+ (function testMergeFailsIfTargetUniqueKeyIsViolated() {
+ if (FixtureHelpers.isSharded(source)) {
+ // Skip this test if the collection is sharded, because the implicitly created shard
+ // key of {_id: 1} will not be covered by the unique index created in this test,
+ // which is not allowed.
+ return;
+ }
+
+ assert(target.drop());
+ assert.commandWorked(source.insert({_id: 4, a: 2}));
+ assert.commandWorked(target.insert([{_id: 1, x: 1}, {_id: 2, a: 2}]));
+ assert.commandWorked(target.createIndex({a: 1}, {unique: true}));
+ const error = assert.throws(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$project: {x: 1, a: 1}}]})));
+ assert.commandFailedWithCode(error, ErrorCodes.DuplicateKey);
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [
+ {_id: 1, x: 1},
+ {_id: 2, a: 2},
+ ]
+ });
+ assert.commandWorked(target.dropIndex({a: 1}));
+ })();
+
+ // Test $merge fails if it cannot find an index to verify that the 'on' fields will be unique.
+ (function testMergeFailsIfOnFieldCannotBeVerifiedForUniqueness() {
+ // The 'on' field contains a single document field.
+ let error = assert.throws(() => source.aggregate(makeMergePipeline({
+ target: target.getName(),
+ on: "nonexistent",
+ updatePipeline: [{$project: {x: 1, a: 1}}]
+ })));
+ assert.commandFailedWithCode(error, [51190, 51183]);
+
+ // The 'on' field contains multiple document fields.
+ error = assert.throws(() => source.aggregate(makeMergePipeline({
+ target: target.getName(),
+ on: ["nonexistent1", "nonexistent2"],
+ updatePipeline: [{$project: {x: 1, a: 1}}]
+ })));
+ assert.commandFailedWithCode(error, [51190, 51183]);
+ })();
+
+ // Test $merge with an explicit 'on' field consisting of a single document field or multiple
+ // document fields which differ from the _id field.
+ (function testMergeWithOnFields() {
+ if (FixtureHelpers.isSharded(source)) {
+ // Skip this test if the collection is sharded, because the implicitly created shard
+ // key of {_id: 1} will not be covered by the unique index created in this test,
+ // which is not allowed.
+ return;
+ }
+
+ // The 'on' field contains a single document field.
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.createIndex({a: 1}, {unique: true}));
+ assert.commandWorked(target.createIndex({a: 1}, {unique: true}));
+ assert.commandWorked(source.insert([{_id: 1, a: 1}, {_id: 2, a: 2}, {_id: 3, a: 30}]));
+ assert.commandWorked(
+ target.insert([{_id: 1, a: 1, b: 1}, {_id: 4, a: 30, b: 2}, {_id: 5, a: 40, b: 3}]));
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ initialStages: [{$project: {_id: 0}}],
+ target: target.getName(),
+ on: "a",
+ updatePipeline: [{$addFields: {z: 1}}]
+ })));
+ assertArrayEq({
+ actual: target.find({}, {_id: 0}).toArray(),
+ expected: [{a: 1, b: 1, z: 1}, {a: 2, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}]
+ });
+
+ // The 'on' field contains multiple document fields.
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.createIndex({a: 1, b: 1}, {unique: true}));
+ assert.commandWorked(target.createIndex({a: 1, b: 1}, {unique: true}));
+ assert.commandWorked(
+ source.insert([{_id: 1, a: 1, b: 1}, {_id: 2, a: 2, b: 4}, {_id: 3, a: 30, b: 2}]));
+ assert.commandWorked(
+ target.insert([{_id: 1, a: 1, b: 1}, {_id: 4, a: 30, b: 2}, {_id: 5, a: 40, b: 3}]));
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ initialStages: [{$project: {_id: 0}}],
+ target: target.getName(),
+ on: ["a", "b"],
+ updatePipeline: [{$addFields: {z: 1}}]
+ })));
+ assertArrayEq({
+ actual: target.find({}, {_id: 0}).toArray(),
+ expected:
+ [{a: 1, b: 1, z: 1}, {a: 2, b: 4, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}]
+ });
+ assert.commandWorked(source.dropIndex({a: 1, b: 1}));
+ assert.commandWorked(target.dropIndex({a: 1, b: 1}));
+ })();
+
+ // Test $merge with a dotted path in the 'on' field.
+ (function testMergeWithDottedOnField() {
+ if (FixtureHelpers.isSharded(source)) {
+ // Skip this test if the collection is sharded, because the implicitly created shard
+ // key of {_id: 1} will not be covered by the unique index created in this test,
+ // which is not allowed.
+ return;
+ }
+
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.createIndex({"a.b": 1}, {unique: true}));
+ assert.commandWorked(target.createIndex({"a.b": 1}, {unique: true}));
+ assert.commandWorked(source.insert([
+ {_id: 1, a: {b: "b"}, c: "x"},
+ {_id: 2, a: {b: "c"}, c: "y"},
+ {_id: 3, a: {b: 30}, b: "c"}
+ ]));
+ assert.commandWorked(target.insert({_id: 2, a: {b: "c"}, c: "y"}));
+ assert.doesNotThrow(() => source.aggregate(makeMergePipeline({
+ initialStages: [{$project: {_id: 0}}],
+ target: target.getName(),
+ on: "a.b",
+ updatePipeline: [{$addFields: {z: 1}}]
+ })));
+ assertArrayEq({
+ actual: target.find().toArray(),
+ expected: [
+ {_id: 1, a: {b: "b"}, z: 1},
+ {_id: 2, a: {b: "c"}, c: "y", z: 1},
+ {_id: 3, a: {b: 30}, z: 1}
+ ]
+ });
+ })();
+
+ // Test $merge fails if the value of the 'on' field in a document is invalid, e.g. missing,
+ // null or an array.
+ (function testMergeFailsIfOnFieldIsInvalid() {
+ if (FixtureHelpers.isSharded(source)) {
+ // Skip this test if the collection is sharded, because the implicitly created shard
+ // key of {_id: 1} will not be covered by the unique index created in this test,
+ // which is not allowed.
+ return;
+ }
+
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.createIndex({"z": 1}, {unique: true}));
+ assert.commandWorked(target.createIndex({"z": 1}, {unique: true}));
+
+ const pipeline = makeMergePipeline({
+ initialStages: [{$project: {_id: 0}}],
+ target: target.getName(),
+ on: "z",
+ updatePipeline: [{$addFields: {z: 1}}]
+ });
+
+ // The 'on' field is missing.
+ assert.commandWorked(source.insert({_id: 1}));
+ let error = assert.throws(() => source.aggregate(pipeline));
+ assert.commandFailedWithCode(error, 51132);
+
+ // The 'on' field is null.
+ assert.commandWorked(source.update({_id: 1}, {z: null}));
+ error = assert.throws(() => source.aggregate(pipeline));
+ assert.commandFailedWithCode(error, 51132);
+
+ // The 'on' field is an array.
+ assert.commandWorked(source.update({_id: 1}, {z: [1, 2]}));
+ error = assert.throws(() => source.aggregate(pipeline));
+ assert.commandFailedWithCode(error, 51185);
+ })();
+
+ // Test $merge when the _id field is removed from the aggregate projection but is used in the
+ // $merge's 'on' field. When the _id is missing, the $merge stage will create a new ObjectId in
+ // its place before performing the insert or update.
+ (function testMergeWhenDocIdIsRemovedFromProjection() {
+ const pipeline = makeMergePipeline({
+ initialStages: [{$project: {_id: 0}}],
+ target: target.getName(),
+ updatePipeline: [{$addFields: {z: 1}}]
+ });
+
+ // The _id is the single 'on' field (the default).
+ assert(source.drop());
+ assert(target.drop());
+ assert.commandWorked(source.insert([{_id: 1, a: 1, b: "a"}, {_id: 2, a: 2, b: "b"}]));
+ assert.commandWorked(target.insert({_id: 1, b: "c"}));
+ assert.doesNotThrow(() => source.aggregate(pipeline));
+ assertArrayEq({
+ actual: target.find({}, {_id: 0}).toArray(),
+ // There is a matching document in the target with {_id: 1}, but since we cannot match
+ // it (no _id in projection), we just insert two new documents from the source
+ // collection by applying a pipeline-style update.
+ expected: [{b: "c"}, {z: 1}, {z: 1}]
+ });
+
+ // The _id is part of the compound 'on' field.
+ assert(target.drop());
+ assert.commandWorked(target.insert({_id: 1, b: "c"}));
+ assert.commandWorked(source.createIndex({_id: 1, a: -1}));
+ assert.commandWorked(target.createIndex({_id: 1, a: -1}));
+ assert.doesNotThrow(() => source.aggregate(pipeline));
+ assertArrayEq(
+ {actual: target.find({}, {_id: 0}).toArray(), expected: [{b: "c"}, {z: 1}, {z: 1}]});
+ assert.commandWorked(source.dropIndex({_id: 1, a: -1}));
+ assert.commandWorked(target.dropIndex({_id: 1, a: -1}));
+ })();
+
+ // Test $merge preserves indexes and options of the existing target collection.
+ (function testMergePreservesIndexesAndOptions() {
+ const validator = {z: {$gt: 0}};
+ assert(target.drop());
+ assert.commandWorked(db.createCollection(target.getName(), {validator: validator}));
+ assert.commandWorked(target.createIndex({a: 1}));
+ assert.doesNotThrow(
+ () => source.aggregate(makeMergePipeline(
+ {target: target.getName(), updatePipeline: [{$addFields: {z: 1}}]})));
+ assertArrayEq(
+ {actual: target.find().toArray(), expected: [{_id: 1, z: 1}, {_id: 2, z: 1}]});
+ assert.eq(2, target.getIndexes().length);
+
+ const listColl = db.runCommand({listCollections: 1, filter: {name: target.getName()}});
+ assert.commandWorked(listColl);
+ assert.eq(validator, listColl.cursor.firstBatch[0].options["validator"]);
+ })();
+
+ // Test $merge implicitly creates a new database when the target collection's database doesn't
+ // exist.
+ (function testMergeImplicitlyCreatesTargetDatabase() {
+ assert(source.drop());
+ assert.commandWorked(source.insert({_id: 1, a: 1, b: "a"}));
+
+ const foreignDb = db.getSiblingDB(`${jsTest.name()}_foreign_db`);
+ assert.commandWorked(foreignDb.dropDatabase());
+ const foreignTarget = foreignDb[`${jsTest.name()}_target`];
+ const foreignPipeline = makeMergePipeline({
+ target: {db: foreignDb.getName(), coll: foreignTarget.getName()},
+ updatePipeline: [{$addFields: {z: 1}}]
+ });
+
+ if (!FixtureHelpers.isMongos(db)) {
+ assert.doesNotThrow(() => source.aggregate(foreignPipeline));
+ assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]});
+ } else {
+ // Implicit database creation is prohibited in a cluster.
+ const error = assert.throws(() => source.aggregate(foreignPipeline));
+ assert.commandFailedWithCode(error, ErrorCodes.NamespaceNotFound);
+
+ // Force creation of the database and collection, then fall through to the test
+ // below.
+ assert.commandWorked(foreignTarget.insert({_id: 1}));
+ }
+
+ assert.doesNotThrow(() => source.aggregate(foreignPipeline));
+ assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]});
+ assert.commandWorked(foreignDb.dropDatabase());
+ })();
+}());
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index c15aafce0d0..f6ff83cdad7 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -237,6 +237,9 @@ write_ops::UpdateModification::UpdateModification(const BSONObj& update) {
_type = Type::kClassic;
}
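+// Constructs a pipeline-style update from the given sequence of aggregation stages.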
+write_ops::UpdateModification::UpdateModification(std::vector<BSONObj> pipeline)
+ : _type{Type::kPipeline}, _pipeline{std::move(pipeline)} {}
+
write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) {
return UpdateModification(elem);
}
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index f116b79b63e..2058ca4a055 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -63,7 +63,7 @@ public:
UpdateModification() = default;
UpdateModification(BSONElement update);
-
+ UpdateModification(std::vector<BSONObj> pipeline);
// This constructor exists only to provide a fast-path for constructing classic-style updates.
UpdateModification(const BSONObj& update);
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 6a0ccb087bb..393f9742a33 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -599,6 +599,7 @@ env.Library(
env.Idlc('document_source_change_stream.idl')[0],
env.Idlc('document_source_list_sessions.idl')[0],
env.Idlc('document_source_merge.idl')[0],
+ env.Idlc('document_source_merge_modes.idl')[0],
env.Idlc('document_source_out.idl')[0],
env.Idlc('exchange_spec.idl')[0],
env.Idlc('runtime_constants.idl')[0],
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 3177affe8b1..e02989eddf6 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -68,6 +68,7 @@ constexpr auto kMergeInsertMode = MergeMode{WhenMatched::kMerge, WhenNotMatched:
constexpr auto kKeepExistingInsertMode =
MergeMode{WhenMatched::kKeepExisting, WhenNotMatched::kInsert};
constexpr auto kFailInsertMode = MergeMode{WhenMatched::kFail, WhenNotMatched::kInsert};
+constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kInsert};
/**
* Creates a merge strategy which uses update semantics to perform a merge operation. If
@@ -84,7 +85,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
expCtx->mongoProcessInterface->update(expCtx,
ns,
std::move(batch.uniqueKeys),
- std::move(batch.objects),
+ std::move(batch.modifications),
wc,
upsert,
multi,
@@ -97,7 +98,14 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
*/
MergeStrategy makeInsertStrategy() {
return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
- expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(batch.objects), wc, epoch);
+ std::vector<BSONObj> objectsToInsert(batch.size());
+ // The batch stores replacement style updates, but for this "insert" style of $merge we'd
+ // like to just insert the new document without attempting any sort of replacement.
+ std::transform(batch.modifications.begin(),
+ batch.modifications.end(),
+ objectsToInsert.begin(),
+ [](const auto& mod) { return mod.getUpdateClassic(); });
+ expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(objectsToInsert), wc, epoch);
};
}
@@ -107,10 +115,11 @@ MergeStrategy makeInsertStrategy() {
*/
BatchTransform makeUpdateTransform(const std::string& updateOp) {
return [updateOp](auto& batch) {
- std::transform(batch.objects.begin(),
- batch.objects.end(),
- batch.objects.begin(),
- [updateOp](const auto& obj) { return BSON(updateOp << obj); });
+ std::transform(
+ batch.modifications.begin(),
+ batch.modifications.end(),
+ batch.modifications.begin(),
+ [updateOp](const auto& mod) { return BSON(updateOp << mod.getUpdateClassic()); });
};
}
@@ -142,6 +151,10 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}},
+ {kPipelineInsertMode,
+ {kPipelineInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(true, {})}},
{kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
return mergeStrategyDescriptors;
}
@@ -331,7 +344,8 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
auto mergeSpec =
parseMergeSpecAndResolveTargetNamespace(spec, request.getNamespaceString().db());
auto targetNss = mergeSpec.getTargetNss();
- auto whenMatched = mergeSpec.getWhenMatched().value_or(kDefaultWhenMatched);
+ auto whenMatched =
+ mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched;
auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched);
uassert(51181,
"Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' "
@@ -354,6 +368,7 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MergeStrategyDescriptor& descriptor,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage)
@@ -363,6 +378,7 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
_targetCollectionVersion(targetCollectionVersion),
_done(false),
_descriptor(descriptor),
+ _pipeline(std::move(pipeline)),
_mergeOnFields(std::move(mergeOnFields)),
_mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1),
_serializeAsOutStage(serializeAsOutStage) {}
@@ -372,6 +388,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WhenMatched whenMatched,
WhenNotMatched whenNotMatched,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage) {
@@ -404,6 +421,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
return new DocumentSourceMerge(outputNs,
expCtx,
getDescriptors().at({whenMatched, whenNotMatched}),
+ std::move(pipeline),
mergeOnFields,
targetCollectionVersion,
serializeAsOutStage);
@@ -417,8 +435,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
auto mergeSpec = parseMergeSpecAndResolveTargetNamespace(spec, expCtx->ns.db());
auto targetNss = mergeSpec.getTargetNss();
- auto whenMatched = mergeSpec.getWhenMatched().value_or(kDefaultWhenMatched);
+ auto whenMatched =
+ mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched;
auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched);
+ auto pipeline = mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->pipeline : boost::none;
// TODO SERVER-40432: move resolveMergeOnFieldsOnMongo* into MongoProcessInterface.
auto[mergeOnFields, targetCollectionVersion] = expCtx->inMongos
? resolveMergeOnFieldsOnMongoS(expCtx, mergeSpec, targetNss)
@@ -428,6 +448,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
expCtx,
whenMatched,
whenNotMatched,
+ std::move(pipeline),
std::move(mergeOnFields),
targetCollectionVersion,
false /* serialize as $out stage */);
@@ -477,16 +498,17 @@ DocumentSource::GetNextResult DocumentSourceMerge::getNext() {
// Extract the 'on' fields before converting the document to BSON.
auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
- auto insertObj = doc.toBson();
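+ // If a custom 'whenMatched' pipeline was supplied, use it to compute the new version of a
+ // matched document; otherwise the source document itself acts as a replacement-style update.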
+ auto mod = _pipeline ? write_ops::UpdateModification(*_pipeline)
+ : write_ops::UpdateModification(doc.toBson());
- bufferedBytes += insertObj.objsize();
+ bufferedBytes += mod.objsize();
if (!batch.empty() &&
(bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
spill(std::move(batch));
batch.clear();
- bufferedBytes = insertObj.objsize();
+ bufferedBytes = mod.objsize();
}
- batch.emplace(std::move(insertObj), std::move(mergeOnFields));
+ batch.emplace(std::move(mod), std::move(mergeOnFields));
}
if (!batch.empty()) {
spill(std::move(batch));
@@ -537,7 +559,7 @@ Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity>
} else {
DocumentSourceMergeSpec spec;
spec.setTargetNss(_outputNs);
- spec.setWhenMatched(_descriptor.mode.first);
+ spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline});
spec.setWhenNotMatched(_descriptor.mode.second);
spec.setOn([&]() {
std::vector<std::string> mergeOnFields;
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index 9360cd978d6..037d2f54962 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -50,25 +50,29 @@ public:
* portion of the update or insert.
*/
struct BatchedObjects {
- void emplace(BSONObj&& obj, BSONObj&& key) {
- objects.emplace_back(std::move(obj));
+ void emplace(write_ops::UpdateModification&& mod, BSONObj&& key) {
+ modifications.emplace_back(std::move(mod));
uniqueKeys.emplace_back(std::move(key));
}
bool empty() const {
- return objects.empty();
+ return modifications.empty();
}
size_t size() const {
- return objects.size();
+ return modifications.size();
}
void clear() {
- objects.clear();
+ modifications.clear();
uniqueKeys.clear();
}
- std::vector<BSONObj> objects;
+ // For each element in the batch we store an UpdateModification which is either the new
+ // document we want to upsert or insert into the collection (i.e. a 'classic' replacement
+ // update), or the pipeline to run to compute the new document.
+ std::vector<write_ops::UpdateModification> modifications;
+
// Store the unique keys as BSON objects instead of Documents for compatibility with the
// batch update command. (e.g. {q: <array of uniqueKeys>, u: <array of objects>})
std::vector<BSONObj> uniqueKeys;
@@ -121,6 +125,7 @@ public:
DocumentSourceMerge(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MergeStrategyDescriptor& descriptor,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage);
@@ -199,6 +204,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
MergeStrategyDescriptor::WhenMatched whenMatched,
MergeStrategyDescriptor::WhenNotMatched whenNotMatched,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage);
@@ -250,6 +256,9 @@ private:
// descriptor.
const MergeStrategyDescriptor& _descriptor;
+ // A custom pipeline used to compute a new version of a matching document.
+ boost::optional<std::vector<BSONObj>> _pipeline;
+
// Holds the fields used for uniquely identifying documents. There must exist a unique index
// with this key pattern. Default is "_id" for unsharded collections, and "_id" plus the shard
// key for sharded collections.
diff --git a/src/mongo/db/pipeline/document_source_merge.idl b/src/mongo/db/pipeline/document_source_merge.idl
index b2c70a19ea0..fabda43b017 100644
--- a/src/mongo/db/pipeline/document_source_merge.idl
+++ b/src/mongo/db/pipeline/document_source_merge.idl
@@ -35,25 +35,10 @@ global:
- "mongo/db/pipeline/document_source_merge_spec.h"
imports:
+ - "mongo/db/pipeline/document_source_merge_modes.idl"
- "mongo/idl/basic_types.idl"
- "mongo/s/chunk_version.idl"
-enums:
- MergeWhenMatchedMode:
- description: "Possible merge mode values for 'whenMatched'."
- type: string
- values:
- kFail: "fail"
- kMerge: "merge"
- kKeepExisting: "keepExisting"
- kReplaceWithNew: "replaceWithNew"
-
- MergeWhenNotMatchedMode:
- description: "Possible merge mode values for 'whenNotMatched'."
- type: string
- values:
- kInsert: "insert"
-
types:
MergeTargetNss:
bson_serialization_type: any
@@ -70,6 +55,15 @@ types:
serializer: "::mongo::mergeOnFieldsSerializeToBSON"
deserializer: "::mongo::mergeOnFieldsParseFromBSON"
+ MergeWhenMatchedPolicy:
+ bson_serialization_type: any
+ description: Defines a policy describing what to do when there is a matching
+ document in the target collection. Can hold a value from the
+ MergeWhenMatchedMode enum, or a custom pipeline definition.
+ cpp_type: "::mongo::MergeWhenMatchedPolicy"
+ serializer: "::mongo::mergeWhenMatchedSerializeToBSON"
+ deserializer: "::mongo::mergeWhenMatchedParseFromBSON"
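+ # For example, the 'whenMatched' field accepts either a mode string such as
+ # "merge", or a custom pipeline definition such as [{$addFields: {x: 1}}].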
+
structs:
NamespaceSpec:
description: A document used to specify a namespace.
@@ -99,7 +93,7 @@ structs:
description: A single field or array of fields that uniquely identify a document.
whenMatched:
- type: MergeWhenMatchedMode
+ type: MergeWhenMatchedPolicy
optional: true
description: The merge mode for the merge operation when source and target elements
match.
diff --git a/src/mongo/db/pipeline/document_source_merge_modes.idl b/src/mongo/db/pipeline/document_source_merge_modes.idl
new file mode 100644
index 00000000000..300e6314622
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_merge_modes.idl
@@ -0,0 +1,58 @@
+# Copyright (C) 2019-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Merge modes for the document source merge stage.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+enums:
+ MergeWhenMatchedMode:
+ description: "Possible merge mode values for $merge's 'whenMatched' field."
+ type: string
+ values:
+ kFail: "fail"
+ kKeepExisting: "keepExisting"
+ kMerge: "merge"
+ # Technically, this item is not needed, as the string value "pipeline" is not
+ # supported by the 'whenMatched' field; a pipeline definition, given as an array
+ # of objects, must be used instead. However, to avoid special-casing logic in
+ # DocumentSourceMerge, and to keep all merge strategy definitions in a single
+ # descriptors map whose keys are pairs of whenMatched/whenNotMatched values, this
+ # 'kPipeline' element is added for internal use only.
+ kPipeline: "pipeline"
+ kReplaceWithNew: "replaceWithNew"
+
+ MergeWhenNotMatchedMode:
+ description: "Possible merge mode values for $merge's 'whenNotMatched'. field"
+ type: string
+ values:
+ kInsert: "insert"
diff --git a/src/mongo/db/pipeline/document_source_merge_spec.cpp b/src/mongo/db/pipeline/document_source_merge_spec.cpp
index cd3eaa77324..4a08f84cfa1 100644
--- a/src/mongo/db/pipeline/document_source_merge_spec.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_spec.cpp
@@ -34,6 +34,7 @@
#include <fmt/format.h>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source_merge.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
@@ -44,14 +45,14 @@ NamespaceString mergeTargetNssParseFromBSON(const BSONElement& elem) {
uassert(51178,
"{} 'into' field must be either a string or an object, "
"but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())),
- elem.type() == String || elem.type() == Object);
+ elem.type() == BSONType::String || elem.type() == BSONType::Object);
- if (elem.type() == String) {
+ if (elem.type() == BSONType::String) {
return {"", elem.valueStringData()};
- } else {
- auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject());
- return {spec.getDb().value_or(""), spec.getColl().value_or("")};
}
+
+ auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject());
+ return {spec.getDb().value_or(""), spec.getColl().value_or("")};
}
void mergeTargetNssSerializeToBSON(const NamespaceString& targetNss,
@@ -66,18 +67,20 @@ std::vector<std::string> mergeOnFieldsParseFromBSON(const BSONElement& elem) {
uassert(51186,
"{} 'into' field must be either a string or an array of strings, "
"but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())),
- elem.type() == String || elem.type() == Array);
+ elem.type() == BSONType::String || elem.type() == BSONType::Array);
- if (elem.type() == String) {
+ if (elem.type() == BSONType::String) {
fields.push_back(elem.str());
} else {
+ invariant(elem.type() == BSONType::Array);
+
BSONObjIterator iter(elem.Obj());
while (iter.more()) {
const BSONElement matchByElem = iter.next();
uassert(51134,
"{} 'on' array elements must be strings, but found "_format(
DocumentSourceMerge::kStageName, typeName(matchByElem.type())),
- matchByElem.type() == String);
+ matchByElem.type() == BSONType::String);
fields.push_back(matchByElem.str());
}
}
@@ -99,4 +102,40 @@ void mergeOnFieldsSerializeToBSON(const std::vector<std::string>& fields,
bob->append(fieldName, fields);
}
}
+
+MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem) {
+ uassert(51191,
+ "{} 'whenMatched' field must be either a string or an array, "
+ "but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())),
+ elem.type() == BSONType::String || elem.type() == BSONType::Array);
+
+ if (elem.type() == BSONType::Array) {
+ return {MergeWhenMatchedModeEnum::kPipeline,
+ uassertStatusOK(AggregationRequest::parsePipelineFromBSON(elem))};
+ }
+
+ invariant(elem.type() == BSONType::String);
+
+ IDLParserErrorContext ctx{DocumentSourceMergeSpec::kWhenMatchedFieldName};
+ auto value = elem.valueStringData();
+ auto mode = MergeWhenMatchedMode_parse(ctx, value);
+
+ // The 'kPipeline' mode cannot be specified explicitly; a custom pipeline definition must be
+ // used instead.
+ if (mode == MergeWhenMatchedModeEnum::kPipeline) {
+ ctx.throwBadEnumValue(value);
+ }
+ return {mode};
+}
+
+void mergeWhenMatchedSerializeToBSON(const MergeWhenMatchedPolicy& policy,
+ StringData fieldName,
+ BSONObjBuilder* bob) {
+ if (policy.mode == MergeWhenMatchedModeEnum::kPipeline) {
+ invariant(policy.pipeline);
+ bob->append(fieldName, *policy.pipeline);
+ } else {
+ bob->append(fieldName, MergeWhenMatchedMode_serializer(policy.mode));
+ }
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_merge_spec.h b/src/mongo/db/pipeline/document_source_merge_spec.h
index 358a379a39f..35c353bb78a 100644
--- a/src/mongo/db/pipeline/document_source_merge_spec.h
+++ b/src/mongo/db/pipeline/document_source_merge_spec.h
@@ -29,29 +29,49 @@
#pragma once
-#include "mongo/base/string_data.h"
-#include "mongo/db/namespace_string.h"
-
+#include <boost/optional.hpp>
#include <string>
#include <vector>
-namespace mongo {
+#include "mongo/base/string_data.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/document_source_merge_modes_gen.h"
+namespace mongo {
class BSONObjBuilder;
class BSONElement;
-// Serialize and deserialize functions for the $merge stage 'into' field which can be a single
-// string value, or an object
+// Defines a policy describing what to do when there is a matching document in the target
+// collection. Can hold a value from the MergeWhenMatchedModeEnum, or a custom pipeline definition.
+struct MergeWhenMatchedPolicy {
+ MergeWhenMatchedModeEnum mode;
+ boost::optional<std::vector<BSONObj>> pipeline;
+};
+
+/**
+ * Serialize and deserialize functions for the $merge stage 'into' field which can be a single
+ * string value, or an object.
+ */
void mergeTargetNssSerializeToBSON(const NamespaceString& targetNss,
StringData fieldName,
BSONObjBuilder* bob);
NamespaceString mergeTargetNssParseFromBSON(const BSONElement& elem);
-// Serialize and deserialize functions for the $merge stage 'on' field which can be a single string
-// value, or array of strings.
+/**
+ * Serialize and deserialize functions for the $merge stage 'on' field which can be a single string
+ * value, or array of strings.
+ */
void mergeOnFieldsSerializeToBSON(const std::vector<std::string>& fields,
StringData fieldName,
BSONObjBuilder* bob);
std::vector<std::string> mergeOnFieldsParseFromBSON(const BSONElement& elem);
+/**
+ * Serialize and deserialize functions for the $merge stage 'whenMatched' field which can be either
+ * a string value, or an array of objects defining a custom pipeline.
+ */
+void mergeWhenMatchedSerializeToBSON(const MergeWhenMatchedPolicy& policy,
+ StringData fieldName,
+ BSONObjBuilder* bob);
+MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem);
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp
index 7a78fde7475..00f63636995 100644
--- a/src/mongo/db/pipeline/document_source_merge_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_test.cpp
@@ -130,6 +130,20 @@ TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfIntoIsObject) {
ASSERT_EQ(mergeStage->getOutputNs().coll(), targetColl);
}
+TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfWhenMatchedIsStringOrArray) {
+ auto spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
+ << "merge"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
+ << BSONArray()));
+ ASSERT(createMergeStage(spec));
+}
+
TEST_F(DocumentSourceMergeTest, FailsToParseIncorrectMergeSpecType) {
auto spec = BSON("$merge" << 1);
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51182);
@@ -276,24 +290,24 @@ TEST_F(DocumentSourceMergeTest, FailsToParseIfDbIsNotAValidDatabaseName) {
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::InvalidNamespace);
}
-TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsNotString) {
+TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsNotStringOrArray) {
auto spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
<< true));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191);
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
- << BSONArray()));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ << 100));
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191);
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
<< BSON("" << kDefaultWhenMatchedMode)));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191);
}
TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenNotMatchedModeIsNotString) {
@@ -623,6 +637,22 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
+ << BSON_ARRAY(BSON("$project" << BSON("x" << 1)))
+ << "whenNotMatched"
+ << "insert"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
+ << "pipeline"
+ << "whenNotMatched"
+ << "insert"));
+ ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue);
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
<< "replaceWithNew"
<< "whenNotMatched"
<< "fail"));
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index f533ea3aaed..6dd3fda1d5c 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -321,6 +321,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::create(
expCtx,
MergeWhenMatchedModeEnum::kFail,
MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none, /* no custom pipeline */
std::move(uniqueKey),
targetCollectionVersion,
true /* serialize as $out stage */);
@@ -329,6 +330,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::create(
expCtx,
MergeWhenMatchedModeEnum::kReplaceWithNew,
MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none /* no custom pipeline */,
std::move(uniqueKey),
targetCollectionVersion,
true /* serialize as $out stage */);
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index d2e947f5a32..011b7671776 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -42,6 +42,7 @@
#include "mongo/db/generic_cursor.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
@@ -134,7 +135,7 @@ public:
virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 0d29304b902..472805ae939 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -111,7 +111,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index fe8de93ea71..5540bc7e57e 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -133,7 +133,7 @@ void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCont
void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index f87333eeda2..b3c0a5885f7 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -77,7 +77,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index a372c51ad93..fda80b67518 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -193,7 +193,7 @@ Insert MongoInterfaceStandalone::buildInsertOp(const NamespaceString& nss,
Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
bool upsert,
bool multi,
bool bypassDocValidation) {
@@ -245,7 +245,7 @@ void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionConte
void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
@@ -257,7 +257,6 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
upsert,
multi,
expCtx->bypassDocumentValidation));
-
// Need to check each result in the batch since the writes are unordered.
uassertStatusOKWithContext(
[&writeResults]() {
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 25e9d04b701..0febf6073bf 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -65,7 +65,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
@@ -155,7 +155,7 @@ protected:
*/
Update buildUpdateOp(const NamespaceString& nss,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
bool upsert,
bool multi,
bool bypassDocValidation);
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index dde7a2395f5..931e3f5f17f 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -74,7 +74,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,