diff options
author | Drew Paroski <drew.paroski@mongodb.com> | 2022-02-11 05:28:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-24 03:51:38 +0000 |
commit | 84c827702391857ef340b4e0315980f5b71d42b6 (patch) | |
tree | 8c3389e823bd23f3c6d4714c57b233c081ac3127 | |
parent | c32f52434e7ad84370296fddda09dceeee95505f (diff) | |
download | mongo-84c827702391857ef340b4e0315980f5b71d42b6.tar.gz |
SERVER-63159 Implement the $_internalApplyOplogUpdate aggregation stage
(cherry picked from commit 3579b34f55aa53213455cdff348738f8595d28c7)
(cherry picked from commit 65c4a53719c5f0dc19a1d1749d733f439155f957)
11 files changed, 613 insertions, 6 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 6b8ac1ac359..d1200365bde 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -211,6 +211,10 @@ all: test_file: jstests/sharding/chunk_migration_with_schema_validation.js - ticket: SERVER-61894 test_file: jstests/change_streams/show_raw_update_description_v1_oplog.js + - ticket: SERVER-63159 + test_file: jstests/change_streams/show_raw_update_description_v1_oplog.js + - ticket: SERVER-63159 + test_file: jstests/core/internal_apply_oplog_update.js suites: diff --git a/jstests/change_streams/show_raw_update_description_v1_oplog.js b/jstests/change_streams/show_raw_update_description_v1_oplog.js index cc4595883fd..882c529a574 100644 --- a/jstests/change_streams/show_raw_update_description_v1_oplog.js +++ b/jstests/change_streams/show_raw_update_description_v1_oplog.js @@ -7,11 +7,13 @@ (function() { "use strict"; +load("jstests/aggregation/extras/utils.js"); // For arrayEq. load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. // Drop and recreate the collections to be used in this set of tests. assertDropAndRecreateCollection(db, "t1"); +assertDropAndRecreateCollection(db, "t1Copy"); assert.commandWorked(db.t1.insert([ {_id: 3, a: 5, b: 1}, @@ -100,6 +102,28 @@ cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}) // 'rawUpdateDescription'. 
// +function assertCollectionsAreIdentical(coll1, coll2) { + const values1 = coll1.find().toArray(); + const values2 = coll2.find().toArray(); + assert(arrayEq(values1, values2), + () => "actual: " + tojson(values1) + " expected: " + tojson(values2)); +} + +function assertCanApplyRawUpdate(origColl, copyColl, events) { + if (!Array.isArray(events)) { + events = [events]; + } + for (let event of events) { + assert.commandWorked(copyColl.update( + event.documentKey, + [{$_internalApplyOplogUpdate: {oplogUpdate: event.rawUpdateDescription}}])); + } + assertCollectionsAreIdentical(origColl, copyColl); +} + +assert.commandWorked(db.t1Copy.insert(db.t1.find().toArray())); +assertCollectionsAreIdentical(db.t1, db.t1Copy); + // // Test op-style updates. // @@ -109,9 +133,10 @@ expected = { documentKey: {_id: 3}, ns: {db: "test", coll: "t1"}, operationType: "update", - rawUpdateDescription: {"$v": 1, "$set": {b: 3}} + rawUpdateDescription: {"$v": NumberInt(1), "$set": {b: 3}} }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); +assertCanApplyRawUpdate(db.t1, db.t1Copy, expected); jsTestLog("Testing op-style update with $set and multi:true"); assert.commandWorked(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true})); @@ -120,16 +145,17 @@ expected = [ documentKey: {_id: 4}, ns: {db: "test", coll: "t1"}, operationType: "update", - rawUpdateDescription: {"$v": 1, "$set": {b: 2}} + rawUpdateDescription: {"$v": NumberInt(1), "$set": {b: 2}} }, { documentKey: {_id: 5}, ns: {db: "test", coll: "t1"}, operationType: "update", - rawUpdateDescription: {"$v": 1, "$set": {b: 2}} + rawUpdateDescription: {"$v": NumberInt(1), "$set": {b: 2}} } ]; cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}); +assertCanApplyRawUpdate(db.t1, db.t1Copy, expected); jsTestLog("Testing op-style update with $unset"); assert.commandWorked(db.t1.update({_id: 3}, {$unset: {b: ""}})); @@ -137,9 +163,10 @@ expected = { documentKey: {_id: 3}, ns: {db: "test", 
coll: "t1"}, operationType: "update", - rawUpdateDescription: {"$v": 1, "$unset": {b: true}} + rawUpdateDescription: {"$v": NumberInt(1), "$unset": {b: true}} }; -cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}); +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); +assertCanApplyRawUpdate(db.t1, db.t1Copy, expected); jsTestLog("Testing op-style update with $set on nested field"); assert.commandWorked(db.t1.update({_id: 8}, {$set: {"b.d": 2}})); @@ -147,9 +174,10 @@ expected = { documentKey: {_id: 8}, ns: {db: "test", coll: "t1"}, operationType: "update", - rawUpdateDescription: {"$v": 1, "$set": {"b.d": 2}} + rawUpdateDescription: {"$v": NumberInt(1), "$set": {"b.d": 2}} }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); +assertCanApplyRawUpdate(db.t1, db.t1Copy, expected); cst.cleanUp(); }()); diff --git a/jstests/core/internal_apply_oplog_update.js b/jstests/core/internal_apply_oplog_update.js new file mode 100644 index 00000000000..d9f0508f85a --- /dev/null +++ b/jstests/core/internal_apply_oplog_update.js @@ -0,0 +1,168 @@ +/** + * Tests pipeline-style updates that use the $_internalApplyOplogUpdate aggregate stage. + * @tags: [ + * requires_find_command, + * requires_multi_updates, + * requires_non_retryable_writes, + * requires_fcv_44, + * ] + */ +(function() { +"use strict"; + +load("jstests/aggregation/extras/utils.js"); // For arrayEq. +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. + +// Drop and recreate the collections to be used in this set of tests. 
+assertDropAndRecreateCollection(db, "t1"); +assertDropAndRecreateCollection(db, "t2"); + +let documents1 = [ + {_id: 2, a: 4}, + {_id: 3, a: 5, b: 1}, + {_id: 4, a: 0, b: 1}, + {_id: 5, a: 0, b: 1}, + {_id: 8, a: 2, b: {c: 1}} +]; + +assert.commandWorked(db.t1.insert(documents1)); + +const kGiantStr = '*'.repeat(1024); +const kMediumStr = '*'.repeat(128); +const kSmallStr = '*'.repeat(32); + +let documents2 = [{ + _id: 100, + "a": 1, + "b": 2, + "arrayForSubdiff": [kGiantStr, {a: kMediumStr}, 1, 2, 3], + "arrayForReplacement": [0, 1, 2, 3], + "giantStr": kGiantStr +}]; + +assert.commandWorked(db.t2.insert(documents2)); + +// +// Test $_internalApplyOplogUpdate with v1 oplog update descriptions. For each update description, +// we execute $_internalApplyOplogUpdate twice to verify idempotency. +// + +function testUpdate(expected, coll, filter, oplogUpdate, opts = {}) { + for (let i = 0; i < 2; ++i) { + assert.commandWorked( + coll.update(filter, [{$_internalApplyOplogUpdate: {oplogUpdate: oplogUpdate}}], opts)); + + let actual = coll.find().toArray(); + assert(arrayEq(expected, actual), + () => "i: " + i + " actual: " + tojson(actual) + " expected: " + tojson(expected)); + } +} + +let oplogUpdate = {"$v": NumberInt(1), "$set": {b: 3}}; +documents1[1].b = 3; +testUpdate(documents1, db.t1, {_id: 3}, oplogUpdate); + +oplogUpdate = { + "$v": NumberInt(1), + "$set": {b: 2} +}; +documents1[2].b = 2; +documents1[3].b = 2; +testUpdate(documents1, db.t1, {a: 0}, oplogUpdate, {multi: true}); + +oplogUpdate = { + "$unset": {b: true} +}; +delete documents1[1].b; +testUpdate(documents1, db.t1, {_id: 3}, oplogUpdate); + +oplogUpdate = { + "$v": NumberInt(1), + "$set": {"b.d": 2} +}; +documents1[4].b.d = 2; +testUpdate(documents1, db.t1, {_id: 8}, oplogUpdate); + +// Test an update with upsert=true where no documents match the filter prior to the update. 
+oplogUpdate = { + "$v": NumberInt(1), + "$set": {b: 3} +}; +documents1.push({_id: 9, b: 3}); +testUpdate(documents1, db.t1, {_id: 9}, oplogUpdate, {upsert: true}); + +oplogUpdate = { + "$v": NumberInt(1), + "$set": + {a: 2, arrayForReplacement: [0], c: 3, arrayForSubdiff: [kGiantStr, {a: kMediumStr, b: 3}]}, +}; +documents2[0].a = 2; +documents2[0].arrayForSubdiff = [kGiantStr, {a: kMediumStr, b: 3}]; +documents2[0].arrayForReplacement = [0]; +documents2[0].c = 3; +testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate); + +oplogUpdate = { + "$v": NumberInt(1), + "$unset": {a: 1} +}; +delete documents2[0].a; +testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate); + +oplogUpdate = { + "$v": NumberInt(1), + "$unset": {c: 1, arrayForReplacement: 1, arrayForSubdiff: 1, b: 1} +}; +documents2[0] = { + _id: 100, + "giantStr": kGiantStr +}; +testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate); + +oplogUpdate = { + "$v": NumberInt(1), + "$set": { + arr: [{x: 1, y: kSmallStr}, kMediumStr], + arr_a: [1, kMediumStr], + arr_b: [[1, kSmallStr], kMediumStr], + arr_c: [[kSmallStr, 1, 2, 3], kMediumStr], + obj: {x: {a: 1, b: 1, c: [kMediumStr, 1, 2, 3], str: kMediumStr}}, + a: "updated", + doc: {a: {0: "foo"}} + } +}; +documents2[0] = { + _id: 100, + giantStr: kGiantStr, + arr: [{x: 1, y: kSmallStr}, kMediumStr], + arr_a: [1, kMediumStr], + arr_b: [[1, kSmallStr], kMediumStr], + arr_c: [[kSmallStr, 1, 2, 3], kMediumStr], + obj: {x: {a: 1, b: 1, c: [kMediumStr, 1, 2, 3], str: kMediumStr}}, + a: "updated", + doc: {a: {0: "foo"}} +}; +testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate); + +oplogUpdate = { + "$v": NumberInt(1), + "$set": { + "arr_a.0": 2, + "arr_b.0.0": 2, + "arr_c.0": [kSmallStr], + "obj.x.b": 2, + "obj.x.c": [kMediumStr] + }, + "$unset": {a: 1, doc: 1, "arr.0.x": 1, "obj.x.a": 1} +}; +documents2[0] = { + _id: 100, + giantStr: kGiantStr, + arr: [{y: kSmallStr}, kMediumStr], + arr_a: [2, kMediumStr], + arr_b: [[2, kSmallStr], kMediumStr], + arr_c: [[kSmallStr], 
kMediumStr], + obj: {x: {b: 2, c: [kMediumStr], str: kMediumStr}}, +}; +testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate); +}()); diff --git a/jstests/libs/parallelTester.js b/jstests/libs/parallelTester.js index 0f056e5c765..6bdaf792b58 100644 --- a/jstests/libs/parallelTester.js +++ b/jstests/libs/parallelTester.js @@ -226,6 +226,7 @@ if (typeof _threadInject != "undefined") { if (db.getMongo().readMode() === "legacy") { var requires_find_command = [ "apply_ops_system_dot_views.js", + "internal_apply_oplog_update.js", "merge_sort_collation.js", "update_pipeline_shell_helpers.js", "update_with_pipeline.js", diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 1d4d971be89..8c7bb191865 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -163,6 +163,7 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/logical_session_id', + '$BUILD_DIR/mongo/db/pipeline/document_source_internal_apply_oplog_update', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp index 935139adfda..51b5e1b2cf7 100644 --- a/src/mongo/db/ops/write_ops_parsers.cpp +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -34,6 +34,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/update/log_builder.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 7b19ed2b917..dc5c7fcf624 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -318,9 +318,24 @@ env.Library( ) env.Library( + target="document_source_internal_apply_oplog_update", + source=[ + 
'document_source_internal_apply_oplog_update.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/ops/write_ops_parsers', + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + '$BUILD_DIR/mongo/db/update/update_driver', + '$BUILD_DIR/mongo/s/query/router_exec_stage', + ], +) + +env.Library( target='document_sources_idl', source=[ 'document_source_change_stream.idl', + 'document_source_internal_apply_oplog_update.idl', 'document_source_list_sessions.idl', 'document_source_merge.idl', 'document_source_merge_modes.idl', @@ -362,6 +377,7 @@ env.CppUnitTest( 'document_source_geo_near_test.cpp', 'document_source_graph_lookup_test.cpp', 'document_source_group_test.cpp', + 'document_source_internal_apply_oplog_update_test.cpp', 'document_source_internal_shard_filter_test.cpp', 'document_source_internal_split_pipeline_test.cpp', 'document_source_limit_test.cpp', @@ -435,6 +451,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/clock_source_mock', 'accumulator', 'aggregation_request', + 'document_source_internal_apply_oplog_update', 'document_source_mock', 'document_sources_idl', 'expression', diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp new file mode 100644 index 00000000000..5c8ca206b47 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h" + +#include <boost/optional.hpp> +#include <boost/smart_ptr/intrusive_ptr.hpp> + +#include "mongo/db/exec/add_fields_projection_executor.h" +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/db/pipeline/document_source_internal_apply_oplog_update_gen.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/update/update_driver.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(_internalApplyOplogUpdate, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceInternalApplyOplogUpdate::createFromBson); + +boost::intrusive_ptr<DocumentSource> DocumentSourceInternalApplyOplogUpdate::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { + uassert(6315901, + str::stream() << "Argument to 
" << kStageName + << " stage must be an object, but found type: " << typeName(elem.type()), + elem.type() == BSONType::Object); + + auto spec = InternalApplyOplogUpdateSpec::parse(IDLParserErrorContext(kStageName), + elem.embeddedObject()); + + return new DocumentSourceInternalApplyOplogUpdate(pExpCtx, spec.getOplogUpdate()); +} + +DocumentSourceInternalApplyOplogUpdate::DocumentSourceInternalApplyOplogUpdate( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const BSONObj& oplogUpdate) + : DocumentSource(kStageName, pExpCtx), + _oplogUpdate(std::move(oplogUpdate)), + _updateDriver(pExpCtx) { + // Parse the raw oplog update description. We're treating this as a "classic" update, + // which can either be a full replacement or a modifier-style update. + const auto updateMod = write_ops::UpdateModification(_oplogUpdate); + + // UpdateDriver only expects to apply a diff in the context of oplog application. + _updateDriver.setFromOplogApplication(true); + _updateDriver.parse(updateMod, {}); +} + +DocumentSource::GetNextResult DocumentSourceInternalApplyOplogUpdate::doGetNext() { + auto next = pSource->getNext(); + if (!next.isAdvanced()) { + return next; + } + + // Use _updateDriver to apply the update to the document. 
+ mutablebson::Document doc(next.getDocument().toBson()); + uassertStatusOK(_updateDriver.update( + StringData(), &doc, false /* validateForStorage */, FieldRefSet(), false /* isInsert */)); + + return Document(doc.getObject()); +} + +Value DocumentSourceInternalApplyOplogUpdate::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(Document{{kStageName, Document{{kOplogUpdateFieldName, _oplogUpdate}}}}); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h new file mode 100644 index 00000000000..a86c76b1fb7 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" +#include "mongo/db/update/update_driver.h" + +namespace mongo { + +/** + * This is an internal stage that takes an oplog update description and applies the update to the + * input Document. + */ +class DocumentSourceInternalApplyOplogUpdate final : public DocumentSource { +public: + static constexpr StringData kStageName = "$_internalApplyOplogUpdate"_sd; + static constexpr StringData kOplogUpdateFieldName = "oplogUpdate"_sd; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + DocumentSourceInternalApplyOplogUpdate(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const BSONObj& oplogUpdate); + + const char* getSourceName() const override { + return kStageName.rawData(); + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const override { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kBlacklist); + constraints.canSwapWithMatch = false; + constraints.canSwapWithLimitAndSample = true; + constraints.isAllowedWithinUpdatePipeline = true; + constraints.isIndependentOfAnyCollection = false; + return constraints; + } + + DocumentSource::GetModPathsReturn getModifiedPaths() const final { + return 
{GetModPathsReturn::Type::kAllPaths, {}, {}}; + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + return boost::none; + } + +private: + Value serialize( + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; + + GetNextResult doGetNext() override; + + BSONObj _oplogUpdate; + UpdateDriver _updateDriver; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl new file mode 100644 index 00000000000..f6b81d4b6fb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl @@ -0,0 +1,42 @@ +# Copyright (C) 2018-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. 
If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + InternalApplyOplogUpdateSpec: + description: $_internalApplyOplogUpdate aggregation stage + strict: true + fields: + oplogUpdate: + description: An update encoded in oplog update format. + type: object diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp new file mode 100644 index 00000000000..e7afe63c66b --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/unordered_fields_bsonobj_comparator.h" +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using DocumentSourceInternalApplyOplogUpdateTest = AggregationContextFixture; + +TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldBeAbleToReParseSerializedStage) { + auto spec = fromjson( + R"({$_internalApplyOplogUpdate: {oplogUpdate: {"$v": NumberInt(1), "$set": {b: 3}}}})"); + + auto stage = + DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()); + + std::vector<Value> serialization; + stage->serializeToArray(serialization); + auto serializedBSON = serialization[0].getDocument().toBson(); + ASSERT_VALUE_EQ(serialization[0], Value(Document(spec))); + + auto roundTripped = DocumentSourceInternalApplyOplogUpdate::createFromBson( + serializedBSON.firstElement(), getExpCtx()); + + std::vector<Value> newSerialization; + roundTripped->serializeToArray(newSerialization); + + ASSERT_EQ(newSerialization.size(), 1UL); + ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); +} + +TEST_F(DocumentSourceInternalApplyOplogUpdateTest, 
ShouldRejectNonObjectSpecs) { + { + auto spec = fromjson("{$_internalApplyOplogUpdate: 1}"); + + ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson( + spec.firstElement(), getExpCtx()), + DBException, + 6315901); + } + + { + auto spec = fromjson("{$_internalApplyOplogUpdate: []}"); + + ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson( + spec.firstElement(), getExpCtx()), + DBException, + 6315901); + } +} + +TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectMalformedSpecs) { + auto spec = fromjson("{$_internalApplyOplogUpdate: {}}"); + ASSERT_THROWS_CODE( + DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()), + DBException, + 40414); + + spec = fromjson(R"({$_internalApplyOplogUpdate: {foo: {"$v": NumberInt(1), "$set": {b: 3}}}})"); + ASSERT_THROWS_CODE( + DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()), + DBException, + 40415); + + spec = fromjson( + R"({$_internalApplyOplogUpdate: { + oplogUpdate: {"$v": NumberInt(1), "$set": {b: 3}}, + foo: 1 + }})"); + ASSERT_THROWS_CODE( + DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()), + DBException, + 40415); +} + +TEST_F(DocumentSourceInternalApplyOplogUpdateTest, UpdateMultipleDocuments) { + auto spec = fromjson( + R"({$_internalApplyOplogUpdate: { + oplogUpdate: {"$v": NumberInt(1), "$set": {"b.c": 3}} + }})"); + + auto stage = + DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()); + auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}, + Document{{"a", 1}, {"b", 1}}, + Document{{"a", 2}, {"b", Document{{"c", 1}}}}, + Document{{"a", 3}, {"b", Document{{"d", 2}}}}}); + stage->setSource(mock.get()); + + auto next = stage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + Document expected = Document{{"a", 0}, {"b", Document{{"c", 3}}}}; + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + next 
= stage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + expected = Document{{"a", 1}, {"b", 1}}; + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + next = stage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + expected = Document{{"a", 2}, {"b", Document{{"c", 3}}}}; + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + next = stage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + expected = Document{{"a", 3}, {"b", Document{{"d", 2}, {"c", 3}}}}; + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + ASSERT_TRUE(stage->getNext().isEOF()); + ASSERT_TRUE(stage->getNext().isEOF()); + ASSERT_TRUE(stage->getNext().isEOF()); +} + +} // namespace +} // namespace mongo |