summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnton Korshunov <anton.korshunov@mongodb.com>2019-05-06 20:37:22 +0100
committerAnton Korshunov <anton.korshunov@mongodb.com>2019-05-13 21:40:17 +0100
commitdde091c07989ffaefc57705859abf6517beeeace (patch)
treec5639b56f03fa24f27aeb9e3422617907be6dc77 /src
parente4b13ae68a4eef9393357038f09f14bfd8102050 (diff)
downloadmongo-dde091c07989ffaefc57705859abf6517beeeace.tar.gz
SERVER-40431 Add merge support for whenMatched: pipeline
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/ops/write_ops_parsers.cpp3
-rw-r--r--src/mongo/db/ops/write_ops_parsers.h2
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp48
-rw-r--r--src/mongo/db/pipeline/document_source_merge.h21
-rw-r--r--src/mongo/db/pipeline/document_source_merge.idl28
-rw-r--r--src/mongo/db/pipeline/document_source_merge_modes.idl58
-rw-r--r--src/mongo/db/pipeline/document_source_merge_spec.cpp55
-rw-r--r--src/mongo/db/pipeline/document_source_merge_spec.h36
-rw-r--r--src/mongo/db/pipeline/document_source_merge_test.cpp40
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp2
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h3
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h2
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp2
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h2
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp5
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h2
18 files changed, 246 insertions, 68 deletions
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index c15aafce0d0..f6ff83cdad7 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -237,6 +237,9 @@ write_ops::UpdateModification::UpdateModification(const BSONObj& update) {
_type = Type::kClassic;
}
+write_ops::UpdateModification::UpdateModification(std::vector<BSONObj> pipeline)
+ : _type{Type::kPipeline}, _pipeline{std::move(pipeline)} {}
+
write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) {
return UpdateModification(elem);
}
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index f116b79b63e..2058ca4a055 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -63,7 +63,7 @@ public:
UpdateModification() = default;
UpdateModification(BSONElement update);
-
+ UpdateModification(std::vector<BSONObj> pipeline);
// This constructor exists only to provide a fast-path for constructing classic-style updates.
UpdateModification(const BSONObj& update);
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 6a0ccb087bb..393f9742a33 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -599,6 +599,7 @@ env.Library(
env.Idlc('document_source_change_stream.idl')[0],
env.Idlc('document_source_list_sessions.idl')[0],
env.Idlc('document_source_merge.idl')[0],
+ env.Idlc('document_source_merge_modes.idl')[0],
env.Idlc('document_source_out.idl')[0],
env.Idlc('exchange_spec.idl')[0],
env.Idlc('runtime_constants.idl')[0],
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 3177affe8b1..e02989eddf6 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -68,6 +68,7 @@ constexpr auto kMergeInsertMode = MergeMode{WhenMatched::kMerge, WhenNotMatched:
constexpr auto kKeepExistingInsertMode =
MergeMode{WhenMatched::kKeepExisting, WhenNotMatched::kInsert};
constexpr auto kFailInsertMode = MergeMode{WhenMatched::kFail, WhenNotMatched::kInsert};
+constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kInsert};
/**
* Creates a merge strategy which uses update semantics to do perform a merge operation. If
@@ -84,7 +85,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
expCtx->mongoProcessInterface->update(expCtx,
ns,
std::move(batch.uniqueKeys),
- std::move(batch.objects),
+ std::move(batch.modifications),
wc,
upsert,
multi,
@@ -97,7 +98,14 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
*/
MergeStrategy makeInsertStrategy() {
return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
- expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(batch.objects), wc, epoch);
+ std::vector<BSONObj> objectsToInsert(batch.size());
+ // The batch stores replacement style updates, but for this "insert" style of $merge we'd
+ // like to just insert the new document without attempting any sort of replacement.
+ std::transform(batch.modifications.begin(),
+ batch.modifications.end(),
+ objectsToInsert.begin(),
+ [](const auto& mod) { return mod.getUpdateClassic(); });
+ expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(objectsToInsert), wc, epoch);
};
}
@@ -107,10 +115,11 @@ MergeStrategy makeInsertStrategy() {
*/
BatchTransform makeUpdateTransform(const std::string& updateOp) {
return [updateOp](auto& batch) {
- std::transform(batch.objects.begin(),
- batch.objects.end(),
- batch.objects.begin(),
- [updateOp](const auto& obj) { return BSON(updateOp << obj); });
+ std::transform(
+ batch.modifications.begin(),
+ batch.modifications.end(),
+ batch.modifications.begin(),
+ [updateOp](const auto& mod) { return BSON(updateOp << mod.getUpdateClassic()); });
};
}
@@ -142,6 +151,10 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}},
+ {kPipelineInsertMode,
+ {kPipelineInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(true, {})}},
{kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
return mergeStrategyDescriptors;
}
@@ -331,7 +344,8 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
auto mergeSpec =
parseMergeSpecAndResolveTargetNamespace(spec, request.getNamespaceString().db());
auto targetNss = mergeSpec.getTargetNss();
- auto whenMatched = mergeSpec.getWhenMatched().value_or(kDefaultWhenMatched);
+ auto whenMatched =
+ mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched;
auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched);
uassert(51181,
"Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' "
@@ -354,6 +368,7 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MergeStrategyDescriptor& descriptor,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage)
@@ -363,6 +378,7 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
_targetCollectionVersion(targetCollectionVersion),
_done(false),
_descriptor(descriptor),
+ _pipeline(std::move(pipeline)),
_mergeOnFields(std::move(mergeOnFields)),
_mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1),
_serializeAsOutStage(serializeAsOutStage) {}
@@ -372,6 +388,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WhenMatched whenMatched,
WhenNotMatched whenNotMatched,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage) {
@@ -404,6 +421,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
return new DocumentSourceMerge(outputNs,
expCtx,
getDescriptors().at({whenMatched, whenNotMatched}),
+ std::move(pipeline),
mergeOnFields,
targetCollectionVersion,
serializeAsOutStage);
@@ -417,8 +435,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
auto mergeSpec = parseMergeSpecAndResolveTargetNamespace(spec, expCtx->ns.db());
auto targetNss = mergeSpec.getTargetNss();
- auto whenMatched = mergeSpec.getWhenMatched().value_or(kDefaultWhenMatched);
+ auto whenMatched =
+ mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched;
auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched);
+ auto pipeline = mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->pipeline : boost::none;
// TODO SERVER-40432: move resolveMergeOnFieldsOnMongo* into MongoProcessInterface.
auto[mergeOnFields, targetCollectionVersion] = expCtx->inMongos
? resolveMergeOnFieldsOnMongoS(expCtx, mergeSpec, targetNss)
@@ -428,6 +448,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
expCtx,
whenMatched,
whenNotMatched,
+ std::move(pipeline),
std::move(mergeOnFields),
targetCollectionVersion,
false /* serialize as $out stage */);
@@ -477,16 +498,17 @@ DocumentSource::GetNextResult DocumentSourceMerge::getNext() {
// Extract the 'on' fields before converting the document to BSON.
auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
- auto insertObj = doc.toBson();
+ auto mod = _pipeline ? write_ops::UpdateModification(*_pipeline)
+ : write_ops::UpdateModification(doc.toBson());
- bufferedBytes += insertObj.objsize();
+ bufferedBytes += mod.objsize();
if (!batch.empty() &&
(bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
spill(std::move(batch));
batch.clear();
- bufferedBytes = insertObj.objsize();
+ bufferedBytes = mod.objsize();
}
- batch.emplace(std::move(insertObj), std::move(mergeOnFields));
+ batch.emplace(std::move(mod), std::move(mergeOnFields));
}
if (!batch.empty()) {
spill(std::move(batch));
@@ -537,7 +559,7 @@ Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity>
} else {
DocumentSourceMergeSpec spec;
spec.setTargetNss(_outputNs);
- spec.setWhenMatched(_descriptor.mode.first);
+ spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline});
spec.setWhenNotMatched(_descriptor.mode.second);
spec.setOn([&]() {
std::vector<std::string> mergeOnFields;
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index 9360cd978d6..037d2f54962 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -50,25 +50,29 @@ public:
* portion of the update or insert.
*/
struct BatchedObjects {
- void emplace(BSONObj&& obj, BSONObj&& key) {
- objects.emplace_back(std::move(obj));
+ void emplace(write_ops::UpdateModification&& mod, BSONObj&& key) {
+ modifications.emplace_back(std::move(mod));
uniqueKeys.emplace_back(std::move(key));
}
bool empty() const {
- return objects.empty();
+ return modifications.empty();
}
size_t size() const {
- return objects.size();
+ return modifications.size();
}
void clear() {
- objects.clear();
+ modifications.clear();
uniqueKeys.clear();
}
- std::vector<BSONObj> objects;
+ // For each element in the batch we store an UpdateModification which is either the new
+ // document we want to upsert or insert into the collection (i.e. a 'classic' replacement
+ // update), or the pipeline to run to compute the new document.
+ std::vector<write_ops::UpdateModification> modifications;
+
// Store the unique keys as BSON objects instead of Documents for compatibility with the
// batch update command. (e.g. {q: <array of uniqueKeys>, u: <array of objects>})
std::vector<BSONObj> uniqueKeys;
@@ -121,6 +125,7 @@ public:
DocumentSourceMerge(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MergeStrategyDescriptor& descriptor,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage);
@@ -199,6 +204,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
MergeStrategyDescriptor::WhenMatched whenMatched,
MergeStrategyDescriptor::WhenNotMatched whenNotMatched,
+ boost::optional<std::vector<BSONObj>>&& pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion,
bool serializeAsOutStage);
@@ -250,6 +256,9 @@ private:
// descriptor.
const MergeStrategyDescriptor& _descriptor;
+    // A custom pipeline used to compute a new version of the document when it matches an
+    // existing document in the target collection.
+ boost::optional<std::vector<BSONObj>> _pipeline;
+
// Holds the fields used for uniquely identifying documents. There must exist a unique index
// with this key pattern. Default is "_id" for unsharded collections, and "_id" plus the shard
// key for sharded collections.
diff --git a/src/mongo/db/pipeline/document_source_merge.idl b/src/mongo/db/pipeline/document_source_merge.idl
index b2c70a19ea0..fabda43b017 100644
--- a/src/mongo/db/pipeline/document_source_merge.idl
+++ b/src/mongo/db/pipeline/document_source_merge.idl
@@ -35,25 +35,10 @@ global:
- "mongo/db/pipeline/document_source_merge_spec.h"
imports:
+ - "mongo/db/pipeline/document_source_merge_modes.idl"
- "mongo/idl/basic_types.idl"
- "mongo/s/chunk_version.idl"
-enums:
- MergeWhenMatchedMode:
- description: "Possible merge mode values for 'whenMatched'."
- type: string
- values:
- kFail: "fail"
- kMerge: "merge"
- kKeepExisting: "keepExisting"
- kReplaceWithNew: "replaceWithNew"
-
- MergeWhenNotMatchedMode:
- description: "Possible merge mode values for 'whenNotMatched'."
- type: string
- values:
- kInsert: "insert"
-
types:
MergeTargetNss:
bson_serialization_type: any
@@ -70,6 +55,15 @@ types:
serializer: "::mongo::mergeOnFieldsSerializeToBSON"
deserializer: "::mongo::mergeOnFieldsParseFromBSON"
+ MergeWhenMatchedPolicy:
+ bson_serialization_type: any
+ description: Defines a policy strategy describing what to do when there is a matching
+ document in the target collection. Can hold a value from the
+ MergeWhenMatchedMode enum, or a custom pipeline definition.
+ cpp_type: "::mongo::MergeWhenMatchedPolicy"
+ serializer: "::mongo::mergeWhenMatchedSerializeToBSON"
+ deserializer: "::mongo::mergeWhenMatchedParseFromBSON"
+
structs:
NamespaceSpec:
description: A document used to specify a namespace.
@@ -99,7 +93,7 @@ structs:
description: A single field or array of fields that uniquely identify a document.
whenMatched:
- type: MergeWhenMatchedMode
+ type: MergeWhenMatchedPolicy
optional: true
description: The merge mode for the merge operation when source and target elements
match.
diff --git a/src/mongo/db/pipeline/document_source_merge_modes.idl b/src/mongo/db/pipeline/document_source_merge_modes.idl
new file mode 100644
index 00000000000..300e6314622
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_merge_modes.idl
@@ -0,0 +1,58 @@
+# Copyright (C) 2019-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Merge modes for the document source merge stage.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+enums:
+ MergeWhenMatchedMode:
+ description: "Possible merge mode values for $merge's 'whenMatched' field."
+ type: string
+ values:
+ kFail: "fail"
+ kKeepExisting: "keepExisting"
+ kMerge: "merge"
+ # Technically, we don't need this item, as the 'pipeline' value, as a string, is not
+ # supported by the 'whenMatched' field. Instead, a pipeline definition, as an array of
+ # objects, should be used. However, to avoid special casing logic in
+ # DocumentSourceMerge, and to keep all merge strategy definitions in a single
+ # descriptors map, which keys are pairs of whenMatched/whenNotMatched values, this
+ # 'kPipeline' element is added for internal use only.
+ kPipeline: "pipeline"
+ kReplaceWithNew: "replaceWithNew"
+
+ MergeWhenNotMatchedMode:
        description: "Possible merge mode values for $merge's 'whenNotMatched' field."
+ type: string
+ values:
+ kInsert: "insert"
diff --git a/src/mongo/db/pipeline/document_source_merge_spec.cpp b/src/mongo/db/pipeline/document_source_merge_spec.cpp
index cd3eaa77324..4a08f84cfa1 100644
--- a/src/mongo/db/pipeline/document_source_merge_spec.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_spec.cpp
@@ -34,6 +34,7 @@
#include <fmt/format.h>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source_merge.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
@@ -44,14 +45,14 @@ NamespaceString mergeTargetNssParseFromBSON(const BSONElement& elem) {
uassert(51178,
"{} 'into' field must be either a string or an object, "
"but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())),
- elem.type() == String || elem.type() == Object);
+ elem.type() == BSONType::String || elem.type() == BSONType::Object);
- if (elem.type() == String) {
+ if (elem.type() == BSONType::String) {
return {"", elem.valueStringData()};
- } else {
- auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject());
- return {spec.getDb().value_or(""), spec.getColl().value_or("")};
}
+
+ auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject());
+ return {spec.getDb().value_or(""), spec.getColl().value_or("")};
}
void mergeTargetNssSerializeToBSON(const NamespaceString& targetNss,
@@ -66,18 +67,20 @@ std::vector<std::string> mergeOnFieldsParseFromBSON(const BSONElement& elem) {
uassert(51186,
"{} 'into' field must be either a string or an array of strings, "
"but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())),
- elem.type() == String || elem.type() == Array);
+ elem.type() == BSONType::String || elem.type() == BSONType::Array);
- if (elem.type() == String) {
+ if (elem.type() == BSONType::String) {
fields.push_back(elem.str());
} else {
+ invariant(elem.type() == BSONType::Array);
+
BSONObjIterator iter(elem.Obj());
while (iter.more()) {
const BSONElement matchByElem = iter.next();
uassert(51134,
"{} 'on' array elements must be strings, but found "_format(
DocumentSourceMerge::kStageName, typeName(matchByElem.type())),
- matchByElem.type() == String);
+ matchByElem.type() == BSONType::String);
fields.push_back(matchByElem.str());
}
}
@@ -99,4 +102,40 @@ void mergeOnFieldsSerializeToBSON(const std::vector<std::string>& fields,
bob->append(fieldName, fields);
}
}
+
+MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem) {
+ uassert(51191,
+ "{} 'whenMatched' field must be either a string or an array, "
+ "but found {}"_format(DocumentSourceMerge::kStageName, typeName(elem.type())),
+ elem.type() == BSONType::String || elem.type() == BSONType::Array);
+
+ if (elem.type() == BSONType::Array) {
+ return {MergeWhenMatchedModeEnum::kPipeline,
+ uassertStatusOK(AggregationRequest::parsePipelineFromBSON(elem))};
+ }
+
+ invariant(elem.type() == BSONType::String);
+
+ IDLParserErrorContext ctx{DocumentSourceMergeSpec::kWhenMatchedFieldName};
+ auto value = elem.valueStringData();
+ auto mode = MergeWhenMatchedMode_parse(ctx, value);
+
+ // The 'kPipeline' mode cannot be specified explicitly, a custom pipeline definition must be
+ // used instead.
+ if (mode == MergeWhenMatchedModeEnum::kPipeline) {
+ ctx.throwBadEnumValue(value);
+ }
+ return {mode};
+}
+
+void mergeWhenMatchedSerializeToBSON(const MergeWhenMatchedPolicy& policy,
+ StringData fieldName,
+ BSONObjBuilder* bob) {
+ if (policy.mode == MergeWhenMatchedModeEnum::kPipeline) {
+ invariant(policy.pipeline);
+ bob->append(fieldName, *policy.pipeline);
+ } else {
+ bob->append(fieldName, MergeWhenMatchedMode_serializer(policy.mode));
+ }
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_merge_spec.h b/src/mongo/db/pipeline/document_source_merge_spec.h
index 358a379a39f..35c353bb78a 100644
--- a/src/mongo/db/pipeline/document_source_merge_spec.h
+++ b/src/mongo/db/pipeline/document_source_merge_spec.h
@@ -29,29 +29,49 @@
#pragma once
-#include "mongo/base/string_data.h"
-#include "mongo/db/namespace_string.h"
-
+#include <boost/optional.hpp>
#include <string>
#include <vector>
-namespace mongo {
+#include "mongo/base/string_data.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/document_source_merge_modes_gen.h"
+namespace mongo {
class BSONObjBuilder;
class BSONElement;
-// Serialize and deserialize functions for the $merge stage 'into' field which can be a single
-// string value, or an object
+// Defines a policy strategy describing what to do when there is a matching document in the target
+// collection. Can hold a value from the MergeWhenMatchedModeEnum, or a custom pipeline definition.
+struct MergeWhenMatchedPolicy {
+ MergeWhenMatchedModeEnum mode;
+ boost::optional<std::vector<BSONObj>> pipeline;
+};
+
+/**
+ * Serialize and deserialize functions for the $merge stage 'into' field which can be a single
+ * string value, or an object.
+ */
void mergeTargetNssSerializeToBSON(const NamespaceString& targetNss,
StringData fieldName,
BSONObjBuilder* bob);
NamespaceString mergeTargetNssParseFromBSON(const BSONElement& elem);
-// Serialize and deserialize functions for the $merge stage 'on' field which can be a single string
-// value, or array of strings.
+/**
+ * Serialize and deserialize functions for the $merge stage 'on' field which can be a single string
+ * value, or array of strings.
+ */
void mergeOnFieldsSerializeToBSON(const std::vector<std::string>& fields,
StringData fieldName,
BSONObjBuilder* bob);
std::vector<std::string> mergeOnFieldsParseFromBSON(const BSONElement& elem);
+/**
+ * Serialize and deserialize functions for the $merge stage 'whenMatched' field which can be either
+ * a string value, or an array of objects defining a custom pipeline.
+ */
+void mergeWhenMatchedSerializeToBSON(const MergeWhenMatchedPolicy& policy,
+ StringData fieldName,
+ BSONObjBuilder* bob);
+MergeWhenMatchedPolicy mergeWhenMatchedParseFromBSON(const BSONElement& elem);
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp
index 7a78fde7475..00f63636995 100644
--- a/src/mongo/db/pipeline/document_source_merge_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_test.cpp
@@ -130,6 +130,20 @@ TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfIntoIsObject) {
ASSERT_EQ(mergeStage->getOutputNs().coll(), targetColl);
}
+TEST_F(DocumentSourceMergeTest, CorrectlyParsesIfWhenMatchedIsStringOrArray) {
+ auto spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
+ << "merge"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
+ << BSONArray()));
+ ASSERT(createMergeStage(spec));
+}
+
TEST_F(DocumentSourceMergeTest, FailsToParseIncorrectMergeSpecType) {
auto spec = BSON("$merge" << 1);
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51182);
@@ -276,24 +290,24 @@ TEST_F(DocumentSourceMergeTest, FailsToParseIfDbIsNotAValidDatabaseName) {
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::InvalidNamespace);
}
-TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsNotString) {
+TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenMatchedModeIsNotStringOrArray) {
auto spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
<< true));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191);
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
- << BSONArray()));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ << 100));
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191);
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
<< BSON("" << kDefaultWhenMatchedMode)));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::TypeMismatch);
+ ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51191);
}
TEST_F(DocumentSourceMergeTest, FailsToParseIfWhenNotMatchedModeIsNotString) {
@@ -623,6 +637,22 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
+ << BSON_ARRAY(BSON("$project" << BSON("x" << 1)))
+ << "whenNotMatched"
+ << "insert"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
+ << "pipeline"
+ << "whenNotMatched"
+ << "insert"));
+ ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue);
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
<< "replaceWithNew"
<< "whenNotMatched"
<< "fail"));
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index f533ea3aaed..6dd3fda1d5c 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -321,6 +321,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::create(
expCtx,
MergeWhenMatchedModeEnum::kFail,
MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none, /* no custom pipeline */
std::move(uniqueKey),
targetCollectionVersion,
true /* serialize as $out stage */);
@@ -329,6 +330,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::create(
expCtx,
MergeWhenMatchedModeEnum::kReplaceWithNew,
MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none /* no custom pipeline */,
std::move(uniqueKey),
targetCollectionVersion,
true /* serialize as $out stage */);
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index d2e947f5a32..011b7671776 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -42,6 +42,7 @@
#include "mongo/db/generic_cursor.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
@@ -134,7 +135,7 @@ public:
virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 0d29304b902..472805ae939 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -111,7 +111,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index fe8de93ea71..5540bc7e57e 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -133,7 +133,7 @@ void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCont
void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index f87333eeda2..b3c0a5885f7 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -77,7 +77,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index a372c51ad93..fda80b67518 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -193,7 +193,7 @@ Insert MongoInterfaceStandalone::buildInsertOp(const NamespaceString& nss,
Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
bool upsert,
bool multi,
bool bypassDocValidation) {
@@ -245,7 +245,7 @@ void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionConte
void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
@@ -257,7 +257,6 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
upsert,
multi,
expCtx->bypassDocumentValidation));
-
// Need to check each result in the batch since the writes are unordered.
uassertStatusOKWithContext(
[&writeResults]() {
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 25e9d04b701..0febf6073bf 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -65,7 +65,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,
@@ -155,7 +155,7 @@ protected:
*/
Update buildUpdateOp(const NamespaceString& nss,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
bool upsert,
bool multi,
bool bypassDocValidation);
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index dde7a2395f5..931e3f5f17f 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -74,7 +74,7 @@ public:
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
+ std::vector<write_ops::UpdateModification>&& updates,
const WriteConcernOptions& wc,
bool upsert,
bool multi,