summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorDrew Paroski <drew.paroski@mongodb.com>2022-02-11 05:28:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-10 18:23:32 +0000
commit3579b34f55aa53213455cdff348738f8595d28c7 (patch)
tree0348f420d056322759c3081d8bd0c9e9b0bb98ae /src/mongo
parent6074b6898dfdbfdaa27c26e3923bc02109d5d1c1 (diff)
downloadmongo-3579b34f55aa53213455cdff348738f8595d28c7.tar.gz
SERVER-63159 Implement the $_internalApplyOplogUpdate aggregation stage
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/pipeline/SConscript18
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp103
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h92
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl42
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp244
6 files changed, 500 insertions, 0 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index f3a470b5ae1..aca30381f2b 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -163,6 +163,7 @@ env.Library(
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/multitenancy_params',
'$BUILD_DIR/mongo/db/pipeline/change_stream_pipeline',
+ '$BUILD_DIR/mongo/db/pipeline/document_source_internal_apply_oplog_update',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 65a8edeac29..657125c2c64 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -395,6 +395,21 @@ pipelineEnv.Library(
)
env.Library(
+ target="document_source_internal_apply_oplog_update",
+ source=[
+ 'document_source_internal_apply_oplog_update.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/multitenancy',
+ '$BUILD_DIR/mongo/db/ops/write_ops_parsers',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
+ '$BUILD_DIR/mongo/db/update/update_driver',
+ '$BUILD_DIR/mongo/s/query/router_exec_stage',
+ ],
+)
+
+env.Library(
target="change_stream_pipeline",
source=[
'change_stream_document_diff_parser.cpp',
@@ -442,6 +457,7 @@ env.Library(
'document_source_coll_stats.idl',
'document_source_densify.idl',
'document_source_fill.idl',
+ 'document_source_internal_apply_oplog_update.idl',
'document_source_list_sessions.idl',
'document_source_merge.idl',
'document_source_merge_modes.idl',
@@ -526,6 +542,7 @@ env.CppUnitTest(
'document_source_geo_near_test.cpp',
'document_source_graph_lookup_test.cpp',
'document_source_group_test.cpp',
+ 'document_source_internal_apply_oplog_update_test.cpp',
'document_source_internal_shard_filter_test.cpp',
'document_source_internal_split_pipeline_test.cpp',
'document_source_limit_test.cpp',
@@ -641,6 +658,7 @@ env.CppUnitTest(
'accumulator',
'aggregation_request_helper',
'change_stream_pipeline',
+ 'document_source_internal_apply_oplog_update',
'document_source_mock',
'document_sources_idl',
'expression_context',
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp
new file mode 100644
index 00000000000..51b0dee8ac1
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h"
+
+#include <boost/optional.hpp>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
+#include "mongo/db/exec/add_fields_projection_executor.h"
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update_gen.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/update/update_driver.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalApplyOplogUpdate,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceInternalApplyOplogUpdate::createFromBson,
+ AllowedWithApiStrict::kNeverInVersion1);
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(6315901,
+ str::stream() << "Argument to " << kStageName
+ << " stage must be an object, but found type: " << typeName(elem.type()),
+ elem.type() == BSONType::Object);
+
+ auto spec = InternalApplyOplogUpdateSpec::parse(IDLParserErrorContext(kStageName),
+ elem.embeddedObject());
+
+ return new DocumentSourceInternalApplyOplogUpdate(pExpCtx, spec.getOplogUpdate());
+}
+
+DocumentSourceInternalApplyOplogUpdate::DocumentSourceInternalApplyOplogUpdate(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const BSONObj& oplogUpdate)
+ : DocumentSource(kStageName, pExpCtx),
+ _oplogUpdate(std::move(oplogUpdate)),
+ _updateDriver(pExpCtx) {
+ // Parse the raw oplog update description.
+ const auto updateMod = write_ops::UpdateModification::parseFromOplogEntry(
+ _oplogUpdate, {true /* mustCheckExistenceForInsertOperations */});
+
+ // UpdateDriver only expects to apply a diff in the context of oplog application.
+ _updateDriver.setFromOplogApplication(true);
+ _updateDriver.parse(updateMod, {});
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalApplyOplogUpdate::doGetNext() {
+ auto next = pSource->getNext();
+ if (!next.isAdvanced()) {
+ return next;
+ }
+
+ // Use _updateDriver to apply the update to the document.
+ mutablebson::Document doc(next.getDocument().toBson());
+ uassertStatusOK(_updateDriver.update(pExpCtx->opCtx,
+ StringData(),
+ &doc,
+ false /* validateForStorage */,
+ FieldRefSet(),
+ false /* isInsert */));
+
+ return Document(doc.getObject());
+}
+
+Value DocumentSourceInternalApplyOplogUpdate::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(Document{{kStageName, Document{{kOplogUpdateFieldName, _oplogUpdate}}}});
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h
new file mode 100644
index 00000000000..b4801181bd7
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/update/update_driver.h"
+
+namespace mongo {
+
+/**
+ * This is an internal stage that takes an oplog update description and applies the update to the
+ * input Document.
+ */
+class DocumentSourceInternalApplyOplogUpdate final : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalApplyOplogUpdate"_sd;
+ static constexpr StringData kOplogUpdateFieldName = "oplogUpdate"_sd;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ DocumentSourceInternalApplyOplogUpdate(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const BSONObj& oplogUpdate);
+
+ const char* getSourceName() const override {
+ return kStageName.rawData();
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed,
+ LookupRequirement::kNotAllowed,
+ UnionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kDenylist);
+ constraints.canSwapWithMatch = false;
+ constraints.canSwapWithSkippingOrLimitingStage = true;
+ constraints.isAllowedWithinUpdatePipeline = true;
+ constraints.isIndependentOfAnyCollection = false;
+ return constraints;
+ }
+
+ DocumentSource::GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kAllPaths, {}, {}};
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ return boost::none;
+ }
+
+private:
+ Value serialize(
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
+
+ GetNextResult doGetNext() override;
+
+ BSONObj _oplogUpdate;
+ UpdateDriver _updateDriver;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl
new file mode 100644
index 00000000000..f6b81d4b6fb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl
@@ -0,0 +1,42 @@
+# Copyright (C) 2018-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ InternalApplyOplogUpdateSpec:
+ description: $internalApplyOplogUpdate aggregation stage
+ strict: true
+ fields:
+ oplogUpdate:
+ description: An update encoded in oplog update format.
+ type: object
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp
new file mode 100644
index 00000000000..fc95d45e6fa
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp
@@ -0,0 +1,244 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using DocumentSourceInternalApplyOplogUpdateTest = AggregationContextFixture;
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldBeAbleToReParseSerializedStage) {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {oplogUpdate: {"$v": NumberInt(2), diff: {u: {b: 3}}}}})");
+
+ auto stage =
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx());
+
+ std::vector<Value> serialization;
+ stage->serializeToArray(serialization);
+ auto serializedBSON = serialization[0].getDocument().toBson();
+ ASSERT_VALUE_EQ(serialization[0], Value(Document(spec)));
+
+ auto roundTripped = DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ serializedBSON.firstElement(), getExpCtx());
+
+ std::vector<Value> newSerialization;
+ roundTripped->serializeToArray(newSerialization);
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectNonObjectSpecs) {
+ {
+ auto spec = fromjson("{$_internalApplyOplogUpdate: 1}");
+
+ ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ spec.firstElement(), getExpCtx()),
+ DBException,
+ 6315901);
+ }
+
+ {
+ auto spec = fromjson("{$_internalApplyOplogUpdate: []}");
+
+ ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ spec.firstElement(), getExpCtx()),
+ DBException,
+ 6315901);
+ }
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectMalformedSpecs) {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(999999999), diff: {u: {b: 3}}}
+ }})");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 4772600);
+
+ spec = fromjson(R"({$_internalApplyOplogUpdate: {oplogUpdate: {"$v": NumberInt(2)}}})");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 4772601);
+
+ spec = fromjson("{$_internalApplyOplogUpdate: {}}");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 40414);
+
+ spec =
+ fromjson(R"({$_internalApplyOplogUpdate: {foo: {"$v": NumberInt(2), diff: {u: {b: 3}}}}})");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 40415);
+
+ spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(2), diff: {u: {b: 3}}},
+ foo: 1
+ }})");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 40415);
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, UpdateMultipleDocuments) {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(2), diff: {sb: {u: {c: 3}}}}
+ }})");
+
+ auto stage =
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}},
+ Document{{"a", 1}, {"b", 1}},
+ Document{{"a", 2}, {"b", Document{{"c", 1}}}},
+ Document{{"a", 3}, {"b", Document{{"d", 2}}}}},
+ getExpCtx());
+ stage->setSource(mock.get());
+
+ auto next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ Document expected = Document{{"a", 0}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ expected = Document{{"a", 1}, {"b", BSONNULL}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ expected = Document{{"a", 2}, {"b", Document{{"c", 3}}}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ expected = Document{{"a", 3}, {"b", Document{{"d", 2}, {"c", 3}}}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ ASSERT_TRUE(stage->getNext().isEOF());
+ ASSERT_TRUE(stage->getNext().isEOF());
+ ASSERT_TRUE(stage->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldErrorOnInvalidDiffs) {
+ {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(2), diff: {}}
+ }})");
+ auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+ getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+ stage->setSource(mock.get());
+ ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770500);
+ }
+
+ {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(2), diff: {q: {z: -7}}}
+ }})");
+ auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+ getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+ stage->setSource(mock.get());
+ ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770503);
+ }
+
+ {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(2), diff: {"": {z: -7}}}
+ }})");
+ auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+ getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+ stage->setSource(mock.get());
+ ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770505);
+ }
+
+ {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(2), diff: {sa: []}}
+ }})");
+ auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+ getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+ stage->setSource(mock.get());
+ ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770507);
+ }
+
+ {
+ auto spec = BSON("$_internalApplyOplogUpdate"
+ << BSON("oplogUpdate" << BSON("$v" << 2 << "diff"
+ << BSON("u" << BSON("b\0c"_sd << 3)))));
+ auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+ getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+ stage->setSource(mock.get());
+ ASSERT_THROWS_CODE(stage->getNext(), DBException, 4728000);
+ }
+
+ {
+ auto spec = BSON(
+ "$_internalApplyOplogUpdate" << BSON(
+ "oplogUpdate" << BSON("$v" << 2 << "diff" << BSON("sb\0c"_sd << BSON("d" << 5)))));
+ auto stage = DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(),
+ getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}}}, getExpCtx());
+ stage->setSource(mock.get());
+ ASSERT_THROWS_CODE(stage->getNext(), DBException, 4770505);
+ }
+}
+
+} // namespace
+} // namespace mongo