summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDrew Paroski <drew.paroski@mongodb.com>2022-02-11 05:28:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-24 15:48:32 +0000
commit0f7983d9be770a60ae3cf5c04bffb60d0b61595f (patch)
treec9fa4a5c7a98737aabec4f7660cf219c9edd4d5e
parentcd6b3e618c5d2988d0131ce81baec018eee79767 (diff)
downloadmongo-0f7983d9be770a60ae3cf5c04bffb60d0b61595f.tar.gz
SERVER-63159 Implement the $_internalApplyOplogUpdate aggregation stage
(cherry picked from commit 3579b34f55aa53213455cdff348738f8595d28c7) (cherry picked from commit 65c4a53719c5f0dc19a1d1749d733f439155f957) (cherry picked from commit 84c827702391857ef340b4e0315980f5b71d42b6)
-rw-r--r--etc/backports_required_for_multiversion_tests.yml4
-rw-r--r--jstests/change_streams/show_raw_update_description_v1_oplog.js40
-rw-r--r--jstests/core/internal_apply_oplog_update.js168
-rw-r--r--jstests/libs/parallelTester.js1
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/ops/write_ops_parsers.cpp1
-rw-r--r--src/mongo/db/pipeline/SConscript17
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp96
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h91
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl42
-rw-r--r--src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp153
11 files changed, 608 insertions, 6 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index b37d69ed182..f02f103a733 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -140,6 +140,10 @@ all:
test_file: jstests/sharding/chunk_migration_with_schema_validation.js
- ticket: SERVER-61894
test_file: jstests/change_streams/show_raw_update_description_v1_oplog.js
+ - ticket: SERVER-63159
+ test_file: jstests/change_streams/show_raw_update_description_v1_oplog.js
+ - ticket: SERVER-63159
+ test_file: jstests/core/internal_apply_oplog_update.js
# Tests that should only be excluded from particular suites should be listed under that suite.
diff --git a/jstests/change_streams/show_raw_update_description_v1_oplog.js b/jstests/change_streams/show_raw_update_description_v1_oplog.js
index cc4595883fd..882c529a574 100644
--- a/jstests/change_streams/show_raw_update_description_v1_oplog.js
+++ b/jstests/change_streams/show_raw_update_description_v1_oplog.js
@@ -7,11 +7,13 @@
(function() {
"use strict";
+load("jstests/aggregation/extras/utils.js"); // For arrayEq.
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
// Drop and recreate the collections to be used in this set of tests.
assertDropAndRecreateCollection(db, "t1");
+assertDropAndRecreateCollection(db, "t1Copy");
assert.commandWorked(db.t1.insert([
{_id: 3, a: 5, b: 1},
@@ -100,6 +102,28 @@ cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected})
// 'rawUpdateDescription'.
//
+function assertCollectionsAreIdentical(coll1, coll2) {
+ const values1 = coll1.find().toArray();
+ const values2 = coll2.find().toArray();
+ assert(arrayEq(values1, values2),
+ () => "actual: " + tojson(values1) + " expected: " + tojson(values2));
+}
+
+function assertCanApplyRawUpdate(origColl, copyColl, events) {
+ if (!Array.isArray(events)) {
+ events = [events];
+ }
+ for (let event of events) {
+ assert.commandWorked(copyColl.update(
+ event.documentKey,
+ [{$_internalApplyOplogUpdate: {oplogUpdate: event.rawUpdateDescription}}]));
+ }
+ assertCollectionsAreIdentical(origColl, copyColl);
+}
+
+assert.commandWorked(db.t1Copy.insert(db.t1.find().toArray()));
+assertCollectionsAreIdentical(db.t1, db.t1Copy);
+
//
// Test op-style updates.
//
@@ -109,9 +133,10 @@ expected = {
documentKey: {_id: 3},
ns: {db: "test", coll: "t1"},
operationType: "update",
- rawUpdateDescription: {"$v": 1, "$set": {b: 3}}
+ rawUpdateDescription: {"$v": NumberInt(1), "$set": {b: 3}}
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+assertCanApplyRawUpdate(db.t1, db.t1Copy, expected);
jsTestLog("Testing op-style update with $set and multi:true");
assert.commandWorked(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true}));
@@ -120,16 +145,17 @@ expected = [
documentKey: {_id: 4},
ns: {db: "test", coll: "t1"},
operationType: "update",
- rawUpdateDescription: {"$v": 1, "$set": {b: 2}}
+ rawUpdateDescription: {"$v": NumberInt(1), "$set": {b: 2}}
},
{
documentKey: {_id: 5},
ns: {db: "test", coll: "t1"},
operationType: "update",
- rawUpdateDescription: {"$v": 1, "$set": {b: 2}}
+ rawUpdateDescription: {"$v": NumberInt(1), "$set": {b: 2}}
}
];
cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected});
+assertCanApplyRawUpdate(db.t1, db.t1Copy, expected);
jsTestLog("Testing op-style update with $unset");
assert.commandWorked(db.t1.update({_id: 3}, {$unset: {b: ""}}));
@@ -137,9 +163,10 @@ expected = {
documentKey: {_id: 3},
ns: {db: "test", coll: "t1"},
operationType: "update",
- rawUpdateDescription: {"$v": 1, "$unset": {b: true}}
+ rawUpdateDescription: {"$v": NumberInt(1), "$unset": {b: true}}
};
-cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected});
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+assertCanApplyRawUpdate(db.t1, db.t1Copy, expected);
jsTestLog("Testing op-style update with $set on nested field");
assert.commandWorked(db.t1.update({_id: 8}, {$set: {"b.d": 2}}));
@@ -147,9 +174,10 @@ expected = {
documentKey: {_id: 8},
ns: {db: "test", coll: "t1"},
operationType: "update",
- rawUpdateDescription: {"$v": 1, "$set": {"b.d": 2}}
+ rawUpdateDescription: {"$v": NumberInt(1), "$set": {"b.d": 2}}
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+assertCanApplyRawUpdate(db.t1, db.t1Copy, expected);
cst.cleanUp();
}());
diff --git a/jstests/core/internal_apply_oplog_update.js b/jstests/core/internal_apply_oplog_update.js
new file mode 100644
index 00000000000..2e7627bef2a
--- /dev/null
+++ b/jstests/core/internal_apply_oplog_update.js
@@ -0,0 +1,168 @@
+/**
+ * Tests pipeline-style updates that use the $_internalApplyOplogUpdate aggregate stage.
+ * @tags: [
+ * requires_find_command,
+ * requires_multi_updates,
+ * requires_non_retryable_writes,
+ * requires_fcv_42,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js"); // For arrayEq.
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+
+// Drop and recreate the collections to be used in this set of tests.
+assertDropAndRecreateCollection(db, "t1");
+assertDropAndRecreateCollection(db, "t2");
+
+let documents1 = [
+ {_id: 2, a: 4},
+ {_id: 3, a: 5, b: 1},
+ {_id: 4, a: 0, b: 1},
+ {_id: 5, a: 0, b: 1},
+ {_id: 8, a: 2, b: {c: 1}}
+];
+
+assert.commandWorked(db.t1.insert(documents1));
+
+const kGiantStr = '*'.repeat(1024);
+const kMediumStr = '*'.repeat(128);
+const kSmallStr = '*'.repeat(32);
+
+let documents2 = [{
+ _id: 100,
+ "a": 1,
+ "b": 2,
+ "arrayForSubdiff": [kGiantStr, {a: kMediumStr}, 1, 2, 3],
+ "arrayForReplacement": [0, 1, 2, 3],
+ "giantStr": kGiantStr
+}];
+
+assert.commandWorked(db.t2.insert(documents2));
+
+//
+// Test $_internalApplyOplogUpdate with v1 oplog update descriptions. For each update description,
+// we execute $_internalApplyOplogUpdate twice to verify idempotency.
+//
+
+function testUpdate(expected, coll, filter, oplogUpdate, opts = {}) {
+ for (let i = 0; i < 2; ++i) {
+ assert.commandWorked(
+ coll.update(filter, [{$_internalApplyOplogUpdate: {oplogUpdate: oplogUpdate}}], opts));
+
+ let actual = coll.find().toArray();
+ assert(arrayEq(expected, actual),
+ () => "i: " + i + " actual: " + tojson(actual) + " expected: " + tojson(expected));
+ }
+}
+
+let oplogUpdate = {"$v": NumberInt(1), "$set": {b: 3}};
+documents1[1].b = 3;
+testUpdate(documents1, db.t1, {_id: 3}, oplogUpdate);
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$set": {b: 2}
+};
+documents1[2].b = 2;
+documents1[3].b = 2;
+testUpdate(documents1, db.t1, {a: 0}, oplogUpdate, {multi: true});
+
+oplogUpdate = {
+ "$unset": {b: true}
+};
+delete documents1[1].b;
+testUpdate(documents1, db.t1, {_id: 3}, oplogUpdate);
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$set": {"b.d": 2}
+};
+documents1[4].b.d = 2;
+testUpdate(documents1, db.t1, {_id: 8}, oplogUpdate);
+
+// Test an update with upsert=true where no documents match the filter prior to the update.
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$set": {b: 3}
+};
+documents1.push({_id: 9, b: 3});
+testUpdate(documents1, db.t1, {_id: 9}, oplogUpdate, {upsert: true});
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$set":
+ {a: 2, arrayForReplacement: [0], c: 3, arrayForSubdiff: [kGiantStr, {a: kMediumStr, b: 3}]},
+};
+documents2[0].a = 2;
+documents2[0].arrayForSubdiff = [kGiantStr, {a: kMediumStr, b: 3}];
+documents2[0].arrayForReplacement = [0];
+documents2[0].c = 3;
+testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate);
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$unset": {a: 1}
+};
+delete documents2[0].a;
+testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate);
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$unset": {c: 1, arrayForReplacement: 1, arrayForSubdiff: 1, b: 1}
+};
+documents2[0] = {
+ _id: 100,
+ "giantStr": kGiantStr
+};
+testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate);
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$set": {
+ arr: [{x: 1, y: kSmallStr}, kMediumStr],
+ arr_a: [1, kMediumStr],
+ arr_b: [[1, kSmallStr], kMediumStr],
+ arr_c: [[kSmallStr, 1, 2, 3], kMediumStr],
+ obj: {x: {a: 1, b: 1, c: [kMediumStr, 1, 2, 3], str: kMediumStr}},
+ a: "updated",
+ doc: {a: {0: "foo"}}
+ }
+};
+documents2[0] = {
+ _id: 100,
+ giantStr: kGiantStr,
+ arr: [{x: 1, y: kSmallStr}, kMediumStr],
+ arr_a: [1, kMediumStr],
+ arr_b: [[1, kSmallStr], kMediumStr],
+ arr_c: [[kSmallStr, 1, 2, 3], kMediumStr],
+ obj: {x: {a: 1, b: 1, c: [kMediumStr, 1, 2, 3], str: kMediumStr}},
+ a: "updated",
+ doc: {a: {0: "foo"}}
+};
+testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate);
+
+oplogUpdate = {
+ "$v": NumberInt(1),
+ "$set": {
+ "arr_a.0": 2,
+ "arr_b.0.0": 2,
+ "arr_c.0": [kSmallStr],
+ "obj.x.b": 2,
+ "obj.x.c": [kMediumStr]
+ },
+ "$unset": {a: 1, doc: 1, "arr.0.x": 1, "obj.x.a": 1}
+};
+documents2[0] = {
+ _id: 100,
+ giantStr: kGiantStr,
+ arr: [{y: kSmallStr}, kMediumStr],
+ arr_a: [2, kMediumStr],
+ arr_b: [[2, kSmallStr], kMediumStr],
+ arr_c: [[kSmallStr], kMediumStr],
+ obj: {x: {b: 2, c: [kMediumStr], str: kMediumStr}},
+};
+testUpdate(documents2, db.t2, {_id: 100}, oplogUpdate);
+}());
diff --git a/jstests/libs/parallelTester.js b/jstests/libs/parallelTester.js
index a92c4f323a6..3777dbf4e8f 100644
--- a/jstests/libs/parallelTester.js
+++ b/jstests/libs/parallelTester.js
@@ -221,6 +221,7 @@ if (typeof _threadInject != "undefined") {
if (db.getMongo().readMode() === "legacy") {
var requires_find_command = [
"apply_ops_system_dot_views.js",
+ "internal_apply_oplog_update.js",
"merge_sort_collation.js",
"update_pipeline_shell_helpers.js",
"update_with_pipeline.js",
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 2184f741346..62a7832841b 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -158,6 +158,7 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/logical_session_id',
+ '$BUILD_DIR/mongo/db/pipeline/document_source_internal_apply_oplog_update',
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/session_catalog',
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 935139adfda..51b5e1b2cf7 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/update/log_builder.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index f1ba018554c..f034f2f87fb 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -214,6 +214,20 @@ env.Library(
],
)
+env.Library(
+ target="document_source_internal_apply_oplog_update",
+ source=[
+ 'document_source_internal_apply_oplog_update.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/ops/write_ops_parsers',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
+ '$BUILD_DIR/mongo/db/update/update_driver',
+ '$BUILD_DIR/mongo/s/query/router_exec_stage',
+ ],
+)
+
env.CppUnitTest(
target='document_source_test',
source=[
@@ -228,6 +242,7 @@ env.CppUnitTest(
'document_source_geo_near_test.cpp',
'document_source_graph_lookup_test.cpp',
'document_source_group_test.cpp',
+ 'document_source_internal_apply_oplog_update_test.cpp',
'document_source_internal_split_pipeline_test.cpp',
'document_source_limit_test.cpp',
'document_source_lookup_change_post_image_test.cpp',
@@ -262,6 +277,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/query/router_exec_stage',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
'$BUILD_DIR/mongo/util/clock_source_mock',
+ 'document_source_internal_apply_oplog_update',
'document_source_mock',
'document_value_test_util',
'pipeline',
@@ -626,6 +642,7 @@ env.Library(
target='document_sources_idl',
source=[
env.Idlc('document_source_change_stream.idl')[0],
+ env.Idlc('document_source_internal_apply_oplog_update.idl')[0],
env.Idlc('document_source_list_sessions.idl')[0],
env.Idlc('document_source_merge.idl')[0],
env.Idlc('document_source_merge_modes.idl')[0],
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp
new file mode 100644
index 00000000000..3c906a37c4e
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h"
+
+#include <boost/optional.hpp>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
+#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update_gen.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/update/update_driver.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalApplyOplogUpdate,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceInternalApplyOplogUpdate::createFromBson);
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(6315901,
+ str::stream() << "Argument to " << kStageName
+ << " stage must be an object, but found type: " << typeName(elem.type()),
+ elem.type() == BSONType::Object);
+
+ auto spec = InternalApplyOplogUpdateSpec::parse(IDLParserErrorContext(kStageName),
+ elem.embeddedObject());
+
+ return new DocumentSourceInternalApplyOplogUpdate(pExpCtx, spec.getOplogUpdate());
+}
+
+DocumentSourceInternalApplyOplogUpdate::DocumentSourceInternalApplyOplogUpdate(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const BSONObj& oplogUpdate)
+ : DocumentSource(pExpCtx), _oplogUpdate(std::move(oplogUpdate)), _updateDriver(pExpCtx) {
+ // Parse the raw oplog update description. We're treating this as a "classic" update,
+ // which can either be a full replacement or a modifier-style update.
+ const auto updateMod = write_ops::UpdateModification(_oplogUpdate);
+
+ // UpdateDriver only expects to apply a diff in the context of oplog application.
+ _updateDriver.setFromOplogApplication(true);
+ _updateDriver.parse(updateMod, {});
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalApplyOplogUpdate::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ auto next = pSource->getNext();
+ if (!next.isAdvanced()) {
+ return next;
+ }
+
+ // Use _updateDriver to apply the update to the document.
+ mutablebson::Document doc(next.getDocument().toBson());
+ uassertStatusOK(_updateDriver.update(
+ StringData(), &doc, false /* validateForStorage */, FieldRefSet(), false /* isInsert */));
+
+ return Document(doc.getObject());
+}
+
+Value DocumentSourceInternalApplyOplogUpdate::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(Document{{kStageName, Document{{kOplogUpdateFieldName, _oplogUpdate}}}});
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h
new file mode 100644
index 00000000000..44bb0a0c8d4
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/update/update_driver.h"
+
+namespace mongo {
+
+/**
+ * This is an internal stage that takes an oplog update description and applies the update to the
+ * input Document.
+ */
+class DocumentSourceInternalApplyOplogUpdate final : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalApplyOplogUpdate"_sd;
+ static constexpr StringData kOplogUpdateFieldName = "oplogUpdate"_sd;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ DocumentSourceInternalApplyOplogUpdate(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const BSONObj& oplogUpdate);
+
+ const char* getSourceName() const override {
+ return kStageName.rawData();
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed,
+ LookupRequirement::kNotAllowed,
+ ChangeStreamRequirement::kBlacklist);
+ constraints.canSwapWithMatch = false;
+ constraints.canSwapWithLimitAndSample = true;
+ constraints.isAllowedWithinUpdatePipeline = true;
+ constraints.isIndependentOfAnyCollection = false;
+ return constraints;
+ }
+
+ DocumentSource::GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kAllPaths, {}, {}};
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ return boost::none;
+ }
+
+ GetNextResult getNext() override;
+
+private:
+ Value serialize(
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
+
+ BSONObj _oplogUpdate;
+ UpdateDriver _updateDriver;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl
new file mode 100644
index 00000000000..f6b81d4b6fb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update.idl
@@ -0,0 +1,42 @@
+# Copyright (C) 2018-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ InternalApplyOplogUpdateSpec:
+ description: $internalApplyOplogUpdate aggregation stage
+ strict: true
+ fields:
+ oplogUpdate:
+ description: An update encoded in oplog update format.
+ type: object
diff --git a/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp
new file mode 100644
index 00000000000..e25fc5e5cfb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_apply_oplog_update_test.cpp
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_apply_oplog_update.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using DocumentSourceInternalApplyOplogUpdateTest = AggregationContextFixture;
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldBeAbleToReParseSerializedStage) {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {oplogUpdate: {"$v": NumberInt(1), "$set": {b: 3}}}})");
+
+ auto stage =
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx());
+
+ std::vector<Value> serialization;
+ stage->serializeToArray(serialization);
+ auto serializedBSON = serialization[0].getDocument().toBson();
+ ASSERT_VALUE_EQ(serialization[0], Value(Document(spec)));
+
+ auto roundTripped = DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ serializedBSON.firstElement(), getExpCtx());
+
+ std::vector<Value> newSerialization;
+ roundTripped->serializeToArray(newSerialization);
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectNonObjectSpecs) {
+ {
+ auto spec = fromjson("{$_internalApplyOplogUpdate: 1}");
+
+ ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ spec.firstElement(), getExpCtx()),
+ DBException,
+ 6315901);
+ }
+
+ {
+ auto spec = fromjson("{$_internalApplyOplogUpdate: []}");
+
+ ASSERT_THROWS_CODE(DocumentSourceInternalApplyOplogUpdate::createFromBson(
+ spec.firstElement(), getExpCtx()),
+ DBException,
+ 6315901);
+ }
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, ShouldRejectMalformedSpecs) {
+ auto spec = fromjson("{$_internalApplyOplogUpdate: {}}");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 40414);
+
+ spec = fromjson(R"({$_internalApplyOplogUpdate: {foo: {"$v": NumberInt(1), "$set": {b: 3}}}})");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 40415);
+
+ spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(1), "$set": {b: 3}},
+ foo: 1
+ }})");
+ ASSERT_THROWS_CODE(
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx()),
+ DBException,
+ 40415);
+}
+
+TEST_F(DocumentSourceInternalApplyOplogUpdateTest, UpdateMultipleDocuments) {
+ auto spec = fromjson(
+ R"({$_internalApplyOplogUpdate: {
+ oplogUpdate: {"$v": NumberInt(1), "$set": {"b.c": 3}}
+ }})");
+
+ auto stage =
+ DocumentSourceInternalApplyOplogUpdate::createFromBson(spec.firstElement(), getExpCtx());
+ auto mock = DocumentSourceMock::createForTest({Document{{"a", 0}},
+ Document{{"a", 1}, {"b", 1}},
+ Document{{"a", 2}, {"b", Document{{"c", 1}}}},
+ Document{{"a", 3}, {"b", Document{{"d", 2}}}}});
+ stage->setSource(mock.get());
+
+ auto next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ Document expected = Document{{"a", 0}, {"b", Document{{"c", 3}}}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ expected = Document{{"a", 1}, {"b", 1}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ expected = Document{{"a", 2}, {"b", Document{{"c", 3}}}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ next = stage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ expected = Document{{"a", 3}, {"b", Document{{"d", 2}, {"c", 3}}}};
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected);
+
+ ASSERT_TRUE(stage->getNext().isEOF());
+ ASSERT_TRUE(stage->getNext().isEOF());
+ ASSERT_TRUE(stage->getNext().isEOF());
+}
+
+} // namespace
+} // namespace mongo