diff options
author | Anton Korshunov <anton.korshunov@mongodb.com> | 2019-05-08 18:01:39 +0100 |
---|---|---|
committer | Anton Korshunov <anton.korshunov@mongodb.com> | 2019-05-14 23:19:33 +0100 |
commit | 089dd83af48cf198916e2dca50742378d4c3d361 (patch) | |
tree | 9b5db698d2624c85c7c19b018d77722470311185 /src | |
parent | 09db7023065f42ccc39dd3309536726814379c86 (diff) | |
download | mongo-089dd83af48cf198916e2dca50742378d4c3d361.tar.gz |
SERVER-40438 Add merge support for whenNotMatched: fail
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_merge.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_merge_modes.idl | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_merge_test.cpp | 42 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_interface.h | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongos_process_interface.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stub_mongo_process_interface.h | 11 |
9 files changed, 174 insertions, 16 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 837d4090b37..14be694a0bc 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -311,6 +311,7 @@ error_code("ShardKeyTooBig", 13334); error_code("StaleConfig", 13388, extra="StaleConfigInfo"); error_code("DatabaseDifferCase", 13297); error_code("OBSOLETE_PrepareConfigsFailed", 13104); +error_code("MergeStageNoMatchingDocument", 13113); # Group related errors into classes, can be checked against ErrorCodes::isXXXClassName methods. error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout", "SocketException"]) diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index e02989eddf6..c6bb2e3500e 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -64,14 +64,18 @@ constexpr auto kDefaultWhenMatched = WhenMatched::kMerge; constexpr auto kDefaultWhenNotMatched = WhenNotMatched::kInsert; constexpr auto kReplaceWithNewInsertMode = MergeMode{WhenMatched::kReplaceWithNew, WhenNotMatched::kInsert}; +constexpr auto kReplaceWithNewFailMode = + MergeMode{WhenMatched::kReplaceWithNew, WhenNotMatched::kFail}; constexpr auto kMergeInsertMode = MergeMode{WhenMatched::kMerge, WhenNotMatched::kInsert}; +constexpr auto kMergeFailMode = MergeMode{WhenMatched::kMerge, WhenNotMatched::kFail}; constexpr auto kKeepExistingInsertMode = MergeMode{WhenMatched::kKeepExisting, WhenNotMatched::kInsert}; constexpr auto kFailInsertMode = MergeMode{WhenMatched::kFail, WhenNotMatched::kInsert}; constexpr auto kPipelineInsertMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kInsert}; +constexpr auto kPipelineFailMode = MergeMode{WhenMatched::kPipeline, WhenNotMatched::kFail}; /** - * Creates a merge strategy which uses update semantics to do perform a merge operation. If + * Creates a merge strategy which uses update semantics to perform a merge operation. If * 'BatchTransform' function is provided, it will be called to transform batched objects before * passing them to the 'update'. */ @@ -81,6 +85,7 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) { if (transform) { transform(batch); } + constexpr auto multi = false; expCtx->mongoProcessInterface->update(expCtx, ns, @@ -94,6 +99,47 @@ MergeStrategy makeUpdateStrategy(bool upsert, BatchTransform transform) { } /** + * Creates a merge strategy which uses update semantics to perform a merge operation and ensures + * that each document in the batch has a matching document in the 'ns' collection (note that a + * matching document may not be modified as a result of an update operation, yet it still will be + * counted as matching). If at least one document doesn't have a match, this strategy returns an + * error. If 'BatchTransform' function is provided, it will be called to transform batched objects + * before passing them to the 'update'. + */ +MergeStrategy makeStrictUpdateStrategy(bool upsert, BatchTransform transform) { + return [upsert, transform]( + const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { + if (transform) { + transform(batch); + } + + const auto batchSize = batch.size(); + constexpr auto multi = false; + auto writeResult = + expCtx->mongoProcessInterface->updateWithResult(expCtx, + ns, + std::move(batch.uniqueKeys), + std::move(batch.modifications), + wc, + upsert, + multi, + epoch); + constexpr auto initValue = 0ULL; + auto nMatched = + std::accumulate(writeResult.results.begin(), + writeResult.results.end(), + initValue, + [](auto total, const auto& opRes) { + return total + (opRes.isOK() ? opRes.getValue().getN() : 0); + }); + uassert(ErrorCodes::MergeStageNoMatchingDocument, + "{} could not find a matching document in the target collection " + "for at least one document in the source collection"_format(kStageName), + nMatched == batchSize); + }; +} + +/** * Creates a merge strategy which uses insert semantics to perform a merge operation. */ MergeStrategy makeInsertStrategy() { @@ -110,8 +156,8 @@ MergeStrategy makeInsertStrategy() { } /** - * Creates a batched objects transformation function which wraps each element of the 'batch.objects' - * array into the given 'updateOp' operator. + * Creates a batched objects transformation function which wraps each element of the + * 'batch.modifications' array into the given 'updateOp' operator. */ BatchTransform makeUpdateTransform(const std::string& updateOp) { return [updateOp](auto& batch) { @@ -139,22 +185,38 @@ const MergeStrategyDescriptorsMap& getDescriptors() { // initialized until the first use, which is when the program already started and all global // variables had been initialized. static const auto mergeStrategyDescriptors = MergeStrategyDescriptorsMap{ + // whenMatched: replaceWithNew, whenNotMatched: insert {kReplaceWithNewInsertMode, {kReplaceWithNewInsertMode, {ActionType::insert, ActionType::update}, makeUpdateStrategy(true, {})}}, + // whenMatched: replaceWithNew, whenNotMatched: fail + {kReplaceWithNewFailMode, + {kReplaceWithNewFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}}, + // whenMatched: merge, whenNotMatched: insert {kMergeInsertMode, {kMergeInsertMode, {ActionType::insert, ActionType::update}, makeUpdateStrategy(true, makeUpdateTransform("$set"))}}, + // whenMatched: merge, whenNotMatched: fail + {kMergeFailMode, + {kMergeFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(false, makeUpdateTransform("$set"))}}, + // whenMatched: keepExisting, whenNotMatched: insert {kKeepExistingInsertMode, {kKeepExistingInsertMode, {ActionType::insert, ActionType::update}, makeUpdateStrategy(true, makeUpdateTransform("$setOnInsert"))}}, + // whenMatched: [pipeline], whenNotMatched: insert {kPipelineInsertMode, {kPipelineInsertMode, {ActionType::insert, ActionType::update}, makeUpdateStrategy(true, {})}}, + // whenMatched: [pipeline], whenNotMatched: fail + {kPipelineFailMode, + {kPipelineFailMode, {ActionType::update}, makeStrictUpdateStrategy(false, {})}}, + // whenMatched: fail, whenNotMatched: insert {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}}; return mergeStrategyDescriptors; } @@ -347,6 +409,7 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed auto whenMatched = mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched; auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched); + uassert(51181, "Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' " "is not supported"_format(kStageName, diff --git a/src/mongo/db/pipeline/document_source_merge_modes.idl b/src/mongo/db/pipeline/document_source_merge_modes.idl index 300e6314622..5edfe42417a 100644 --- a/src/mongo/db/pipeline/document_source_merge_modes.idl +++ b/src/mongo/db/pipeline/document_source_merge_modes.idl @@ -55,4 +55,5 @@ enums: description: "Possible merge mode values for $merge's 'whenNotMatched'. field" type: string values: + kFail: "fail" kInsert: "insert" diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp index 00f63636995..7ed20c762f2 100644 --- a/src/mongo/db/pipeline/document_source_merge_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_test.cpp @@ -605,6 +605,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" + << "replaceWithNew" + << "whenNotMatched" + << "fail")); + ASSERT(createMergeStage(spec)); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" << "fail" << "whenNotMatched" << "insert")); @@ -613,6 +621,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" + << "fail" + << "whenNotMatched" + << "fail")); + ASSERT_THROWS_CODE(createMergeStage(spec), DBException, 51189); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" << "merge" << "whenNotMatched" << "insert")); @@ -621,6 +637,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" + << "merge" + << "whenNotMatched" + << "fail")); + ASSERT(createMergeStage(spec)); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" << "keepExisting" << "whenNotMatched" << "insert")); @@ -629,10 +653,10 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" - << "[{$addFields: {x: 1}}]" + << "keepExisting" << "whenNotMatched" - << "insert")); - ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue); + << "fail")); + ASSERT_THROWS_CODE(createMergeStage(spec), DBException, 51189); spec = BSON("$merge" << BSON("into" << "target_collection" @@ -645,6 +669,14 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" + << BSON_ARRAY(BSON("$project" << BSON("x" << 1))) + << "whenNotMatched" + << "fail")); + ASSERT(createMergeStage(spec)); + + spec = BSON("$merge" << BSON("into" + << "target_collection" + << "whenMatched" << "pipeline" << "whenNotMatched" << "insert")); @@ -653,9 +685,9 @@ TEST_F(DocumentSourceMergeTest, CorrectlyHandlesWhenMatchedAndWhenNotMatchedMode spec = BSON("$merge" << BSON("into" << "target_collection" << "whenMatched" - << "replaceWithNew" + << "[{$addFields: {x: 1}}]" << "whenNotMatched" - << "fail")); + << "insert")); ASSERT_THROWS_CODE(createMergeStage(spec), DBException, ErrorCodes::BadValue); } diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 011b7671776..f36adb1f662 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -42,6 +42,7 @@ #include "mongo/db/generic_cursor.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/field_path.h" @@ -141,6 +142,21 @@ public: bool multi, boost::optional<OID> targetEpoch) = 0; + /** + * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException + * if any of the updates fail. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the + * targeted collection does not have the same epoch, or if the epoch changes during the update. + * Returns a 'WriteResult' object with information about the write operation. + */ + virtual WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<write_ops::UpdateModification>&& updates, + const WriteConcernOptions& wc, + bool upsert, + bool multi, + boost::optional<OID> targetEpoch) = 0; + virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) = 0; diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 472805ae939..0f6feca3356 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -119,6 +119,17 @@ public: MONGO_UNREACHABLE; } + WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<write_ops::UpdateModification>&& updates, + const WriteConcernOptions& wc, + bool upsert, + bool multi, + boost::optional<OID> targetEpoch) final override { + MONGO_UNREACHABLE; + } + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final { MONGO_UNREACHABLE; diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index fda80b67518..ca5fa8e537d 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -242,14 +242,15 @@ void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionConte "Insert failed: "); } -void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& queries, - std::vector<write_ops::UpdateModification>&& updates, - const WriteConcernOptions& wc, - bool upsert, - bool multi, - boost::optional<OID> targetEpoch) { +WriteResult MongoInterfaceStandalone::updateWithResult( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<write_ops::UpdateModification>&& updates, + const WriteConcernOptions& wc, + bool upsert, + bool multi, + boost::optional<OID> targetEpoch) { auto writeResults = performUpdates(expCtx->opCtx, buildUpdateOp(ns, std::move(queries), @@ -268,6 +269,20 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte return Status::OK(); }(), "Update failed: "); + + return writeResults; +} + +void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<write_ops::UpdateModification>&& updates, + const WriteConcernOptions& wc, + bool upsert, + bool multi, + boost::optional<OID> targetEpoch) { + [[maybe_unused]] auto writeResult = updateWithResult( + expCtx, ns, std::move(queries), std::move(updates), wc, upsert, multi, targetEpoch); } CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx, diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 0febf6073bf..ac32ad69d19 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -70,6 +70,14 @@ public: bool upsert, bool multi, boost::optional<OID> targetEpoch) override; + WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<write_ops::UpdateModification>&& updates, + const WriteConcernOptions& wc, + bool upsert, + bool multi, + boost::optional<OID> targetEpoch) override; CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; void appendLatencyStats(OperationContext* opCtx, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 931e3f5f17f..f09aab9c217 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -82,6 +82,17 @@ public: MONGO_UNREACHABLE; } + WriteResult updateWithResult(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<write_ops::UpdateModification>&& updates, + const WriteConcernOptions& wc, + bool upsert, + bool multi, + boost::optional<OID> targetEpoch) final override { + MONGO_UNREACHABLE; + } + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) override { MONGO_UNREACHABLE; |