summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnton Korshunov <anton.korshunov@mongodb.com>2019-05-08 18:01:39 +0100
committerAnton Korshunov <anton.korshunov@mongodb.com>2019-05-14 23:19:33 +0100
commit089dd83af48cf198916e2dca50742378d4c3d361 (patch)
tree9b5db698d2624c85c7c19b018d77722470311185 /src
parent09db7023065f42ccc39dd3309536726814379c86 (diff)
downloadmongo-089dd83af48cf198916e2dca50742378d4c3d361.tar.gz
SERVER-40438 Add merge support for whenNotMatched: fail
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp69
-rw-r--r--src/mongo/db/pipeline/document_source_merge_modes.idl1
-rw-r--r--src/mongo/db/pipeline/document_source_merge_test.cpp42
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h16
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h11
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp31
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h8
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h11
9 files changed, 174 insertions, 16 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 837d4090b37..14be694a0bc 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -311,6 +311,7 @@ error_code("ShardKeyTooBig", 13334);
error_code("StaleConfig", 13388, extra="StaleConfigInfo");
error_code("DatabaseDifferCase", 13297);
error_code("OBSOLETE_PrepareConfigsFailed", 13104);
+error_code("MergeStageNoMatchingDocument", 13113);
# Group related errors into classes, can be checked against ErrorCodes::isXXXClassName methods.
error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout", "SocketException"])
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index e02989eddf6..c6bb2e3500e 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -64,14 +64,18 @@ constexpr auto kDefaultWhenMatched = WhenMatched::kMerge;
constexpr auto kDefaultWhenNotMatched = WhenNotMatched::kInsert;
constexpr auto kReplaceWithNewInsertMode =
MergeMode{WhenMatched::kReplaceWithNew, WhenNotMatched::kInsert};
+constexpr auto kReplaceWithNewFailMode =
+ MergeMode{WhenMatched::kReplaceWithNew, WhenNotMatched::kFail};
constexpr auto kMergeInsertMode = MergeMode{WhenMatched::kMerge, WhenNotMatched::kInsert};
+constexpr auto kMergeFailMode = MergeMode{WhenMatched::kMerge, WhenNotMatched::kFail};
constexpr auto kKeepExistingInsertMode =
MergeMode{WhenMatched::kKeepExisting, WhenNotMatched::kInsert};
constexpr auto kFailInsertMode = MergeMode{WhenMatched::kFail, WhenNotMatched::kInsert};
constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kInsert};
+constexpr auto kPipelineFailMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kFail};
/**
- * Creates a merge strategy which uses update semantics to do perform a merge operation. If
+ * Creates a merge strategy which uses update semantics to perform a merge operation. If
* 'BatchTransform' function is provided, it will be called to transform batched objects before
* passing them to the 'update'.
*/
@@ -81,6 +85,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
if (transform) {
transform(batch);
}
+
constexpr auto multi = false;
expCtx->mongoProcessInterface->update(expCtx,
ns,
@@ -94,6 +99,47 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) {
}
/**
+ * Creates a merge strategy which uses update semantics to perform a merge operation and ensures
+ * that each document in the batch has a matching document in the 'ns' collection (note that a
+ * matching document may not be modified as a result of an update operation, yet it still will be
+ * counted as matching). If at least one document doesn't have a match, this strategy returns an
+ * error. If 'BatchTransform' function is provided, it will be called to transform batched objects
+ * before passing them to the 'update'.
+ */
+MergeStrategy makeStrictUpdateStrategy(bool upsert, BatchTransform transform) {
+ return [upsert, transform](
+ const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
+ if (transform) {
+ transform(batch);
+ }
+
+ const auto batchSize = batch.size();
+ constexpr auto multi = false;
+ auto writeResult =
+ expCtx->mongoProcessInterface->updateWithResult(expCtx,
+ ns,
+ std::move(batch.uniqueKeys),
+ std::move(batch.modifications),
+ wc,
+ upsert,
+ multi,
+ epoch);
+ constexpr auto initValue = 0ULL;
+ auto nMatched =
+ std::accumulate(writeResult.results.begin(),
+ writeResult.results.end(),
+ initValue,
+ [](auto total, const auto& opRes) {
+ return total + (opRes.isOK() ? opRes.getValue().getN() : 0);
+ });
+ uassert(ErrorCodes::MergeStageNoMatchingDocument,
+ "{} could not find a matching document in the target collection "
+ "for at least one document in the source collection"_format(kStageName),
+ nMatched == batchSize);
+ };
+}
+
+/**
* Creates a merge strategy which uses insert semantics to perform a merge operation.
*/
MergeStrategy makeInsertStrategy() {
@@ -110,8 +156,8 @@ MergeStrategy makeInsertStrategy() {
}
/**
- * Creates a batched objects transformation function which wraps each element of the 'batch.objects'
- * array into the given 'updateOp' operator.
+ * Creates a batched objects transformation function which wraps each element of the
+ * 'batch.modifications' array into the given 'updateOp' operator.
*/
BatchTransform makeUpdateTransform(const std::string& updateOp) {
return [updateOp](auto& batch) {
@@ -139,22 +185,38 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
// initialized until the first use, which is when the program already started and all global
// variables had been initialized.
static const auto mergeStrategyDescriptors = MergeStrategyDescriptorsMap{
+ // whenMatched: replaceWithNew, whenNotMatched: insert
{kReplaceWithNewInsertMode,
{kReplaceWithNewInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(true, {})}},
+ // whenMatched: replaceWithNew, whenNotMatched: fail
+ {kReplaceWithNewFailMode,
+ {kReplaceWithNewFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}},
+ // whenMatched: merge, whenNotMatched: insert
{kMergeInsertMode,
{kMergeInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(true, makeUpdateTransform("$set"))}},
+ // whenMatched: merge, whenNotMatched: fail
+ {kMergeFailMode,
+ {kMergeFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(false, makeUpdateTransform("$set"))}},
+ // whenMatched: keepExisting, whenNotMatched: insert
{kKeepExistingInsertMode,
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}},
+ // whenMatched: [pipeline], whenNotMatched: insert
{kPipelineInsertMode,
{kPipelineInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(true, {})}},
+ // whenMatched: [pipeline], whenNotMatched: fail
+ {kPipelineFailMode,
+ {kPipelineFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}},
+ // whenMatched: fail, whenNotMatched: insert
{kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
return mergeStrategyDescriptors;
}
@@ -347,6 +409,7 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
auto whenMatched =
mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched;
auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched);
+
uassert(51181,
"Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' "
"is not supported"_format(kStageName,
diff --git a/src/mongo/db/pipeline/document_source_merge_modes.idl b/src/mongo/db/pipeline/document_source_merge_modes.idl
index 300e6314622..5edfe42417a 100644
--- a/src/mongo/db/pipeline/document_source_merge_modes.idl
+++ b/src/mongo/db/pipeline/document_source_merge_modes.idl
@@ -55,4 +55,5 @@ enums:
description: "Possible merge mode values for $merge's 'whenNotMatched'. field"
type: string
values:
+ kFail: "fail"
kInsert: "insert"
diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp
index 00f63636995..7ed20c762f2 100644
--- a/src/mongo/db/pipeline/document_source_merge_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_test.cpp
@@ -605,6 +605,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
+ << "replaceWithNew"
+ << "whenNotMatched"
+ << "fail"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
<< "fail"
<< "whenNotMatched"
<< "insert"));
@@ -613,6 +621,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
+ << "fail"
+ << "whenNotMatched"
+ << "fail"));
+ ASSERT_THROWS_CODE(createMergeStage(spec), DBException, 51189);
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
<< "merge"
<< "whenNotMatched"
<< "insert"));
@@ -621,6 +637,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
+ << "merge"
+ << "whenNotMatched"
+ << "fail"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
<< "keepExisting"
<< "whenNotMatched"
<< "insert"));
@@ -629,10 +653,10 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
- << "[{$addFields: {x: 1}}]"
+ << "keepExisting"
<< "whenNotMatched"
- << "insert"));
- ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue);
+ << "fail"));
+ ASSERT_THROWS_CODE(createMergeStage(spec), DBException, 51189);
spec = BSON("$merge" << BSON("into"
<< "target_collection"
@@ -645,6 +669,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
+ << BSON_ARRAY(BSON("$project" << BSON("x" << 1)))
+ << "whenNotMatched"
+ << "fail"));
+ ASSERT(createMergeStage(spec));
+
+ spec = BSON("$merge" << BSON("into"
+ << "target_collection"
+ << "whenMatched"
<< "pipeline"
<< "whenNotMatched"
<< "insert"));
@@ -653,9 +685,9 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode
spec = BSON("$merge" << BSON("into"
<< "target_collection"
<< "whenMatched"
- << "replaceWithNew"
+ << "[{$addFields: {x: 1}}]"
<< "whenNotMatched"
- << "fail"));
+ << "insert"));
ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue);
}
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 011b7671776..f36adb1f662 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -42,6 +42,7 @@
#include "mongo/db/generic_cursor.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/field_path.h"
@@ -141,6 +142,21 @@ public:
bool multi,
boost::optional<OID> targetEpoch) = 0;
+ /**
+ * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
+ * if any of the updates fail. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the
+ * targeted collection does not have the same epoch, or if the epoch changes during the update.
+ * Returns a 'WriteResult' object with information about the write operation.
+ */
+ virtual WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<write_ops::UpdateModification>&& updates,
+ const WriteConcernOptions& wc,
+ bool upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) = 0;
+
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 472805ae939..0f6feca3356 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -119,6 +119,17 @@ public:
MONGO_UNREACHABLE;
}
+ WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<write_ops::UpdateModification>&& updates,
+ const WriteConcernOptions& wc,
+ bool upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) final override {
+ MONGO_UNREACHABLE;
+ }
+
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index fda80b67518..ca5fa8e537d 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -242,14 +242,15 @@ void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionConte
"Insert failed: ");
}
-void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& queries,
- std::vector<write_ops::UpdateModification>&& updates,
- const WriteConcernOptions& wc,
- bool upsert,
- bool multi,
- boost::optional<OID> targetEpoch) {
+WriteResult MongoInterfaceStandalone::updateWithResult(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<write_ops::UpdateModification>&& updates,
+ const WriteConcernOptions& wc,
+ bool upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) {
auto writeResults = performUpdates(expCtx->opCtx,
buildUpdateOp(ns,
std::move(queries),
@@ -268,6 +269,20 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
return Status::OK();
}(),
"Update failed: ");
+
+ return writeResults;
+}
+
+void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<write_ops::UpdateModification>&& updates,
+ const WriteConcernOptions& wc,
+ bool upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) {
+ [[maybe_unused]] auto writeResult = updateWithResult(
+ expCtx, ns, std::move(queries), std::move(updates), wc, upsert, multi, targetEpoch);
}
CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 0febf6073bf..ac32ad69d19 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -70,6 +70,14 @@ public:
bool upsert,
bool multi,
boost::optional<OID> targetEpoch) override;
+ WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<write_ops::UpdateModification>&& updates,
+ const WriteConcernOptions& wc,
+ bool upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
void appendLatencyStats(OperationContext* opCtx,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 931e3f5f17f..f09aab9c217 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -82,6 +82,17 @@ public:
MONGO_UNREACHABLE;
}
+ WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<write_ops::UpdateModification>&& updates,
+ const WriteConcernOptions& wc,
+ bool upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) final override {
+ MONGO_UNREACHABLE;
+ }
+
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) override {
MONGO_UNREACHABLE;