diff options
author | Anton Korshunov <anton.korshunov@mongodb.com> | 2019-05-06 20:37:22 +0100 |
---|---|---|
committer | Anton Korshunov <anton.korshunov@mongodb.com> | 2019-05-13 21:40:17 +0100 |
commit | dde091c07989ffaefc57705859abf6517beeeace (patch) | |
tree | c5639b56f03fa24f27aeb9e3422617907be6dc77 | |
parent | e4b13ae68a4eef9393357038f09f14bfd8102050 (diff) | |
download | mongo-dde091c07989ffaefc57705859abf6517beeeace.tar.gz |
SERVER-40431 Add merge support for whenMatched: pipeline
19 files changed, 643 insertions, 68 deletions
diff --git a/jstests/aggregation/sources/merge/mode_pipeline_insert.js b/jstests/aggregation/sources/merge/mode_pipeline_insert.js new file mode 100644 index 00000000000..ee8c63d19ce --- /dev/null +++ b/jstests/aggregation/sources/merge/mode_pipeline_insert.js @@ -0,0 +1,397 @@ +// Tests the behaviour of the $merge stage with whenMatched=[<pipeline>] and whenNotMatched=insert. +// +// Cannot implicitly shard accessed collections because a collection can be implictly created and +// exists when none is expected. +// @tags: [assumes_no_implicit_collection_creation_after_drop] +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertArrayEq. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos. + + // Asserts that two arrays are equal - that is, if their sizes are equal and each element in + // the 'actual' array has a matching element in the 'expected' array, without honoring elements + // order. + function assertArrayEq({actual = [], expected = []} = {}) { + assert(arrayEq(actual, expected), `actual=${tojson(actual)}, expected=${tojson(expected)}`); + } + + // A helper function to create a pipeline with a $merge stage using a custom 'updatePipeline' + // for the whenMatched mode. If 'initialStages' array is specified, the $merge stage will be + // appened to this array and the result returned to the caller, otherwise an array with a + // single $merge stage is returned. An output collection for the $merge stage is specified + // in the 'target', and the $merge stage 'on' fields in the 'on' parameter. + function makeMergePipeline( + {target = "", initialStages = [], updatePipeline = [], on = "_id"} = {}) { + return initialStages.concat([{ + $merge: + {into: target, on: on, whenMatched: updatePipeline, whenNotMatched: "insert"} + }]); + } + + const source = db[`${jsTest.name()}_source`]; + source.drop(); + const target = db[`${jsTest.name()}_target`]; + target.drop(); + + (function testMergeIntoNonExistentCollection() { + assert.commandWorked(source.insert({_id: 1, a: 1, b: "a"})); + assert.doesNotThrow( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$addFields: {x: 1}}]}))); + assertArrayEq({ + actual: target.find().toArray(), + expected: [ + {_id: 1, x: 1}, + ] + }); + })(); + + // Test $merge inserts a document into an existing target collection if no matching document + // is found. + (function testMergeInsertsDocumentIfMatchNotFound() { + assert.commandWorked(target.deleteMany({})); + assert.doesNotThrow( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$addFields: {x: 1, y: 2}}]}))); + assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 1, y: 2}]}); + })(); + + // Test $merge updates an existing document in the target collection by applying a + // pipeline-style update. + (function testMergeUpdatesDocumentIfMatchFound() { + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + target: target.getName(), + updatePipeline: [{$project: {x: {$add: ["$x", 1]}, y: {$add: ["$y", 2]}}}] + }))); + assertArrayEq({actual: target.find().toArray(), expected: [{_id: 1, x: 2, y: 4}]}); + })(); + + // Test $merge with various pipeline stages which are currently supported by the pipeline-style + // update. + (function testMergeWithSupportedUpdatePipelineStages() { + assert(source.drop()); + assert(target.drop()); + + assert.commandWorked(source.insert([{_id: 1, a: 1}, {_id: 2, a: 2}])); + assert.commandWorked(target.insert({_id: 1, b: 1})); + + // Test $addFields stage. + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + target: target.getName(), + updatePipeline: [{$addFields: {x: {$add: ["$b", 1]}}}] + }))); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, b: 1, x: 2}, {_id: 2, x: null}]}); + + // Test $project stage. + assert(target.drop()); + assert.commandWorked(target.insert({_id: 1, b: 1})); + assert.doesNotThrow( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$project: {x: {$add: ["$b", 1]}}}]}))); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, x: 2}, {_id: 2, x: null}]}); + + // Test $replaceRoot stage. + assert(target.drop()); + assert.commandWorked( + target.insert([{_id: 1, b: 1, c: {x: {y: 1}}}, {_id: 2, b: 2, c: {x: {y: 2}}}])); + assert.doesNotThrow( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$replaceRoot: {newRoot: "$c"}}]}))); + assertArrayEq({ + actual: target.find().toArray(), + expected: [{_id: 1, x: {y: 1}}, {_id: 2, x: {y: 2}}] + }); + })(); + + // Test $merge inserts a new document into the target collection if not matching document is + // found by applying a pipeline-style update with upsert=true semantics. + (function testMergeInsertDocumentIfMatchNotFound() { + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.insert({_id: 1, a: 1})); + assert.commandWorked(target.insert({_id: 2, a: 2})); + assert.doesNotThrow( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$addFields: {x: 1}}]}))); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, x: 1}, {_id: 2, a: 2}]}); + })(); + + // Test $merge doesn't modify the target collection if a document has been removed from the + // source collection. + (function testMergeDoesNotUpdateDeletedDocument() { + assert.commandWorked(source.deleteOne({_id: 1})); + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + target: target.getName(), + updatePipeline: [{$project: {x: {$add: ["$x", 1]}, a: 1}}] + }))); + assertArrayEq({ + actual: target.find().toArray(), + expected: [ + {_id: 1, x: 1}, + {_id: 2, a: 2}, + ] + }); + })(); + + // Test $merge fails if a unique index constraint in the target collection is violated. + (function testMergeFailsIfTargetUniqueKeyIsViolated() { + if (FixtureHelpers.isSharded(source)) { + // Skip this test if the collection sharded, because an implicitly created sharded + // key of {_id: 1} will not be covered by a unique index created in this test, which + // is not allowed. + return; + } + + assert(target.drop()); + assert.commandWorked(source.insert({_id: 4, a: 2})); + assert.commandWorked(target.insert([{_id: 1, x: 1}, {_id: 2, a: 2}])); + assert.commandWorked(target.createIndex({a: 1}, {unique: true})); + const error = assert.throws( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$project: {x: 1, a: 1}}]}))); + assert.commandFailedWithCode(error, ErrorCodes.DuplicateKey); + assertArrayEq({ + actual: target.find().toArray(), + expected: [ + {_id: 1, x: 1}, + {_id: 2, a: 2}, + ] + }); + assert.commandWorked(target.dropIndex({a: 1})); + })(); + + // Test $merge fails if it cannot find an index to verify that the 'on' fields will be unique. + (function testMergeFailsIfOnFieldCannotBeVerifiedForUniquness() { + // The 'on' fields contains a single document field. + let error = assert.throws(() => source.aggregate(makeMergePipeline({ + target: target.getName(), + on: "nonexistent", + updatePipeline: [{$project: {x: 1, a: 1}}] + }))); + assert.commandFailedWithCode(error, [51190, 51183]); + + // The 'on' fields contains multiple document fields. + error = assert.throws(() => source.aggregate(makeMergePipeline({ + target: target.getName(), + on: ["nonexistent1", "nonexistent2"], + updatePipeline: [{$project: {x: 1, a: 1}}] + }))); + assert.commandFailedWithCode(error, [51190, 51183]); + })(); + + // Test $merge with an explicit 'on' field over a single or multiple document fields which + // differ from the _id field. + (function testMergeWithOnFields() { + if (FixtureHelpers.isSharded(source)) { + // Skip this test if the collection sharded, because an implicitly created sharded + // key of {_id: 1} will not be covered by a unique index created in this test, which + // is not allowed. + return; + } + + // The 'on' fields contains a single document field. + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.createIndex({a: 1}, {unique: true})); + assert.commandWorked(target.createIndex({a: 1}, {unique: true})); + assert.commandWorked(source.insert([{_id: 1, a: 1}, {_id: 2, a: 2}, {_id: 3, a: 30}])); + assert.commandWorked( + target.insert([{_id: 1, a: 1, b: 1}, {_id: 4, a: 30, b: 2}, {_id: 5, a: 40, b: 3}])); + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + initialStages: [{$project: {_id: 0}}], + target: target.getName(), + on: "a", + updatePipeline: [{$addFields: {z: 1}}] + }))); + assertArrayEq({ + actual: target.find({}, {_id: 0}).toArray(), + expected: [{a: 1, b: 1, z: 1}, {a: 2, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}] + }); + + // The 'on' fields contains multiple document fields. + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.createIndex({a: 1, b: 1}, {unique: true})); + assert.commandWorked(target.createIndex({a: 1, b: 1}, {unique: true})); + assert.commandWorked( + source.insert([{_id: 1, a: 1, b: 1}, {_id: 2, a: 2, b: 4}, {_id: 3, a: 30, b: 2}])); + assert.commandWorked( + target.insert([{_id: 1, a: 1, b: 1}, {_id: 4, a: 30, b: 2}, {_id: 5, a: 40, b: 3}])); + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + initialStages: [{$project: {_id: 0}}], + target: target.getName(), + on: ["a", "b"], + updatePipeline: [{$addFields: {z: 1}}] + }))); + assertArrayEq({ + actual: target.find({}, {_id: 0}).toArray(), + expected: + [{a: 1, b: 1, z: 1}, {a: 2, b: 4, z: 1}, {a: 30, b: 2, z: 1}, {a: 40, b: 3}] + }); + assert.commandWorked(source.dropIndex({a: 1, b: 1})); + assert.commandWorked(target.dropIndex({a: 1, b: 1})); + })(); + + // Test $merge with a dotted path in the 'on' field. + (function testMergeWithDottedOnField() { + if (FixtureHelpers.isSharded(source)) { + // Skip this test if the collection sharded, because an implicitly created sharded + // key of {_id: 1} will not be covered by a unique index created in this test, which + // is not allowed. + return; + } + + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.createIndex({"a.b": 1}, {unique: true})); + assert.commandWorked(target.createIndex({"a.b": 1}, {unique: true})); + assert.commandWorked(source.insert([ + {_id: 1, a: {b: "b"}, c: "x"}, + {_id: 2, a: {b: "c"}, c: "y"}, + {_id: 3, a: {b: 30}, b: "c"} + ])); + assert.commandWorked(target.insert({_id: 2, a: {b: "c"}, c: "y"})); + assert.doesNotThrow(() => source.aggregate(makeMergePipeline({ + initialStages: [{$project: {_id: 0}}], + target: target.getName(), + on: "a.b", + updatePipeline: [{$addFields: {z: 1}}] + }))); + assertArrayEq({ + actual: target.find().toArray(), + expected: [ + {_id: 1, a: {b: "b"}, z: 1}, + {_id: 2, a: {b: "c"}, c: "y", z: 1}, + {_id: 3, a: {b: 30}, z: 1} + ] + }); + })(); + + // Test $merge fails if the value of the 'on' field in a document is invalid, e.g. missing, + // null or an array. + (function testMergeFailsIfOnFieldIsInvalid() { + if (FixtureHelpers.isSharded(source)) { + // Skip this test if the collection sharded, because an implicitly created sharded + // key of {_id: 1} will not be covered by a unique index created in this test, which + // is not allowed. + return; + } + + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.createIndex({"z": 1}, {unique: true})); + assert.commandWorked(target.createIndex({"z": 1}, {unique: true})); + + const pipeline = makeMergePipeline({ + initialStages: [{$project: {_id: 0}}], + target: target.getName(), + on: "z", + updatePipeline: [{$addFields: {z: 1}}] + }); + + // The 'on' field is missing. + assert.commandWorked(source.insert({_id: 1})); + let error = assert.throws(() => source.aggregate(pipeline)); + assert.commandFailedWithCode(error, 51132); + + // The 'on' field is null. + assert.commandWorked(source.update({_id: 1}, {z: null})); + error = assert.throws(() => source.aggregate(pipeline)); + assert.commandFailedWithCode(error, 51132); + + // The 'on' field is an array. + assert.commandWorked(source.update({_id: 1}, {z: [1, 2]})); + error = assert.throws(() => source.aggregate(pipeline)); + assert.commandFailedWithCode(error, 51185); + })(); + + // Test $merge when the _id field is removed from the aggregate projection but is used in the + // $merge's 'on' field. When the _id is missing, the $merge stage will create a new ObjectId in + // its place before performing the insert or update. + (function testMergeWhenDocIdIsRemovedFromProjection() { + const pipeline = makeMergePipeline({ + initialStages: [{$project: {_id: 0}}], + target: target.getName(), + updatePipeline: [{$addFields: {z: 1}}] + }); + + // The _id is a single 'on' field (a default one). + assert(source.drop()); + assert(target.drop()); + assert.commandWorked(source.insert([{_id: 1, a: 1, b: "a"}, {_id: 2, a: 2, b: "b"}])); + assert.commandWorked(target.insert({_id: 1, b: "c"})); + assert.doesNotThrow(() => source.aggregate(pipeline)); + assertArrayEq({ + actual: target.find({}, {_id: 0}).toArray(), + // There is a matching document in the target with {_id: 1}, but since we cannot match + // it (no _id in projection), we just insert two new documents from the source + // collection by applying a pipeline-style update. + expected: [{b: "c"}, {z: 1}, {z: 1}] + }); + + // The _id is part of the compound 'on' field. + assert(target.drop()); + assert.commandWorked(target.insert({_id: 1, b: "c"})); + assert.commandWorked(source.createIndex({_id: 1, a: -1})); + assert.commandWorked(target.createIndex({_id: 1, a: -1})); + assert.doesNotThrow(() => source.aggregate(pipeline)); + assertArrayEq( + {actual: target.find({}, {_id: 0}).toArray(), expected: [{b: "c"}, {z: 1}, {z: 1}]}); + assert.commandWorked(source.dropIndex({_id: 1, a: -1})); + assert.commandWorked(target.dropIndex({_id: 1, a: -1})); + })(); + + // Test $merge preserves indexes and options of the existing target collection. + (function testMergePresrvesIndexesAndOptions() { + const validator = {z: {$gt: 0}}; + assert(target.drop()); + assert.commandWorked(db.createCollection(target.getName(), {validator: validator})); + assert.commandWorked(target.createIndex({a: 1})); + assert.doesNotThrow( + () => source.aggregate(makeMergePipeline( + {target: target.getName(), updatePipeline: [{$addFields: {z: 1}}]}))); + assertArrayEq( + {actual: target.find().toArray(), expected: [{_id: 1, z: 1}, {_id: 2, z: 1}]}); + assert.eq(2, target.getIndexes().length); + + const listColl = db.runCommand({listCollections: 1, filter: {name: target.getName()}}); + assert.commandWorked(listColl); + assert.eq(validator, listColl.cursor.firstBatch[0].options["validator"]); + })(); + + // Test $merge implicitly creates a new database when the target collection's database doesn't + // exist. + (function testMergeImplicitlyCreatesTargetDatabase() { + assert(source.drop()); + assert.commandWorked(source.insert({_id: 1, a: 1, b: "a"})); + + const foreignDb = db.getSiblingDB(`${jsTest.name()}_foreign_db`); + assert.commandWorked(foreignDb.dropDatabase()); + const foreignTarget = foreignDb[`${jsTest.name()}_target`]; + const foreignPipeline = makeMergePipeline({ + target: {db: foreignDb.getName(), coll: foreignTarget.getName()}, + updatePipeline: [{$addFields: {z: 1}}] + }); + + if (!FixtureHelpers.isMongos(db)) { + assert.doesNotThrow(() => source.aggregate(foreignPipeline)); + assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]}); + } else { + // Implicit database creation is prohibited in a cluster. + const error = assert.throws(() => source.aggregate(foreignPipeline)); + assert.commandFailedWithCode(error, ErrorCodes.NamespaceNotFound); + + // Force a creation of the database and collection, then fall through the test + // below. + assert.commandWorked(foreignTarget.insert({_id: 1})); + } + + assert.doesNotThrow(() => source.aggregate(foreignPipeline)); + assertArrayEq({actual: foreignTarget.find().toArray(), expected: [{_id: 1, z: 1}]}); + assert.commandWorked(foreignDb.dropDatabase()); + })(); +}()); diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp index c15aafce0d0..f6ff83cdad7 100644 --- a/src/mongo/db/ops/write_ops_parsers.cpp +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -237,6 +237,9 @@ write_ops::UpdateModification::UpdateModification(const BSONObj& update) { _type = Type::kClassic; } +write_ops::UpdateModification::UpdateModification(std::vector<BSONObj> pipeline) + : _type{Type::kPipeline}, _pipeline{std::move(pipeline)} {} + write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) { return UpdateModification(elem); } diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index f116b79b63e..2058ca4a055 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -63,7 +63,7 @@ public: UpdateModification() = default; UpdateModification(BSONElement update); - + UpdateModification(std::vector<BSONObj> pipeline); // This constructor exists only to provide a fast-path for constructing classic-style updates. UpdateModification(const BSONObj& update); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 6a0ccb087bb..393f9742a33 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -599,6 +599,7 @@ env.Library( env.Idlc('document_source_change_stream.idl')[0], env.Idlc('document_source_list_sessions.idl')[0], env.Idlc('document_source_merge.idl')[0], + env.Idlc('document_source_merge_modes.idl')[0], env.Idlc('document_source_out.idl')[0], env.Idlc('exchange_spec.idl')[0], env.Idlc('runtime_constants.idl')[0], diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 3177affe8b1..e02989eddf6 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -68,6 +68,7 @@ constexpr auto kMergeInsertMode = MergeMode{WhenMatched::kMerge, WhenNotMatched: constexpr auto kKeepExistingInsertMode = MergeMode{WhenMatched::kKeepExisting, WhenNotMatched::kInsert}; constexpr auto kFailInsertMode = MergeMode{WhenMatched::kFail, WhenNotMatched::kInsert}; +constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kInsert}; /** * Creates a merge strategy which uses update semantics to do perform a merge operation. If @@ -84,7 +85,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) { expCtx->mongoProcessInterface->update(expCtx, ns, std::move(batch.uniqueKeys), - std::move(batch.objects), + std::move(batch.modifications), wc, upsert, multi, @@ -97,7 +98,14 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) { */ MergeStrategy makeInsertStrategy() { return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { - expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(batch.objects), wc, epoch); + std::vector<BSONObj> objectsToInsert(batch.size()); + // The batch stores replacement style updates, but for this "insert" style of $merge we'd + // like to just insert the new document without attempting any sort of replacement. + std::transform(batch.modifications.begin(), + batch.modifications.end(), + objectsToInsert.begin(), + [](const auto& mod) { return mod.getUpdateClassic(); }); + expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(objectsToInsert), wc, epoch); }; } @@ -107,10 +115,11 @@ MergeStrategy makeInsertStrategy() { */ BatchTransform makeUpdateTransform(const std::string& updateOp) { return [updateOp](auto& batch) { - std::transform(batch.objects.begin(), - batch.objects.end(), - batch.objects.begin(), - [updateOp](const auto& obj) { return BSON(updateOp << obj); }); + std::transform( + batch.modifications.begin(), + batch.modifications.end(), + batch.modifications.begin(), + [updateOp](const auto& mod) { return BSON(updateOp << mod.getUpdateClassic()); }); }; } @@ -142,6 +151,10 @@ const MergeStrategyDescriptorsMap& getDescriptors() { {kKeepExistingInsertMode, {ActionType::insert, ActionType::update}, makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}}, + {kPipelineInsertMode, + {kPipelineInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(true, {})}}, {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}}; return mergeStrategyDescriptors; } @@ -331,7 +344,8 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed auto mergeSpec = parseMergeSpecAndResolveTargetNamespace(spec, request.getNamespaceString().db()); auto targetNss = mergeSpec.getTargetNss(); - auto whenMatched = mergeSpec.getWhenMatched().value_or(kDefaultWhenMatched); + auto whenMatched = + mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched; auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched); uassert(51181, "Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' " @@ -354,6 +368,7 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MergeStrategyDescriptor& descriptor, + boost::optional<std::vector<BSONObj>>&& pipeline, std::set<FieldPath> mergeOnFields, boost::optional<ChunkVersion> targetCollectionVersion, bool serializeAsOutStage) @@ -363,6 +378,7 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs, _targetCollectionVersion(targetCollectionVersion), _done(false), _descriptor(descriptor), + _pipeline(std::move(pipeline)), _mergeOnFields(std::move(mergeOnFields)), _mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1), _serializeAsOutStage(serializeAsOutStage) {} @@ -372,6 +388,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, WhenMatched whenMatched, WhenNotMatched whenNotMatched, + boost::optional<std::vector<BSONObj>>&& pipeline, std::set<FieldPath> mergeOnFields, boost::optional<ChunkVersion> targetCollectionVersion, bool serializeAsOutStage) { @@ -404,6 +421,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create( return new DocumentSourceMerge(outputNs, expCtx, getDescriptors().at({whenMatched, whenNotMatched}), + std::move(pipeline), mergeOnFields, targetCollectionVersion, serializeAsOutStage); @@ -417,8 +435,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson( auto mergeSpec = parseMergeSpecAndResolveTargetNamespace(spec, expCtx->ns.db()); auto targetNss = mergeSpec.getTargetNss(); - auto whenMatched = mergeSpec.getWhenMatched().value_or(kDefaultWhenMatched); + auto whenMatched = + mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched; auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched); + auto pipeline = mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->pipeline : boost::none; // TODO SERVER-40432: move resolveMergeOnFieldsOnMongo* into MongoProcessInterface. auto[mergeOnFields, targetCollectionVersion] = expCtx->inMongos ? resolveMergeOnFieldsOnMongoS(expCtx, mergeSpec, targetNss) @@ -428,6 +448,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson( expCtx, whenMatched, whenNotMatched, + std::move(pipeline), std::move(mergeOnFields), targetCollectionVersion, false /* serialize as $out stage */); @@ -477,16 +498,17 @@ DocumentSource::GetNextResult DocumentSourceMerge::getNext() { // Extract the 'on' fields before converting the document to BSON. auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields); - auto insertObj = doc.toBson(); + auto mod = _pipeline ? write_ops::UpdateModification(*_pipeline) + : write_ops::UpdateModification(doc.toBson()); - bufferedBytes += insertObj.objsize(); + bufferedBytes += mod.objsize(); if (!batch.empty() && (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { spill(std::move(batch)); batch.clear(); - bufferedBytes = insertObj.objsize(); + bufferedBytes = mod.objsize(); } - batch.emplace(std::move(insertObj), std::move(mergeOnFields)); + batch.emplace(std::move(mod), std::move(mergeOnFields)); } if (!batch.empty()) { spill(std::move(batch)); @@ -537,7 +559,7 @@ Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity> } else { DocumentSourceMergeSpec spec; spec.setTargetNss(_outputNs); - spec.setWhenMatched(_descriptor.mode.first); + spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline}); spec.setWhenNotMatched(_descriptor.mode.second); spec.setOn([&]() { std::vector<std::string> mergeOnFields; diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index 9360cd978d6..037d2f54962 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -50,25 +50,29 @@ public: * portion of the update or insert. */ struct BatchedObjects { - void emplace(BSONObj&& obj, BSONObj&& key) { - objects.emplace_back(std::move(obj)); + void emplace(write_ops::UpdateModification&& mod, BSONObj&& key) { + modifications.emplace_back(std::move(mod)); uniqueKeys.emplace_back(std::move(key)); } bool empty() const { - return objects.empty(); + return modifications.empty(); } size_t size() const { - return objects.size(); + return modifications.size(); } void clear() { - objects.clear(); + modifications.clear(); uniqueKeys.clear(); } - std::vector<BSONObj> objects; + // For each element in the batch we store an UpdateModification which is either the new + // document we want to upsert or insert into the collection (i.e. a 'classic' replacement + // update), or the pipeline to run to compute the new document. + std::vector<write_ops::UpdateModification> modifications; + // Store the unique keys as BSON objects instead of Documents for compatibility with the // batch update command. (e.g. {q: <array of uniqueKeys>, u: <array of objects>}) std::vector<BSONObj> uniqueKeys; @@ -121,6 +125,7 @@ public: DocumentSourceMerge(NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MergeStrategyDescriptor& descriptor, + boost::optional<std::vector<BSONObj>>&& pipeline, std::set<FieldPath> mergeOnFields, boost::optional<ChunkVersion> targetCollectionVersion, bool serializeAsOutStage); @@ -199,6 +204,7 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, MergeStrategyDescriptor::WhenMatched whenMatched, MergeStrategyDescriptor::WhenNotMatched whenNotMatched, + boost::optional<std::vector<BSONObj>>&& pipeline, std::set<FieldPath> mergeOnFields, boost::optional<ChunkVersion> targetCollectionVersion, bool serializeAsOutStage); @@ -250,6 +256,9 @@ private: // descriptor. const MergeStrategyDescriptor& _descriptor; + // A custom pipeline to compute a new version of merging documents. + boost::optional<std::vector<BSONObj>> _pipeline; + // Holds the fields used for uniquely identifying documents. There must exist a unique index // with this key pattern. Default is "_id" for unsharded collections, and "_id" plus the shard // key for sharded collections. diff --git a/src/mongo/db/pipeline/document_source_merge.idl b/src/mongo/db/pipeline/document_source_merge.idl index b2c70a19ea0..fabda43b017 100644 --- a/src/mongo/db/pipeline/document_source_merge.idl +++ b/src/mongo/db/pipeline/document_source_merge.idl @@ -35,25 +35,10 @@ global: - "mongo/db/pipeline/document_source_merge_spec.h" imports: + - "mongo/db/pipeline/document_source_merge_modes.idl" - "mongo/idl/basic_types.idl" - "mongo/s/chunk_version.idl" -enums: - MergeWhenMatchedMode: - description: "Possible merge mode values for 'whenMatched'." - type: string - values: - kFail: "fail" - kMerge: "merge" - kKeepExisting: "keepExisting" - kReplaceWithNew: "replaceWithNew" - - MergeWhenNotMatchedMode: - description: "Possible merge mode values for 'whenNotMatched'." - type: string - values: - kInsert: "insert" - types: MergeTargetNss: bson_serialization_type: any @@ -70,6 +55,15 @@ types: serializer: "::mongo::mergeOnFieldsSerializeToBSON" deserializer: "::mongo::mergeOnFieldsParseFromBSON" + MergeWhenMatchedPolicy: + bson_serialization_type: any + description: Defines a policy strategy describing what to do when there is a matching + document in the target collection. Can hold a value from the + MergeWhenMatchedMode enum, or a custom pipeline definition. + cpp_type: "::mongo::MergeWhenMatchedPolicy" + serializer: "::mongo::mergeWhenMatchedSerializeToBSON" + deserializer: "::mongo::mergeWhenMatchedParseFromBSON" + structs: NamespaceSpec: description: A document used to specify a namespace. @@ -99,7 +93,7 @@ structs: description: A single field or array of fields that uniquely identify a document. whenMatched: - type: MergeWhenMatchedMode + type: MergeWhenMatchedPolicy optional: true description: The merge mode for the merge operation when source and target elements match. diff --git a/src/mongo/db/pipeline/document_source_merge_modes.idl b/src/mongo/db/pipeline/document_source_merge_modes.idl new file mode 100644 index 00000000000..300e6314622 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_merge_modes.idl @@ -0,0 +1,58 @@ +# Copyright (C) 2019-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Merge modes for the document source merge stage. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +enums: + MergeWhenMatchedMode: + description: "Possible merge mode values for $merge's 'whenMatched' field." + type: string + values: + kFail: "fail" + kKeepExisting: "keepExisting" + kMerge: "merge" + # Technically, we don't need this item, as the 'pipeline' value, as a string, is not + # supported by the 'whenMatched' field. Instead, a pipeline definition, as an array of + # objects, should be used. However, to avoid special casing logic in + # DocumentSourceMerge, and to keep all merge strategy definitions in a single + # descriptors map, which keys are pairs of whenMatched/whenNotMatched values, this + # 'kPipeline' element is added for internal use only. + kPipeline: "pipeline" + kReplaceWithNew: "replaceWithNew" + + MergeWhenNotMatchedMode: + description: "Possible merge mode values for $merge's 'whenNotMatched'. field" + type: string + values: + kInsert: "insert" diff --git a/src/mongo/db/pipeline/document_source_merge_spec.cpp b/src/mongo/db/pipeline/document_source_merge_spec.cpp index cd3eaa77324..4a08f84cfa1 100644 --- a/src/mongo/db/pipeline/document_source_merge_spec.cpp +++ b/src/mongo/db/pipeline/document_source_merge_spec.cpp @@ -34,6 +34,7 @@ #include <fmt/format.h> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source_merge.h" #include "mongo/db/pipeline/document_source_merge_gen.h" @@ -44,14 +45,14 @@ NamespaceString mergeTargetNssParseFromBSON(const BSONElement& elem) { uassert(51178, "{} 'into' field must be either a string or an object, " "but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())), - elem.type() == String || elem.type() == Object); + elem.type() == BSONType::String || elem.type() == BSONType::Object); - if (elem.type() == String) { + if (elem.type() == BSONType::String) { return {"", elem.valueStringData()}; - } else { - auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject()); - return {spec.getDb().value_or(""), spec.getColl().value_or("")}; } + + auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject()); + return {spec.getDb().value_or(""), spec.getColl().value_or("")}; } void mergeTargetNssSerializeToBSON(const NamespaceString& targetNss, @@ -66,18 +67,20 @@ std::vector<std::string> mergeOnFieldsParseFromBSON(const BSONElement& elem) { uassert(51186, "{} 'into' field must be either a string or an array of strings, " "but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())), - elem.type() == String || elem.type() == Array); + elem.type() == BSONType::String || elem.type() == BSONType::Array); - if (elem.type() == String) { + if (elem.type() == BSONType::String) { fields.push_back(elem.str()); } else { + invariant(elem.type() == BSONType::Array); + BSONObjIterator iter(elem.Obj()); while (iter.more()) { const BSONElement matchByElem = iter.next(); uassert(51134, "{} 'on' array elements must be strings, but found "_format( DocumentSourceMerge::kStageName, typeName(matchByElem.type())), - matchByElem.type() == String); + matchByElem.type() == BSONType::String); fields.push_back(matchByElem.str()); } } @@ -99,4 +102,40 @@ void mergeOnFieldsSerializeToBSON(const std::vector<std::string>& fields, bob->append(fieldName, fields); } } + +MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem) { + uassert(51191, + "{} 'whenMatched' field must be either a string or an array, " + "but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())), + elem.type() == BSONType::String || elem.type() == BSONType::Array); + + if (elem.type() == BSONType::Array) { + return {MergeWhenMatchedModeEnum::kPipeline, + uassertStatusOK(AggregationRequest::parsePipelineFromBSON(elem))}; + } + + invariant(elem.type() == BSONType::String); + + IDLParserErrorContext ctx{DocumentSourceMergeSpec::kWhenMatchedFieldName}; + auto value = elem.valueStringData(); + auto mode = MergeWhenMatchedMode_parse(ctx, value); + + // The 'kPipeline' mode cannot be specified explicitly, a custom pipeline definition must be + // used instead. + if (mode == MergeWhenMatchedModeEnum::kPipeline) { + ctx.throwBadEnumValue(value); + } + return {mode}; +} + +void mergeWhenMatchedSerializeToBSON(const MergeWhenMatchedPolicy& policy, + StringData fieldName, + BSONObjBuilder* bob) { + if (policy.mode == MergeWhenMatchedModeEnum::kPipeline) { + invariant(policy.pipeline); + bob->append(fieldName, *policy.pipeline); + } else { + bob->append(fieldName, MergeWhenMatchedMode_serializer(policy.mode)); + } +} } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge_spec.h b/src/mongo/db/pipeline/document_source_merge_spec.h index 358a379a39f..35c353bb78a 100644 --- a/src/mongo/db/pipeline/document_source_merge_spec.h +++ b/src/mongo/db/pipeline/document_source_merge_spec.h @@ -29,29 +29,49 @@ #pragma once -#include "mongo/base/string_data.h" -#include "mongo/db/namespace_string.h" - +#include <boost/optional.hpp> #include <string> #include <vector> -namespace mongo { +#include "mongo/base/string_data.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/document_source_merge_modes_gen.h" +namespace mongo { class BSONObjBuilder; class BSONElement; -// Serialize and deserialize functions for the $merge stage 'into' field which can be a single -// string value, or an object +// Defines a policy strategy describing what to do when there is a matching document in the target +// collection. Can hold a value from the MergeWhenMatchedModeEnum, or a custom pipeline definition. +struct MergeWhenMatchedPolicy { + MergeWhenMatchedModeEnum mode; + boost::optional<std::vector<BSONObj>> pipeline; +}; + +/** + * Serialize and deserialize functions for the $merge stage 'into' field which can be a single + * string value, or an object. + */ void mergeTargetNssSerializeToBSON(const NamespaceString& targetNss, StringData fieldName, BSONObjBuilder* bob); NamespaceString mergeTargetNssParseFromBSON(const BSONElement& elem); -// Serialize and deserialize functions for the $merge stage 'on' field which can be a single string -// value, or array of strings. +/** + * Serialize and deserialize functions for the $merge stage 'on' field which can be a single string + * value, or array of strings. + */ void mergeOnFieldsSerializeToBSON(const std::vector<std::string>& fields, StringData fieldName, BSONObjBuilder* bob); std::vector<std::string> mergeOnFieldsParseFromBSON(const BSONElement& elem); +/** + * Serialize and deserialize functions for the $merge stage 'whenMatched' field which can be either + * a string value, or an array of objects defining a custom pipeline. + */ +void mergeWhenMatchedSerializeToBSON(const MergeWhenMatchedPolicy& policy, + StringData fieldName, + BSONObjBuilder* bob); +MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem); } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp index 7a78fde7475..00f63636995 100644 --- a/src/mongo/db/pipeline/document_source_merge_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_test.cpp @@ -130,6 +130,20 @@ TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfIntoIsObject) { ASSERT_EQ(mergeStage->getOutputNs().coll(), targetColl); } +TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfWhenMatchedIsStringOrArray) { + auto spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" + << "merge")); + ASSERT(createMergeStage(spec)); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" + << BSONArray())); + ASSERT(createMergeStage(spec)); +} + TEST_F(DocumentSourceMergeTest, FailsToParseIncorrectMergeSpecType) { auto spec = BSON("$merge" << 1); ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51182); @@ -276,24 +290,24 @@ TEST_F(DocumentSourceMergeTest, FailsToParseIfDbIsNotAValidDatabaseName) { ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::InvalidNamespace); } -TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsNotString) { +TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsNotStringOrArray) { auto spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" << true)); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191); spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" - << BSONArray())); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + << 100)); + ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191); spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" << BSON("" << kDefaultWhenMatchedMode))); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch); + ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191); } TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenNotMatchedModeIsNotString) { @@ -623,6 +637,22 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" + << BSON_ARRAY(BSON("$project" << BSON("x" << 1))) + << "whenNotMatched" + << "insert")); + ASSERT(createMergeStage(spec)); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" + << "pipeline" + << "whenNotMatched" + << "insert")); + ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" << "replaceWithNew" << "whenNotMatched" << "fail")); diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index f533ea3aaed..6dd3fda1d5c 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -321,6 +321,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::create( expCtx, MergeWhenMatchedModeEnum::kFail, MergeWhenNotMatchedModeEnum::kInsert, + boost::none, /* no custom pipeline */ std::move(uniqueKey), targetCollectionVersion, true /* serialize as $out stage */); @@ -329,6 +330,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::create( expCtx, MergeWhenMatchedModeEnum::kReplaceWithNew, MergeWhenNotMatchedModeEnum::kInsert, + boost::none /* no custom pipeline */, std::move(uniqueKey), targetCollectionVersion, true /* serialize as $out stage */); diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index d2e947f5a32..011b7671776 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -42,6 +42,7 @@ #include "mongo/db/generic_cursor.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" @@ -134,7 +135,7 @@ public: virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 0d29304b902..472805ae939 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -111,7 +111,7 @@ public: void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index fe8de93ea71..5540bc7e57e 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -133,7 +133,7 @@ void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCont void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h index f87333eeda2..b3c0a5885f7 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -77,7 +77,7 @@ public: void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index a372c51ad93..fda80b67518 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -193,7 +193,7 @@ Insert MongoInterfaceStandalone::buildInsertOp(const NamespaceString& nss, Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, bool upsert, bool multi, bool bypassDocValidation) { @@ -245,7 +245,7 @@ void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionConte void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, @@ -257,7 +257,6 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte upsert, multi, expCtx->bypassDocumentValidation)); - // Need to check each result in the batch since the writes are unordered. uassertStatusOKWithContext( [&writeResults]() { diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 25e9d04b701..0febf6073bf 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -65,7 +65,7 @@ public: void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, @@ -155,7 +155,7 @@ protected: */ Update buildUpdateOp(const NamespaceString& nss, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, bool upsert, bool multi, bool bypassDocValidation); diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index dde7a2395f5..931e3f5f17f 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -74,7 +74,7 @@ public: void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, + std::vector<write_ops::UpdateModification>&& updates, const WriteConcernOptions& wc, bool upsert, bool multi, |