diff options
author | Drew Paroski <drew.paroski@mongodb.com> | 2022-02-11 05:28:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-21 21:02:50 +0000 |
commit | 65c4a53719c5f0dc19a1d1749d733f439155f957 (patch) | |
tree | bf51806ca1e1245e988afd66eb582a241efed4e9 /src | |
parent | b156a5b704741985498482166e90c18ca9b03b95 (diff) | |
download | mongo-65c4a53719c5f0dc19a1d1749d733f439155f957.tar.gz |
SERVER-63159 Implement the $_internalApplyOplogUpdate aggregation stage
(cherry picked from commit 3579b34f55aa53213455cdff348738f8595d28c7)
Diffstat (limited to 'src')
6 files changed, 499 insertions, 0 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index fecc5ceda1c..f693630bbe7 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -158,6 +158,7 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/pipeline/change_stream_pipeline', + '$BUILD_DIR/mongo/db/pipeline/document_source_internal_apply_oplog_update', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index f3242ad6b0a..440cbfc80f9 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -340,6 +340,20 @@ pipelineEnv.Library( ) env.Library( + target="document_source_internal_apply_oplog_update", + source=[ + 'document_source_internal_apply_oplog_update.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/ops/write_ops_parsers', + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + '$BUILD_DIR/mongo/db/update/update_driver', + '$BUILD_DIR/mongo/s/query/router_exec_stage', + ], +) + +env.Library( target="change_stream_pipeline", source=[ 'change_stream_document_diff_parser.cpp', @@ -379,6 +393,7 @@ env.Library( source=[ 'document_source_change_stream.idl', 'document_source_coll_stats.idl', + 'document_source_internal_apply_oplog_update.idl', 'document_source_list_sessions.idl', 'document_source_merge.idl', 'document_source_merge_modes.idl', @@ -424,6 +439,7 @@ env.CppUnitTest( 'document_source_geo_near_test.cpp', 'document_source_graph_lookup_test.cpp', 'document_source_group_test.cpp', + 'document_source_internal_apply_oplog_update_test.cpp', 'document_source_internal_shard_filter_test.cpp', 'document_source_internal_split_pipeline_test.cpp', 'document_source_limit_test.cpp', @@ -529,6 +545,7 @@ env.CppUnitTest( 'accumulator', 
'aggregation_request_helper', 'change_stream_pipeline', + 'document_source_internal_apply_oplog_update', 'document_source_mock', 'document_sources_idl', 'expression_context', diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp new file mode 100644 index 00000000000..51b0dee8ac1 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h"
+
+#include <boost/optional.hpp>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
+#include "mongo/db/exec/add_fields_projection_executor.h"
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update_gen.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/update/update_driver.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalApplyOplogUpdate,
+                         LiteParsedDocumentSourceDefault::parse,
+                         DocumentSourceInternalApplyOplogUpdate::createFromBson,
+                         AllowedWithApiStrict::kNeverInVersion1);
+
+/**
+ * Builds the stage from its BSON specification, e.g.
+ *   {$_internalApplyOplogUpdate: {oplogUpdate: {$v: 2, diff: {...}}}}
+ *
+ * Throws (code 6315901) when the stage argument is not an object. Errors raised while parsing the
+ * IDL spec or the oplog update itself propagate to the caller.
+ */
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalApplyOplogUpdate::createFromBson(
+    BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+    uassert(6315901,
+            str::stream() << "Argument to " << kStageName
+                          << " stage must be an object, but found type: " << typeName(elem.type()),
+            elem.type() == BSONType::Object);
+
+    auto spec = InternalApplyOplogUpdateSpec::parse(IDLParserErrorContext(kStageName),
+                                                    elem.embeddedObject());
+
+    return new DocumentSourceInternalApplyOplogUpdate(pExpCtx, spec.getOplogUpdate());
+}
+
+/**
+ * Stores the oplog update description and primes '_updateDriver' with it, so that doGetNext() only
+ * has to apply the already-parsed update to each input document.
+ */
+DocumentSourceInternalApplyOplogUpdate::DocumentSourceInternalApplyOplogUpdate(
+    const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const BSONObj& oplogUpdate)
+    : DocumentSource(kStageName, pExpCtx),
+      // 'oplogUpdate' is a const reference, so std::move() on it would silently copy. It may also
+      // be an unowned view into the caller's pipeline BSON, while this stage keeps the object for
+      // serialize(). Take an owned copy so '_oplogUpdate' cannot outlive its buffer.
+      _oplogUpdate(oplogUpdate.getOwned()),
+      _updateDriver(pExpCtx) {
+    // Parse the raw oplog update description.
+    const auto updateMod = write_ops::UpdateModification::parseFromOplogEntry(
+        _oplogUpdate, {true /* mustCheckExistenceForInsertOperations */});
+
+    // UpdateDriver only expects to apply a diff in the context of oplog application.
+    _updateDriver.setFromOplogApplication(true);
+    _updateDriver.parse(updateMod, {});
+}
+
+/**
+ * Pulls the next result from the source stage. Results that are not "advanced" are forwarded
+ * unchanged; otherwise the document is converted to BSON, the update is applied to it and the
+ * updated document is returned. A failed update is surfaced through uassertStatusOK().
+ */
+DocumentSource::GetNextResult DocumentSourceInternalApplyOplogUpdate::doGetNext() {
+    auto next = pSource->getNext();
+    if (!next.isAdvanced()) {
+        return next;
+    }
+
+    // Use _updateDriver to apply the update to the document.
+    mutablebson::Document doc(next.getDocument().toBson());
+    uassertStatusOK(_updateDriver.update(pExpCtx->opCtx,
+                                         StringData(),
+                                         &doc,
+                                         false /* validateForStorage */,
+                                         FieldRefSet(),
+                                         false /* isInsert */));
+
+    return Document(doc.getObject());
+}
+
+/**
+ * Serializes the stage back to {$_internalApplyOplogUpdate: {oplogUpdate: <original update>}}.
+ * The 'explain' verbosity does not change the output.
+ */
+Value DocumentSourceInternalApplyOplogUpdate::serialize(
+    boost::optional<ExplainOptions::Verbosity> explain) const {
+    return Value(Document{{kStageName, Document{{kOplogUpdateFieldName, _oplogUpdate}}}});
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h
new file mode 100644
index 00000000000..b4801181bd7
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/update/update_driver.h"
+
+namespace mongo {
+
+/**
+ * This is an internal stage that takes an oplog update description and applies the update to the
+ * input Document.
+ *
+ * The stage is spelled {$_internalApplyOplogUpdate: {oplogUpdate: <update>}}. The update is parsed
+ * once, at construction time, and then applied to every document pulled from the source stage.
+ */
+class DocumentSourceInternalApplyOplogUpdate final : public DocumentSource {
+public:
+    // Name under which the stage is registered and serialized.
+    static constexpr StringData kStageName = "$_internalApplyOplogUpdate"_sd;
+    // Name of the single field of the stage specification; it holds the oplog update.
+    static constexpr StringData kOplogUpdateFieldName = "oplogUpdate"_sd;
+
+    /**
+     * Parses 'elem' (the stage specification) and returns a new stage. Throws if 'elem' is not an
+     * object or if the specification cannot be parsed.
+     */
+    static boost::intrusive_ptr<DocumentSource> createFromBson(
+        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+    /**
+     * Creates a stage that applies 'oplogUpdate' to each input document. The update description
+     * is parsed here, so a malformed update makes the constructor throw.
+     */
+    DocumentSourceInternalApplyOplogUpdate(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+                                           const BSONObj& oplogUpdate);
+
+    // Returns the stage name, kStageName.
+    const char* getSourceName() const override {
+        return kStageName.rawData();
+    }
+
+    /**
+     * Describes where this stage may appear. It is a streaming stage with no position, host or
+     * disk requirements. It is allowed in transactions and in update pipelines, but not inside
+     * $facet, $lookup or $unionWith, and it is denylisted for change streams.
+     */
+    StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+        StageConstraints constraints(StreamType::kStreaming,
+                                     PositionRequirement::kNone,
+                                     HostTypeRequirement::kNone,
+                                     DiskUseRequirement::kNoDiskUse,
+                                     FacetRequirement::kNotAllowed,
+                                     TransactionRequirement::kAllowed,
+                                     LookupRequirement::kNotAllowed,
+                                     UnionRequirement::kNotAllowed,
+                                     ChangeStreamRequirement::kDenylist);
+        // A $match may not be moved ahead of this stage, but skipping/limiting stages may.
+        constraints.canSwapWithMatch = false;
+        constraints.canSwapWithSkippingOrLimitingStage = true;
+        constraints.isAllowedWithinUpdatePipeline = true;
+        constraints.isIndependentOfAnyCollection = false;
+        return constraints;
+    }
+
+    // Reports that any path of the input document may be modified by this stage.
+    DocumentSource::GetModPathsReturn getModifiedPaths() const final {
+        return {GetModPathsReturn::Type::kAllPaths, {}, {}};
+    }
+
+    // Returns boost::none: the stage supplies no distributed plan logic of its own.
+    boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+        return boost::none;
+    }
+
+private:
+    // Serializes the stage as {$_internalApplyOplogUpdate: {oplogUpdate: <update>}}.
+    Value serialize(
+        boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
+
+    // Applies the update to the next document from the source stage and returns the result.
+    GetNextResult doGetNext() override;
+
+    // The oplog update description as given to the constructor; used for serialization.
+    BSONObj _oplogUpdate;
+    // Driver holding the parsed update; applies it to each document in doGetNext().
+    UpdateDriver _updateDriver;
+};
+
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl
new file mode 100644
index 00000000000..f6b81d4b6fb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl
@@
-0,0 +1,42 @@ +# Copyright (C) 2018-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + InternalApplyOplogUpdateSpec: + description: $_internalApplyOplogUpdate aggregation stage + strict: true + fields: + oplogUpdate: + description: An update encoded in oplog update format. 
+ type: object diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp new file mode 100644 index 00000000000..fc95d45e6fa --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp @@ -0,0 +1,244 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// The tests only need an ExpressionContext, which the generic aggregation fixture provides.
+using DocumentSourceInternalApplyOplogUpdateTest = AggregationContextFixture;
+
+// Serializing a parsed stage must reproduce the original spec, and parsing that serialization
+// again must yield a stage that serializes identically.
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldBeAbleToReParseSerializedStage) {
+    auto spec = fromjson(
+        R"({$_internalApplyOplogUpdate: {oplogUpdate: {"$v": NumberInt(2), diff: {u: {b: 3}}}}})");
+
+    auto stage =
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx());
+
+    std::vector<Value> serialization;
+    stage->serializeToArray(serialization);
+    auto serializedBSON = serialization[0].getDocument().toBson();
+    ASSERT_VALUE_EQ(serialization[0], Value(Document(spec)));
+
+    auto roundTripped = DocumentSourceInternalApplyOplogUpdate::createFromBson(
+        serializedBSON.firstElement(), getExpCtx());
+
+    std::vector<Value> newSerialization;
+    roundTripped->serializeToArray(newSerialization);
+
+    ASSERT_EQ(newSerialization.size(), 1UL);
+    ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
+// A stage argument that is not an object (here a number and an array) is rejected with the
+// stage's own error code, 6315901.
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectNonObjectSpecs) {
+    {
+        auto spec = fromjson("{$_internalApplyOplogUpdate: 1}");
+
+        ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson(
+                               spec.firstElement(), getExpCtx()),
+                           DBException,
+                           6315901);
+    }
+
+    {
+        auto spec = fromjson("{$_internalApplyOplogUpdate: []}");
+
+        ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson(
+                               spec.firstElement(), getExpCtx()),
+                           DBException,
+                           6315901);
+    }
+}
+
+// Object specs that are malformed must fail at construction time. The cases, in order: an
+// unsupported "$v" value, a "$v": 2 update without a 'diff', an empty spec, a spec whose only
+// field is not 'oplogUpdate', and a spec with an extra field next to 'oplogUpdate'.
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectMalformedSpecs) {
+    auto spec = fromjson(
+        R"({$_internalApplyOplogUpdate: {
+            oplogUpdate: {"$v": NumberInt(999999999), diff: {u: {b: 3}}}
+        }})");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+        DBException,
+        4772600);
+
+    spec = fromjson(R"({$_internalApplyOplogUpdate: {oplogUpdate: {"$v": NumberInt(2)}}})");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+        DBException,
+        4772601);
+
+    spec = fromjson("{$_internalApplyOplogUpdate: {}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+        DBException,
+        40414);
+
+    spec =
+        fromjson(R"({$_internalApplyOplogUpdate: {foo: {"$v": NumberInt(2), diff: {u: {b: 3}}}}})");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+        DBException,
+        40415);
+
+    spec = fromjson(
+        R"({$_internalApplyOplogUpdate: {
+            oplogUpdate: {"$v": NumberInt(2), diff: {u: {b: 3}}},
+            foo: 1
+        }})");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+        DBException,
+        40415);
+}
+
+// Applies one diff (a sub-diff on field 'b' that sets 'c' to 3) to four input documents and checks
+// every output: a document without 'b' is returned unchanged, a scalar 'b' comes back as null, and
+// an object 'b' gets 'c' set to 3 (overwritten if present, added otherwise). After the last
+// document the stage keeps reporting EOF.
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, UpdateMultipleDocuments) {
+    auto spec = fromjson(
+        R"({$_internalApplyOplogUpdate: {
+            oplogUpdate: {"$v": NumberInt(2), diff: {sb: {u: {c: 3}}}}
+        }})");
+
+    auto stage =
+        DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx());
+    auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}},
+                                                   Document{{"a", 1}, {"b", 1}},
+                                                   Document{{"a", 2}, {"b", Document{{"c", 1}}}},
+                                                   Document{{"a", 3}, {"b", Document{{"d", 2}}}}},
+                                                  getExpCtx());
+    stage->setSource(mock.get());
+
+    auto next = stage->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    Document expected = Document{{"a", 0}};
+    ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+    next = stage->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    expected = Document{{"a", 1}, {"b", BSONNULL}};
+    ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+    next = stage->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    expected = Document{{"a", 2}, {"b", Document{{"c", 3}}}};
+    ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+    next = stage->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    expected = Document{{"a", 3}, {"b", Document{{"d", 2}, {"c", 3}}}};
+    ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+    ASSERT_TRUE(stage->getNext().isEOF());
+    ASSERT_TRUE(stage->getNext().isEOF());
+    ASSERT_TRUE(stage->getNext().isEOF());
+}
+
+// In every case below the stage is created without error from a "$v": 2 update whose diff is
+// invalid; the error is only expected once getNext() applies the diff to a document. The cases, in
+// order: an empty diff, an unknown section 'q', an empty field name, an 'sa' section holding an
+// array, a field name with an embedded NUL inside 'u', and a section name with an embedded NUL.
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldErrorOnInvalidDiffs) {
+    {
+        auto spec = fromjson(
+            R"({$_internalApplyOplogUpdate: {
+                oplogUpdate: {"$v": NumberInt(2), diff: {}}
+            }})");
+        auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+                                                                            getExpCtx());
+        auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+        stage->setSource(mock.get());
+        ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770500);
+    }
+
+    {
+        auto spec = fromjson(
+            R"({$_internalApplyOplogUpdate: {
+                oplogUpdate: {"$v": NumberInt(2), diff: {q: {z: -7}}}
+            }})");
+        auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+                                                                            getExpCtx());
+        auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+        stage->setSource(mock.get());
+        ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770503);
+    }
+
+    {
+        auto spec = fromjson(
+            R"({$_internalApplyOplogUpdate: {
+                oplogUpdate: {"$v": NumberInt(2), diff: {"": {z: -7}}}
+            }})");
+        auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+                                                                            getExpCtx());
+        auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+        stage->setSource(mock.get());
+        ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770505);
+    }
+
+    {
+        auto spec = fromjson(
+            R"({$_internalApplyOplogUpdate: {
+                oplogUpdate: {"$v": NumberInt(2), diff: {sa: []}}
+            }})");
+        auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+                                                                            getExpCtx());
+        auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+        stage->setSource(mock.get());
+        ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770507);
+    }
+
+    {
+        // Built with BSON() rather than fromjson() so the field name can contain a NUL byte.
+        auto spec = BSON("$_internalApplyOplogUpdate"
+                         << BSON("oplogUpdate" << BSON("$v" << 2 << "diff"
+                                                            << BSON("u" << BSON("b\0c"_sd << 3)))));
+        auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+                                                                            getExpCtx());
+        auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+        stage->setSource(mock.get());
+        ASSERT_THROWS_CODE(stage->getNext(), DBException, 4728000);
+    }
+
+    {
+        auto spec = BSON(
+            "$_internalApplyOplogUpdate" << BSON(
+                "oplogUpdate" << BSON("$v" << 2 << "diff" << BSON("sb\0c"_sd << BSON("d" << 5)))));
+        auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+                                                                            getExpCtx());
+        auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+        stage->setSource(mock.get());
+        ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770505);
+    }
+}
+
+}  // namespace
+}  // namespace mongo
|