summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Boros <ian.boros@mongodb.com>2020-06-16 14:48:07 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-09 23:51:46 +0000
commitef1b465f2085a3a31e6dac4611fab3c75919b258 (patch)
tree79cc91c5ed02779d657ecaeaddf4f6b907f04ae0
parent40373c0e6e2c49d62c6e934393af87c14fe6fb14 (diff)
downloadmongo-ef1b465f2085a3a31e6dac4611fab3c75919b258.tar.gz
SERVER-47726 Log delta-style oplog entries from pipeline based updates
-rw-r--r--jstests/core/apply_ops1.js39
-rw-r--r--jstests/replsets/v2_delta_oplog_entries.js338
-rw-r--r--src/mongo/db/ops/write_ops_parsers.cpp110
-rw-r--r--src/mongo/db/ops/write_ops_parsers.h76
-rw-r--r--src/mongo/db/query/query_knobs.idl6
-rw-r--r--src/mongo/db/repl/oplog.cpp2
-rw-r--r--src/mongo/db/update/delta_executor.h69
-rw-r--r--src/mongo/db/update/log_builder.cpp68
-rw-r--r--src/mongo/db/update/log_builder.h42
-rw-r--r--src/mongo/db/update/log_builder_test.cpp124
-rw-r--r--src/mongo/db/update/modifier_node.cpp9
-rw-r--r--src/mongo/db/update/object_replace_executor.cpp22
-rw-r--r--src/mongo/db/update/object_replace_executor.h12
-rw-r--r--src/mongo/db/update/object_replace_executor_test.cpp22
-rw-r--r--src/mongo/db/update/pipeline_executor.cpp37
-rw-r--r--src/mongo/db/update/pipeline_executor_test.cpp70
-rw-r--r--src/mongo/db/update/rename_node.cpp5
-rw-r--r--src/mongo/db/update/update_array_node.cpp16
-rw-r--r--src/mongo/db/update/update_driver.cpp90
-rw-r--r--src/mongo/db/update/update_driver.h2
-rw-r--r--src/mongo/db/update/update_executor.h29
-rw-r--r--src/mongo/db/update/update_node.h4
-rw-r--r--src/mongo/db/update/update_node_test_fixture.h7
-rw-r--r--src/mongo/db/update/update_oplog_entry_serialization.h57
-rw-r--r--src/mongo/db/update/update_oplog_entry_version.h63
-rw-r--r--src/mongo/db/update/update_tree_executor.h31
-rw-r--r--src/mongo/shell/bench.cpp5
27 files changed, 958 insertions, 397 deletions
diff --git a/jstests/core/apply_ops1.js b/jstests/core/apply_ops1.js
index e63a46cfb67..ee6c2de894b 100644
--- a/jstests/core/apply_ops1.js
+++ b/jstests/core/apply_ops1.js
@@ -4,6 +4,8 @@
// requires_non_retryable_commands,
// # applyOps uses the oplog that require replication support
// requires_replication,
+// # Uses $v: 2 update oplog entries, only available in 4.6.
+// requires_fcv_46,
// ]
(function() {
@@ -413,11 +415,11 @@ res = assert.commandFailed(db.adminCommand({
"op": "u",
"ns": t.getFullName(),
"o2": {_id: 7},
- "o": {$v: NumberLong(0), $set: {z: 1, a: 2}}
+ "o": {$v: NumberInt(0), $set: {z: 1, a: 2}}
}
]
}));
-assert.eq(res.code, 40682);
+assert.eq(res.code, 4772604);
// When we explicitly specify {$v: 1}, we should get 'UpdateNode' update semantics, and $set
// operations get performed in lexicographic order.
@@ -428,9 +430,40 @@ res = assert.commandWorked(db.adminCommand({
"op": "u",
"ns": t.getFullName(),
"o2": {_id: 10},
- "o": {$v: NumberLong(1), $set: {z: 1, a: 2}}
+ "o": {$v: NumberInt(1), $set: {z: 1, a: 2}}
}
]
}));
assert.eq(t.findOne({_id: 10}), {_id: 10, a: 2, z: 1}); // Note: 'a' and 'z' have been sorted.
+
+// {$v: 2} entries encode diffs differently, and operations are applied in the order specified
+// rather than in lexicographic order.
+res = assert.commandWorked(db.adminCommand({
+ applyOps: [
+ {"op": "i", "ns": t.getFullName(), "o": {_id: 11, deleteField: 1}},
+ {
+ "op": "u",
+ "ns": t.getFullName(),
+ "o2": {_id: 11},
+ // The diff indicates that 'deleteField' will be removed and 'newField' will be added
+ // with value "foo".
+ "o": {$v: NumberInt(2), diff: {d: {deleteField: false}, i: {newField: "foo"}}}
+ }
+ ]
+}));
+assert.eq(t.findOne({_id: 11}), {_id: 11, newField: "foo"});
+
+// {$v: 3} does not exist yet, and we check that trying to use it throws an error.
+res = assert.commandFailed(db.adminCommand({
+ applyOps: [
+ {"op": "i", "ns": t.getFullName(), "o": {_id: 12}},
+ {
+ "op": "u",
+ "ns": t.getFullName(),
+ "o2": {_id: 12},
+ "o": {$v: NumberInt(3), diff: {d: {deleteField: false}}}
+ }
+ ]
+}));
+assert.eq(res.code, 4772604);
})();
diff --git a/jstests/replsets/v2_delta_oplog_entries.js b/jstests/replsets/v2_delta_oplog_entries.js
new file mode 100644
index 00000000000..aa0d086aa04
--- /dev/null
+++ b/jstests/replsets/v2_delta_oplog_entries.js
@@ -0,0 +1,338 @@
+/**
+ * Tests use of $v: 2 delta style oplog entries for pipeline based updates. This test only checks
+ * steady-state replication cases. It does not attempt to target cases where delta entries are
+ * re-applied as part of initial sync or rollback.
+ *
+ * This test relies on the DBHash checker to run at the end to ensure that the primaries and
+ * secondaries have the same data. For that reason it's important that this test not drop
+ * intermediate collections.
+ *
+ * @tags: [requires_fcv_46]
+ */
+(function() {
+const rst = new ReplSetTest({
+ name: "v2_delta_oplog_entries",
+ nodes: 2,
+ nodeOptions: {setParameter: {internalQueryEnableLoggingV2OplogEntries: true}}
+});
+
+rst.startSet();
+rst.initiate();
+
+const primary = rst.getPrimary();
+const primaryColl = primary.getDB("test").coll;
+const secondary = rst.getSecondary();
+const secondaryColl = secondary.getDB("test").coll;
+
+// Used for padding documents, in order to make full replacements expensive.
+function makeGiantStr() {
+ let s = "";
+ for (let i = 0; i < 1024; i++) {
+ s += "_";
+ }
+ return s;
+}
+
+const kGiantStr = makeGiantStr();
+const kMediumLengthStr = "zzzzzzzzzzzzzzzzzzzzzzzzzz";
+
+let idGenGlob = 0;
+function generateId() {
+ return idGenGlob++;
+}
+
+const kExpectDeltaEntry = "expectDelta";
+const kExpectReplacementEntry = "expectReplacement";
+// Indicates that the update that ran was a no-op, so we should not expect to see a 'u' oplog
+// entry.
+const kExpectNoUpdateEntry = "expectNoEntry";
+
+/**
+ * Given a connection to a node, check that the most recent oplog entry for document with
+ * 'expectedId' matches the type 'expectedOplogEntryType'.
+ */
+function checkOplogEntry(node, expectedOplogEntryType, expectedId) {
+ const oplog = node.getDB("local").getCollection("oplog.rs");
+
+ const res = oplog
+ .find({
+ $and: [
+ {ns: primaryColl.getFullName()},
+ {$or: [{"o._id": expectedId}, {"o2._id": expectedId}]}
+ ]
+ })
+ .limit(1)
+ .hint({$natural: -1}) // Reverse scan, so we get the most recent entry.
+ .toArray();
+ assert.eq(res.length, 1);
+
+ const oplogEntry = res[0];
+
+ if (expectedOplogEntryType === kExpectDeltaEntry) {
+ assert.eq(oplogEntry.op, "u");
+ assert.eq(oplogEntry.o.$v, 2, oplogEntry);
+ assert.eq(typeof (oplogEntry.o.diff), "object", oplogEntry);
+
+ // Check that the oplog entry's _id field is for the document we updated.
+ assert.eq(oplogEntry.o2._id, expectedId);
+
+ // Do some cursory/weak checks about the format of the 'o' field.
+ assert.eq(Object.keys(oplogEntry.o), ["$v", "diff"]);
+ for (let key of Object.keys(oplogEntry.o.diff)) {
+ assert.contains(key, ["i", "u", "s", "d"]);
+ }
+ } else if (expectedOplogEntryType === kExpectReplacementEntry) {
+ assert.eq(oplogEntry.op, "u");
+ assert.eq(oplogEntry.o.hasOwnProperty("$v"), false, oplogEntry);
+ } else if (expectedOplogEntryType == kExpectNoUpdateEntry) {
+ assert.eq(oplogEntry.op, "i");
+ assert.eq(oplogEntry.o._id, expectedId);
+ }
+}
+
+// The 'expectedOplogEntry' argument indicates whether we expect the resulting oplog entry to
+// record a delta, a full replacement, or no update at all.
+function testUpdateReplicates({preImage, pipeline, postImage, expectedOplogEntry}) {
+ const idKey = preImage._id;
+ assert.commandWorked(primaryColl.insert(preImage));
+ assert.commandWorked(primaryColl.update({_id: idKey}, pipeline));
+
+ rst.awaitReplication();
+ const secondaryDoc = secondaryColl.findOne({_id: idKey});
+ assert.eq(postImage, secondaryDoc);
+
+ checkOplogEntry(primary, expectedOplogEntry, preImage._id);
+}
+
+const oplog = primary.getDB("local").getCollection("oplog.rs");
+let id;
+
+// Removing fields.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: 3, y: 3, giantStr: kGiantStr},
+ pipeline: [{$unset: ["x", "y"]}],
+ postImage: {_id: id, giantStr: kGiantStr},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Adding a field and updating an existing one.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: 0, y: 0},
+ pipeline: [{$set: {a: "foo", y: 999}}],
+ postImage: {_id: id, x: 0, y: 999, a: "foo"},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Updating a subfield to a string.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: 4, subObj: {a: 1, b: 2}},
+ pipeline: [{$set: {"subObj.a": "foo", y: 1}}],
+ postImage: {_id: id, x: 4, subObj: {a: "foo", b: 2}, y: 1},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Updating a subfield to have the same value but different type. This is designed to check that the
+// server uses strict binary comparison to determine whether a field needs to be updated, rather
+// than a weak BSON type insensitive comparison.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: 4, subObj: {a: NumberLong(1), b: 2}},
+ pipeline: [{$set: {"subObj.a": 1, y: 1}}],
+ postImage: {_id: id, x: 4, subObj: {a: 1, b: 2}, y: 1},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Update a subfield to an object.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: 4, subObj: {a: NumberLong(1), b: 2}},
+ pipeline: [{$set: {"subObj.a": {$const: {newObj: {subField: 1}}}, y: 1}}],
+ postImage: {_id: id, x: 4, subObj: {a: {newObj: {subField: 1}}, b: 2}, y: 1},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Adding a field to a sub object.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, subObj: {a: 1, b: 2}},
+ pipeline: [{$set: {"subObj.c": "foo"}}],
+ postImage: {_id: id, subObj: {a: 1, b: 2, c: "foo"}},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Adding a field to a sub object while removing a top level field.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, subObj: {a: 1, b: 2}, toRemove: "foo", giantStr: kGiantStr},
+ pipeline: [{$project: {subObj: 1, giantStr: 1}}, {$set: {"subObj.c": "foo"}}],
+ postImage: {_id: id, subObj: {a: 1, b: 2, c: "foo"}, giantStr: kGiantStr},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Dropping a field via inclusion projection.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, toRemove: "foo", subObj: {a: 1, b: 2}},
+ pipeline: [{$project: {subObj: 1}}],
+ postImage: {_id: id, subObj: {a: 1, b: 2}},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Inclusion projection dropping a subfield (subObj.toRemove).
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: "foo", subObj: {a: 1, toRemove: 2}, giantStr: kGiantStr},
+ pipeline: [{$project: {subObj: {a: 1}, giantStr: 1}}],
+ postImage: {_id: id, subObj: {a: 1}, giantStr: kGiantStr},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// $replaceRoot with identical document. We should expect no update oplog entry in this case.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: "foo", subObj: {a: 1, b: 2}},
+ pipeline: [{$replaceRoot: {newRoot: {_id: id, x: "foo", subObj: {a: 1, b: 2}}}}],
+ postImage: {_id: id, x: "foo", subObj: {a: 1, b: 2}},
+ expectedOplogEntry: kExpectNoUpdateEntry
+});
+
+// $replaceRoot with a similar document. In this case the diff should be small enough to use
+// delta oplog entries instead of doing a full replacement.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: "foo", subObj: {a: 1, b: 2}, giantStr: kGiantStr},
+ pipeline: [{$replaceRoot: {newRoot: {x: "bar", subObj: {a: 1, b: 2}, giantStr: kGiantStr}}}],
+ postImage: {_id: id, x: "bar", subObj: {a: 1, b: 2}, giantStr: kGiantStr},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Replace root with a very different document. In this case we should fall back to a replacement
+// style update.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: "foo", subObj: {a: 1, b: 2}},
+ pipeline: [{$replaceRoot: {newRoot: {_id: id, newField: kMediumLengthStr}}}],
+ postImage: {_id: id, newField: kMediumLengthStr},
+ expectedOplogEntry: kExpectReplacementEntry
+});
+
+// Combine updates to existing fields and insertions of new fields.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: 1, b: {c: 2, d: {e: 3, f: 6}}, z: 3},
+ pipeline: [
+ {$unset: ["b.d.f"]},
+ {$set: {"b.a": 5, "b.b": 3, "b.c": 2, "b.d.d": 2, "b.d.e": 10, z: 7}}
+ ],
+ postImage: {_id: id, padding: kGiantStr, a: 1, b: {c: 2, d: {e: 10, d: 2}, a: 5, b: 3}, z: 7},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Setting a sub object inside an array.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: kGiantStr, arrField: [{x: 1}, {x: 2}]},
+ pipeline: [{$set: {"arrField.x": 5}}],
+ postImage: {_id: id, x: kGiantStr, arrField: [{x: 5}, {x: 5}]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Reordering fields with replaceRoot.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, x: "foo", y: "bar", z: "baz"},
+ pipeline: [{$replaceRoot: {newRoot: {_id: id, z: "baz", y: "bar", x: "foo"}}}],
+ postImage: {_id: id, z: "baz", y: "bar", x: "foo"},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Reordering two small fields in a very large document.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, x: "foo", y: "bar"},
+ pipeline: [{$replaceRoot: {newRoot: {_id: id, padding: kGiantStr, y: "bar", x: "foo"}}}],
+ postImage: {_id: id, padding: kGiantStr, y: "bar", x: "foo"},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Similar case of reordering fields.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, p: kGiantStr, a: 1, b: 1, c: 1, d: 1},
+ pipeline: [{$replaceRoot: {newRoot: {_id: id, p: kGiantStr, a: 1, c: 1, b: 1, d: 1}}}],
+ postImage: {_id: id, p: kGiantStr, a: 1, c: 1, b: 1, d: 1},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Modify an element in the middle of an array.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, 4, 5]},
+ pipeline: [{$set: {a: [1, 2, 999, 4, 5]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, 2, 999, 4, 5]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Modify an object inside an array.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, {b: 1}, 5]},
+ pipeline: [{$set: {a: [1, 2, 3, {b: 2}, 5]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, {b: 2}, 5]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Object inside an array inside an object inside an array.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, {b: [{c: 1}]}, 5]},
+ pipeline: [{$set: {a: [1, 2, 3, {b: [{c: 999}]}, 5]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, {b: [{c: 999}]}, 5]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Case where we append to an array.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, 2, 3]},
+ pipeline: [{$set: {a: [1, 2, 3, 4, 5]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, 4, 5]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Case where we make an array shorter.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, 2, 3]},
+ pipeline: [{$set: {a: [1, 2]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, 2]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Change element of array AND shorten it
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, {b: 10}, 3]},
+ pipeline: [{$set: {a: [1, {b: 9}]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, {b: 9}]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Remove element from the middle of an array. Should still use a delta, and only rewrite the last
+// parts of the array.
+id = generateId();
+testUpdateReplicates({
+ preImage: {_id: id, padding: kGiantStr, a: [1, 2, 999, 3, 4]},
+ pipeline: [{$set: {a: [1, 2, 3, 4]}}],
+ postImage: {_id: id, padding: kGiantStr, a: [1, 2, 3, 4]},
+ expectedOplogEntry: kExpectDeltaEntry
+});
+
+// Don't drop any collections. At the end we want the DBHash checker to verify that there's no
+// corruption.
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 935139adfda..bf00e6d40a3 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -34,8 +34,10 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/update/update_oplog_entry_version.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
+#include "mongo/util/visit_helper.h"
namespace mongo {
@@ -208,11 +210,47 @@ write_ops::Delete DeleteOp::parseLegacy(const Message& msgRaw) {
return op;
}
+write_ops::UpdateModification write_ops::UpdateModification::parseFromOplogEntry(
+ const BSONObj& oField) {
+ BSONElement vField = oField[kUpdateOplogEntryVersionFieldName];
+
+ // If this field appears it should be an integer.
+ uassert(4772600,
+ str::stream() << "Expected $v field to be missing or an integer, but got type: "
+ << vField.type(),
+ !vField.ok() ||
+ (vField.type() == BSONType::NumberInt || vField.type() == BSONType::NumberLong));
+
+ if (vField.ok() && vField.numberInt() == static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)) {
+ // Make sure there's a diff field.
+ BSONElement diff = oField["diff"];
+ uassert(4772601,
+ str::stream() << "Expected 'diff' field to be an object, instead got type: "
+ << diff.type(),
+ diff.type() == BSONType::Object);
+
+ return UpdateModification(doc_diff::Diff{diff.embeddedObject()}, DiffTag{});
+ } else if (!vField.ok() ||
+ vField.numberInt() == static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1)) {
+ // Treat it as a "classic" update which can either be a full replacement or a
+ // modifier-style update. Which variant it is will be determined when the update driver is
+ // constructed.
+ return UpdateModification(oField);
+ }
+
+ // The $v field must be present, but have some unsupported value.
+ uasserted(4772604,
+ str::stream() << "Unrecognized value for '$v' (Version) field: "
+ << vField.numberInt());
+}
+
+write_ops::UpdateModification::UpdateModification(doc_diff::Diff diff, DiffTag)
+ : _update(std::move(diff)) {}
+
write_ops::UpdateModification::UpdateModification(BSONElement update) {
const auto type = update.type();
if (type == BSONType::Object) {
- _classicUpdate = update.Obj();
- _type = Type::kClassic;
+ _update = ClassicUpdate{update.Obj()};
return;
}
@@ -220,18 +258,24 @@ write_ops::UpdateModification::UpdateModification(BSONElement update) {
"Update argument must be either an object or an array",
type == BSONType::Array);
- _type = Type::kPipeline;
-
- _pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(update));
+ _update = PipelineUpdate{uassertStatusOK(AggregationRequest::parsePipelineFromBSON(update))};
}
write_ops::UpdateModification::UpdateModification(const BSONObj& update) {
- _classicUpdate = update;
- _type = Type::kClassic;
+ // Do a sanity check that the $v field is either not provided or has value of 1.
+ const auto versionElem = update["$v"];
+ uassert(4772602,
+ str::stream() << "Expected classic update either contain no '$v' field, or "
+ << "'$v' field with value 1, but found: " << versionElem,
+ !versionElem.ok() ||
+ versionElem.numberInt() ==
+ static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1));
+
+ _update = ClassicUpdate{update};
}
write_ops::UpdateModification::UpdateModification(std::vector<BSONObj> pipeline)
- : _type{Type::kPipeline}, _pipeline{std::move(pipeline)} {}
+ : _update{PipelineUpdate{std::move(pipeline)}} {}
write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) {
return UpdateModification(elem);
@@ -242,18 +286,50 @@ write_ops::UpdateModification write_ops::UpdateModification::parseLegacyOpUpdate
return UpdateModification(obj);
}
+int write_ops::UpdateModification::objsize() const {
+ return stdx::visit(
+ visit_helper::Overloaded{
+ [](const ClassicUpdate& classic) -> int { return classic.bson.objsize(); },
+ [](const PipelineUpdate& pipeline) -> int {
+ int size = 0;
+ std::for_each(pipeline.begin(), pipeline.end(), [&size](const BSONObj& obj) {
+ size += obj.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes;
+ });
+
+ return size + kWriteCommandBSONArrayPerElementOverheadBytes;
+ },
+ [](const doc_diff::Diff& diff) -> int { return diff.objsize(); }},
+ _update);
+}
+
+
+write_ops::UpdateModification::Type write_ops::UpdateModification::type() const {
+ return stdx::visit(
+ visit_helper::Overloaded{
+ [](const ClassicUpdate& classic) { return Type::kClassic; },
+ [](const PipelineUpdate& pipelineUpdate) { return Type::kPipeline; },
+ [](const doc_diff::Diff& diff) { return Type::kDelta; }},
+ _update);
+}
+
void write_ops::UpdateModification::serializeToBSON(StringData fieldName,
BSONObjBuilder* bob) const {
- if (_type == Type::kClassic) {
- *bob << fieldName << *_classicUpdate;
- return;
- }
- BSONArrayBuilder arrayBuilder(bob->subarrayStart(fieldName));
- for (auto&& stage : *_pipeline) {
- arrayBuilder << stage;
- }
- arrayBuilder.doneFast();
+ stdx::visit(
+ visit_helper::Overloaded{
+ [fieldName, bob](const ClassicUpdate& classic) { *bob << fieldName << classic.bson; },
+ [fieldName, bob](const PipelineUpdate& pipeline) {
+ BSONArrayBuilder arrayBuilder(bob->subarrayStart(fieldName));
+ for (auto&& stage : pipeline) {
+ arrayBuilder << stage;
+ }
+ arrayBuilder.doneFast();
+ },
+ [](const doc_diff::Diff& diff) {
+ // We never serialize delta style updates.
+ MONGO_UNREACHABLE;
+ }},
+ _update);
}
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index 70eb8d1a7d3..0464a3349df 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -33,10 +33,12 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/update/document_diff_serialization.h"
+#include "mongo/stdx/variant.h"
+#include "mongo/util/visit_helper.h"
namespace mongo {
namespace write_ops {
-
// Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5
// bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1
// byte (zero terminator) = 7 bytes
@@ -55,19 +57,24 @@ void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder
class UpdateModification {
public:
- enum class Type { kClassic, kPipeline };
+ enum class Type { kClassic, kPipeline, kDelta };
- static StringData typeToString(Type type) {
- return (type == Type::kClassic ? "Classic"_sd : "Pipeline"_sd);
- }
+ /**
+ * Used to indicate that a diff is being passed to the constructor.
+ */
+ struct DiffTag {};
+
+ // Given the 'o' field of an update oplog entry, will return an UpdateModification that can be
+ // applied.
+ static UpdateModification parseFromOplogEntry(const BSONObj& oField);
UpdateModification() = default;
UpdateModification(BSONElement update);
UpdateModification(std::vector<BSONObj> pipeline);
+ UpdateModification(doc_diff::Diff, DiffTag);
// This constructor exists only to provide a fast-path for constructing classic-style updates.
UpdateModification(const BSONObj& update);
-
/**
* These methods support IDL parsing of the "u" field from the update command and OP_UPDATE.
*/
@@ -83,50 +90,51 @@ public:
// representing an aggregation stage, due to the leading '$'' character.
static UpdateModification parseLegacyOpUpdateFromBSON(const BSONObj& obj);
- int objsize() const {
- if (_type == Type::kClassic) {
- return _classicUpdate->objsize();
- }
+ int objsize() const;
- int size = 0;
- std::for_each(_pipeline->begin(), _pipeline->end(), [&size](const BSONObj& obj) {
- size += obj.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes;
- });
-
- return size + kWriteCommandBSONArrayPerElementOverheadBytes;
- }
-
- Type type() const {
- return _type;
- }
+ Type type() const;
BSONObj getUpdateClassic() const {
- invariant(_type == Type::kClassic);
- return *_classicUpdate;
+ invariant(type() == Type::kClassic);
+ return stdx::get<ClassicUpdate>(_update).bson;
}
const std::vector<BSONObj>& getUpdatePipeline() const {
- invariant(_type == Type::kPipeline);
- return *_pipeline;
+ invariant(type() == Type::kPipeline);
+ return stdx::get<PipelineUpdate>(_update);
+ }
+
+ doc_diff::Diff getDiff() const {
+ invariant(type() == Type::kDelta);
+ return stdx::get<doc_diff::Diff>(_update);
}
std::string toString() const {
StringBuilder sb;
- sb << "{type: " << typeToString(_type) << ", update: ";
- if (_type == Type::kClassic) {
- sb << *_classicUpdate << "}";
- } else {
- sb << Value(*_pipeline).toString();
- }
+ stdx::visit(visit_helper::Overloaded{[&sb](const ClassicUpdate& classic) {
+ sb << "{type: Classic, update: " << classic.bson
+ << "}";
+ },
+ [&sb](const PipelineUpdate& pipeline) {
+ sb << "{type: Pipeline, update: "
+ << Value(pipeline).toString() << "}";
+ },
+ [&sb](const doc_diff::Diff& diff) {
+ sb << "{type: Delta, update: " << diff << "}";
+ }},
+ _update);
return sb.str();
}
private:
- Type _type = Type::kClassic;
- boost::optional<BSONObj> _classicUpdate;
- boost::optional<std::vector<BSONObj>> _pipeline;
+ // Wrapper class used to avoid having a variant where multiple alternatives have the same type.
+ struct ClassicUpdate {
+ BSONObj bson;
+ };
+ using PipelineUpdate = std::vector<BSONObj>;
+ stdx::variant<ClassicUpdate, PipelineUpdate, doc_diff::Diff> _update;
};
} // namespace write_ops
diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl
index 3035d6ed545..ba127f85fe8 100644
--- a/src/mongo/db/query/query_knobs.idl
+++ b/src/mongo/db/query/query_knobs.idl
@@ -364,3 +364,9 @@ server_parameters:
validator:
gt: 0
+ internalQueryEnableLoggingV2OplogEntries:
+ description: "If true, this node may log $v:2 delta-style oplog entries."
+ set_at: [ startup, runtime ]
+ cpp_varname: "internalQueryEnableLoggingV2OplogEntries"
+ cpp_vartype: AtomicWord<bool>
+ default: false
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0181230f177..f534ad78877 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1220,7 +1220,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
auto request = UpdateRequest();
request.setNamespaceString(requestNss);
request.setQuery(updateCriteria);
- request.setUpdateModification(o);
+ request.setUpdateModification(write_ops::UpdateModification::parseFromOplogEntry(o));
request.setUpsert(upsert);
request.setFromOplogApplication(true);
diff --git a/src/mongo/db/update/delta_executor.h b/src/mongo/db/update/delta_executor.h
new file mode 100644
index 00000000000..18d97f299e5
--- /dev/null
+++ b/src/mongo/db/update/delta_executor.h
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/update/update_executor.h"
+
+#include "mongo/db/update/document_diff_applier.h"
+#include "mongo/db/update/document_diff_serialization.h"
+
+namespace mongo {
+
+/**
+ * An UpdateExecutor representing a delta-style update. Delta-style updates apply a diff
+ * to the pre image document in order to recover the post image.
+ */
+class DeltaExecutor : public UpdateExecutor {
+public:
+ /**
+ * Initializes the executor with the diff to apply.
+ */
+ explicit DeltaExecutor(doc_diff::Diff diff) : _diff(std::move(diff)) {}
+
+ ApplyResult applyUpdate(ApplyParams applyParams) const final {
+ const auto originalDoc = applyParams.element.getDocument().getObject();
+ auto postImage = doc_diff::applyDiff(originalDoc, _diff);
+ auto postImageHasId = postImage.hasField("_id");
+
+ return ObjectReplaceExecutor::applyReplacementUpdate(
+ applyParams, postImage, postImageHasId);
+ }
+
+ Value serialize() const final {
+ // Delta updates are only applied internally on secondaries. They are never passed between
+ // nodes or re-written.
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ doc_diff::Diff _diff;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/update/log_builder.cpp b/src/mongo/db/update/log_builder.cpp
index 5fbd6514791..e148f2df4be 100644
--- a/src/mongo/db/update/log_builder.cpp
+++ b/src/mongo/db/update/log_builder.cpp
@@ -39,17 +39,9 @@ const char kSet[] = "$set";
const char kUnset[] = "$unset";
} // namespace
-constexpr StringData LogBuilder::kUpdateSemanticsFieldName;
-
inline Status LogBuilder::addToSection(Element newElt, Element* section, const char* sectionName) {
// If we don't already have this section, try to create it now.
if (!section->ok()) {
- // If we already have object replacement data, we can't also have section entries.
- if (hasObjectReplacement())
- return Status(ErrorCodes::IllegalOperation,
- "LogBuilder: Invalid attempt to add a $set/$unset entry"
- "to a log with an existing object replacement");
-
mutablebson::Document& doc = _logRoot.getDocument();
// We should not already have an element with the section name under the root.
@@ -66,16 +58,11 @@ inline Status LogBuilder::addToSection(Element newElt, Element* section, const c
if (!result.isOK())
return result;
*section = newElement;
-
- // Invalidate attempts to add an object replacement, now that we have a named
- // section under the root.
- _objectReplacementAccumulator = doc.end();
}
// Whatever transpired, we should now have an ok accumulator for the section, and not
// have a replacement accumulator.
dassert(section->ok());
- dassert(!_objectReplacementAccumulator.ok());
// Enqueue the provided element to the section and propagate the result.
return section->pushBack(newElt);
@@ -126,60 +113,17 @@ Status LogBuilder::addToUnsets(StringData path) {
return addToSection(logElement, &_unsetAccumulator, kUnset);
}
-Status LogBuilder::setUpdateSemantics(UpdateSemantics updateSemantics) {
- if (hasObjectReplacement()) {
- return Status(ErrorCodes::IllegalOperation,
- "LogBuilder: Invalid attempt to add a $v entry to a log with an existing "
- "object replacement");
- }
-
- if (_updateSemantics.ok()) {
+Status LogBuilder::setVersion(UpdateOplogEntryVersion oplogVersion) {
+ if (_version.ok()) {
return Status(ErrorCodes::IllegalOperation, "LogBuilder: Invalid attempt to set $v twice.");
}
mutablebson::Document& doc = _logRoot.getDocument();
- _updateSemantics =
- doc.makeElementInt(kUpdateSemanticsFieldName, static_cast<int>(updateSemantics));
+ _version =
+ doc.makeElementInt(kUpdateOplogEntryVersionFieldName, static_cast<int>(oplogVersion));
- dassert(_logRoot[kUpdateSemanticsFieldName] == doc.end());
+ dassert(_logRoot[kUpdateOplogEntryVersionFieldName] == doc.end());
- return _logRoot.pushFront(_updateSemantics);
+ return _logRoot.pushFront(_version);
}
-
-Status LogBuilder::getReplacementObject(Element* outElt) {
- // If the replacement accumulator is not ok, we must have started a $set or $unset
- // already, so an object replacement is not permitted.
- if (!_objectReplacementAccumulator.ok()) {
- dassert(_setAccumulator.ok() || _unsetAccumulator.ok());
- return Status(ErrorCodes::IllegalOperation,
- "LogBuilder: Invalid attempt to obtain the object replacement slot "
- "for a log containing $set or $unset entries");
- }
-
- if (hasObjectReplacement())
- return Status(ErrorCodes::IllegalOperation,
- "LogBuilder: Invalid attempt to acquire the replacement object "
- "in a log with existing object replacement data");
-
- if (_updateSemantics.ok()) {
- return Status(ErrorCodes::IllegalOperation,
- "LogBuilder: Invalid attempt to acquire the replacement object in a log with "
- "an update semantics value");
- }
-
- // OK to enqueue object replacement items.
- *outElt = _objectReplacementAccumulator;
- return Status::OK();
-}
-
-inline bool LogBuilder::hasObjectReplacement() const {
- if (!_objectReplacementAccumulator.ok())
- return false;
-
- dassert(!_setAccumulator.ok());
- dassert(!_unsetAccumulator.ok());
-
- return _objectReplacementAccumulator.hasChildren();
-}
-
} // namespace mongo
diff --git a/src/mongo/db/update/log_builder.h b/src/mongo/db/update/log_builder.h
index 38cf9d11e87..8f01f9e0f4a 100644
--- a/src/mongo/db/update/log_builder.h
+++ b/src/mongo/db/update/log_builder.h
@@ -31,40 +31,25 @@
#include "mongo/base/status.h"
#include "mongo/bson/mutable/document.h"
+#include "mongo/db/update/update_oplog_entry_version.h"
namespace mongo {
/**
- * Previously, there were multiple supported versions of the update language.
- */
-enum class UpdateSemantics {
- // The update system introduced in v3.6, and is the only supported system. When a single update
- // adds multiple fields, those fields are added in lexicographic order by field name. This
- // system introduces support for arrayFilters and $[] syntax.
- kUpdateNode = 1,
-
- // Must be last.
- kNumUpdateSemantics
-};
-
-/** LogBuilder abstracts away some of the details of producing a properly constructed oplog
- * update entry. It manages separate regions into which it accumulates $set and $unset
- * operations, and distinguishes object replacement style oplog generation from
- * $set/$unset style generation and prevents admixture.
+ * LogBuilder abstracts away some of the details of producing a properly constructed oplog $v:1
+ * modifier-style update entry. It manages separate regions into which it accumulates $set and
+ * $unset operations.
*/
class LogBuilder {
public:
- static constexpr StringData kUpdateSemanticsFieldName = "$v"_sd;
-
/** Construct a new LogBuilder. Log entries will be recorded as new children under the
* 'logRoot' Element, which must be of type mongo::Object and have no children.
*/
inline LogBuilder(mutablebson::Element logRoot)
: _logRoot(logRoot),
- _objectReplacementAccumulator(_logRoot),
_setAccumulator(_logRoot.getDocument().end()),
_unsetAccumulator(_setAccumulator),
- _updateSemantics(_setAccumulator) {
+ _version(_setAccumulator) {
dassert(logRoot.isType(mongo::Object));
dassert(!logRoot.hasChildren());
}
@@ -115,30 +100,17 @@ public:
/**
* Add a "$v" field to the log. Fails if there is already a "$v" field.
*/
- Status setUpdateSemantics(UpdateSemantics updateSemantics);
-
- /** Obtain, via the out parameter 'outElt', a pointer to the mongo::Object type Element
- * to which the components of an object replacement should be recorded. It is an error
- * to call this if any Elements have been added by calling either addToSets or
- * addToUnsets, and attempts to do so will return a non-OK Status. Similarly, if there
- * is already object replacement data recorded for this log, the call will fail.
- */
- Status getReplacementObject(mutablebson::Element* outElt);
+ Status setVersion(UpdateOplogEntryVersion);
private:
- // Returns true if the object replacement accumulator is valid and has children, false
- // otherwise.
- inline bool hasObjectReplacement() const;
-
inline Status addToSection(mutablebson::Element newElt,
mutablebson::Element* section,
const char* sectionName);
mutablebson::Element _logRoot;
- mutablebson::Element _objectReplacementAccumulator;
mutablebson::Element _setAccumulator;
mutablebson::Element _unsetAccumulator;
- mutablebson::Element _updateSemantics;
+ mutablebson::Element _version;
};
} // namespace mongo
diff --git a/src/mongo/db/update/log_builder_test.cpp b/src/mongo/db/update/log_builder_test.cpp
index f43bc57e93f..a6cdf6421b6 100644
--- a/src/mongo/db/update/log_builder_test.cpp
+++ b/src/mongo/db/update/log_builder_test.cpp
@@ -114,44 +114,6 @@ TEST(LogBuilder, AddOneToEach) {
doc);
}
-TEST(LogBuilder, AddOneObjectReplacementEntry) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
- ASSERT_TRUE(replacement.isType(mongo::Object));
-
- const mmb::Element elt_a = doc.makeElementInt("a", 1);
- ASSERT_TRUE(elt_a.ok());
- ASSERT_OK(replacement.pushBack(elt_a));
-
- ASSERT_EQUALS(mongo::fromjson("{ a : 1 }"), doc);
-}
-
-TEST(LogBuilder, AddTwoObjectReplacementEntry) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
- ASSERT_TRUE(replacement.isType(mongo::Object));
-
- const mmb::Element elt_a = doc.makeElementInt("a", 1);
- ASSERT_TRUE(elt_a.ok());
- ASSERT_OK(replacement.pushBack(elt_a));
-
- const mmb::Element elt_b = doc.makeElementInt("b", 2);
- ASSERT_TRUE(elt_b.ok());
- ASSERT_OK(replacement.pushBack(elt_b));
-
- ASSERT_EQUALS(mongo::fromjson("{ a : 1, b: 2 }"), doc);
-}
-
TEST(LogBuilder, VerifySetsAreGrouped) {
mmb::Document doc;
LogBuilder lb(doc.root());
@@ -184,90 +146,4 @@ TEST(LogBuilder, VerifyUnsetsAreGrouped) {
"} }"),
doc);
}
-
-TEST(LogBuilder, PresenceOfSetPreventsObjectReplacement) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
-
- const mmb::Element elt_ab = doc.makeElementInt("a.b", 1);
- ASSERT_TRUE(elt_ab.ok());
- ASSERT_OK(lb.addToSets(elt_ab));
-
- replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_NOT_OK(lb.getReplacementObject(&replacement));
- ASSERT_FALSE(replacement.ok());
-}
-
-TEST(LogBuilder, PresenceOfUnsetPreventsObjectReplacement) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
-
- const mmb::Element elt_ab = doc.makeElementInt("a.b", 1);
- ASSERT_TRUE(elt_ab.ok());
- ASSERT_OK(lb.addToSets(elt_ab));
-
- replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_NOT_OK(lb.getReplacementObject(&replacement));
- ASSERT_FALSE(replacement.ok());
-}
-
-TEST(LogBuilder, CantAddSetWithObjectReplacementDataPresent) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
- ASSERT_OK(replacement.appendInt("a", 1));
-
- mmb::Element setCandidate = doc.makeElementInt("x", 0);
- ASSERT_NOT_OK(lb.addToSets(setCandidate));
-}
-
-TEST(LogBuilder, CantAddUnsetWithObjectReplacementDataPresent) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
- ASSERT_OK(replacement.appendInt("a", 1));
-
- ASSERT_NOT_OK(lb.addToUnsets("x"));
-}
-
-// Ensure that once you have obtained the object replacement slot and mutated it, that the
-// object replacement slot becomes in accessible. This is a bit paranoid, since in practice
-// the modifier conflict detection logic should prevent that outcome at a higher level, but
-// preventing it here costs us nothing and add an extra safety check.
-TEST(LogBuilder, CantReacquireObjectReplacementData) {
- mmb::Document doc;
- LogBuilder lb(doc.root());
-
- mmb::Element replacement = doc.end();
- ASSERT_FALSE(replacement.ok());
- ASSERT_OK(lb.getReplacementObject(&replacement));
- ASSERT_TRUE(replacement.ok());
- ASSERT_OK(replacement.appendInt("a", 1));
-
- mmb::Element again = doc.end();
- ASSERT_FALSE(again.ok());
- ASSERT_NOT_OK(lb.getReplacementObject(&again));
- ASSERT_FALSE(again.ok());
-}
-
} // namespace
diff --git a/src/mongo/db/update/modifier_node.cpp b/src/mongo/db/update/modifier_node.cpp
index 2ea03766cf5..9d5e48bcff9 100644
--- a/src/mongo/db/update/modifier_node.cpp
+++ b/src/mongo/db/update/modifier_node.cpp
@@ -200,8 +200,8 @@ UpdateExecutor::ApplyResult ModifierNode::applyToExistingElement(
applyParams.element, leftSibling, rightSibling, recursionLevel, updateResult);
}
- if (applyParams.logBuilder) {
- logUpdate(applyParams.logBuilder,
+ if (auto logBuilder = updateNodeApplyParams.logBuilder) {
+ logUpdate(logBuilder,
updateNodeApplyParams.pathTaken->dottedField(),
applyParams.element,
updateResult);
@@ -299,9 +299,8 @@ UpdateExecutor::ApplyResult ModifierNode::applyToNonexistentElement(
applyResult.indexesAffected = false;
}
- if (applyParams.logBuilder) {
- logUpdate(
- applyParams.logBuilder, fullPath.dottedField(), newElement, ModifyResult::kCreated);
+ if (auto logBuilder = updateNodeApplyParams.logBuilder) {
+ logUpdate(logBuilder, fullPath.dottedField(), newElement, ModifyResult::kCreated);
}
return applyResult;
diff --git a/src/mongo/db/update/object_replace_executor.cpp b/src/mongo/db/update/object_replace_executor.cpp
index 8f0251efc99..5ab7c1d39c9 100644
--- a/src/mongo/db/update/object_replace_executor.cpp
+++ b/src/mongo/db/update/object_replace_executor.cpp
@@ -32,9 +32,11 @@
#include "mongo/db/update/object_replace_executor.h"
#include "mongo/base/data_view.h"
+#include "mongo/bson/mutable/document.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/service_context.h"
#include "mongo/db/update/storage_validation.h"
+#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/vector_clock_mutable.h"
namespace mongo {
@@ -142,20 +144,16 @@ UpdateExecutor::ApplyResult ObjectReplaceExecutor::applyReplacementUpdate(
}
}
- if (applyParams.logBuilder) {
- auto replacementObject = applyParams.logBuilder->getDocument().end();
- invariant(applyParams.logBuilder->getReplacementObject(&replacementObject));
- for (auto current = applyParams.element.leftChild(); current.ok();
- current = current.rightSibling()) {
- invariant(replacementObject.appendElement(current.getValue()));
- }
- }
-
- return ApplyResult();
+ return ApplyResult{};
}
UpdateExecutor::ApplyResult ObjectReplaceExecutor::applyUpdate(ApplyParams applyParams) const {
- return applyReplacementUpdate(applyParams, _replacementDoc, _containsId);
-}
+ auto ret = applyReplacementUpdate(applyParams, _replacementDoc, _containsId);
+ if (!ret.noop && applyParams.logMode != ApplyParams::LogMode::kDoNotGenerateOplogEntry) {
+ ret.oplogEntry = update_oplog_entry::makeReplacementOplogEntry(
+ applyParams.element.getDocument().getObject());
+ }
+ return ret;
+}
} // namespace mongo
diff --git a/src/mongo/db/update/object_replace_executor.h b/src/mongo/db/update/object_replace_executor.h
index c2b38154560..4b7d8d87acb 100644
--- a/src/mongo/db/update/object_replace_executor.h
+++ b/src/mongo/db/update/object_replace_executor.h
@@ -45,9 +45,15 @@ namespace mongo {
class ObjectReplaceExecutor : public UpdateExecutor {
public:
- // Applies a replacement style update to 'applyParams.element'. If
- // 'replacementDocContainsIdField' is false then the _id field from the original document will
- // be preserved.
+ /**
+ * Applies a replacement style update to 'applyParams.element'.
+ *
+ * If 'replacementDocContainsIdField' is false then the _id field from the original document
+ * will be preserved.
+ *
+ * This function will ignore the log mode provided in 'applyParams'. The 'oplogEntry' field
+ * of the returned ApplyResult is always empty.
+ */
static ApplyResult applyReplacementUpdate(ApplyParams applyParams,
const BSONObj& replacementDoc,
bool replacementDocContainsIdField);
diff --git a/src/mongo/db/update/object_replace_executor_test.cpp b/src/mongo/db/update/object_replace_executor_test.cpp
index 6b0d93f6e46..0f7adf40df5 100644
--- a/src/mongo/db/update/object_replace_executor_test.cpp
+++ b/src/mongo/db/update/object_replace_executor_test.cpp
@@ -55,7 +55,7 @@ TEST_F(ObjectReplaceExecutorTest, Noop) {
ASSERT_FALSE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
ASSERT_TRUE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, ShouldNotCreateIdIfNoIdExistsAndNoneIsSpecified) {
@@ -68,7 +68,7 @@ TEST_F(ObjectReplaceExecutorTest, ShouldNotCreateIdIfNoIdExistsAndNoneIsSpecifie
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: 1, b: 2}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, ShouldPreserveIdOfExistingDocumentIfIdNotSpecified) {
@@ -81,7 +81,7 @@ TEST_F(ObjectReplaceExecutorTest, ShouldPreserveIdOfExistingDocumentIfIdNotSpeci
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0, a: 1, b: 2}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0, a: 1, b: 2}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0, a: 1, b: 2}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) {
@@ -95,7 +95,7 @@ TEST_F(ObjectReplaceExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0, a: 1, b: 2}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0, a: 1, b: 2}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0, a: 1, b: 2}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, IdTimestampNotModified) {
@@ -108,7 +108,7 @@ TEST_F(ObjectReplaceExecutorTest, IdTimestampNotModified) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: Timestamp(0,0)}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: Timestamp(0,0)}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: Timestamp(0,0)}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, NonIdTimestampsModified) {
@@ -135,7 +135,7 @@ TEST_F(ObjectReplaceExecutorTest, NonIdTimestampsModified) {
ASSERT_NOT_EQUALS(0U, elemB.getValueTimestamp().getInc());
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(doc, getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(doc.getObject(), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, ComplexDoc) {
@@ -148,7 +148,7 @@ TEST_F(ObjectReplaceExecutorTest, ComplexDoc) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: [0, 1, 2], c: {d: 1}}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: 1, b: [0, 1, 2], c: {d: 1}}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: 1, b: [0, 1, 2], c: {d: 1}}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, CannotRemoveImmutablePath) {
@@ -175,7 +175,7 @@ TEST_F(ObjectReplaceExecutorTest, IdFieldIsNotRemoved) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0, a: 1}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0, a: 1}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0, a: 1}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, CannotReplaceImmutablePathWithArrayField) {
@@ -241,7 +241,7 @@ TEST_F(ObjectReplaceExecutorTest, CanAddImmutableField) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: {b: 1}}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: {b: 1}}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: {b: 1}}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, CanAddImmutableId) {
@@ -255,7 +255,7 @@ TEST_F(ObjectReplaceExecutorTest, CanAddImmutableId) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, CannotCreateDollarPrefixedNameWhenValidateForStorageIsTrue) {
@@ -281,7 +281,7 @@ TEST_F(ObjectReplaceExecutorTest, CanCreateDollarPrefixedNameWhenValidateForStor
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: {b: 1, $bad: 1}}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: {b: 1, $bad: 1}}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: {b: 1, $bad: 1}}"), result.oplogEntry);
}
TEST_F(ObjectReplaceExecutorTest, NoLogBuilder) {
diff --git a/src/mongo/db/update/pipeline_executor.cpp b/src/mongo/db/update/pipeline_executor.cpp
index 2dd47908309..d12cfd7f292 100644
--- a/src/mongo/db/update/pipeline_executor.cpp
+++ b/src/mongo/db/update/pipeline_executor.cpp
@@ -31,11 +31,14 @@
#include "mongo/db/update/pipeline_executor.h"
+#include "mongo/bson/mutable/document.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/pipeline/document_source_queue.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
+#include "mongo/db/update/document_diff_calculator.h"
#include "mongo/db/update/object_replace_executor.h"
#include "mongo/db/update/storage_validation.h"
+#include "mongo/db/update/update_oplog_entry_serialization.h"
namespace mongo {
@@ -86,13 +89,39 @@ PipelineExecutor::PipelineExecutor(const boost::intrusive_ptr<ExpressionContext>
}
UpdateExecutor::ApplyResult PipelineExecutor::applyUpdate(ApplyParams applyParams) const {
+ const auto originalDoc = applyParams.element.getDocument().getObject();
+
DocumentSourceQueue* queueStage = static_cast<DocumentSourceQueue*>(_pipeline->peekFront());
- queueStage->emplace_back(Document{applyParams.element.getDocument().getObject()});
- auto transformedDoc = _pipeline->getNext()->toBson();
- auto transformedDocHasIdField = transformedDoc.hasField(kIdFieldName);
+ queueStage->emplace_back(Document{originalDoc});
+
+ const auto transformedDoc = _pipeline->getNext()->toBson();
+ const auto transformedDocHasIdField = transformedDoc.hasField(kIdFieldName);
- return ObjectReplaceExecutor::applyReplacementUpdate(
+    // Replace the pre-image document in applyParams with the post-image we got from running the
+    // pipeline.
+ auto ret = ObjectReplaceExecutor::applyReplacementUpdate(
applyParams, transformedDoc, transformedDocHasIdField);
+ // The oplog entry should not have been populated yet.
+ invariant(ret.oplogEntry.isEmpty());
+
+ if (applyParams.logMode != ApplyParams::LogMode::kDoNotGenerateOplogEntry && !ret.noop) {
+ if (applyParams.logMode == ApplyParams::LogMode::kGenerateOplogEntry) {
+ // We're allowed to generate $v: 2 log entries.
+ const auto diff = doc_diff::computeDiff(originalDoc, transformedDoc);
+ if (diff) {
+ ret.oplogEntry = update_oplog_entry::makeDeltaOplogEntry(*diff);
+ return ret;
+ }
+ }
+
+    // Either we aren't allowed to use diffing, or the diff computation failed, so fall back to a
+    // full replacement-style entry. Use the value produced by the object replace executor, in
+    // case it changed _id or anything like that.
+ ret.oplogEntry = update_oplog_entry::makeReplacementOplogEntry(
+ applyParams.element.getDocument().getObject());
+ }
+
+ return ret;
}
Value PipelineExecutor::serialize() const {
diff --git a/src/mongo/db/update/pipeline_executor_test.cpp b/src/mongo/db/update/pipeline_executor_test.cpp
index 8faebb0d383..d9566bbd4f7 100644
--- a/src/mongo/db/update/pipeline_executor_test.cpp
+++ b/src/mongo/db/update/pipeline_executor_test.cpp
@@ -37,15 +37,47 @@
#include "mongo/db/json.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/update/update_node_test_fixture.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
-using PipelineExecutorTest = UpdateNodeTest;
-using mongo::mutablebson::countChildren;
-using mongo::mutablebson::Element;
+/**
+ * Harness for running the tests with both $v:2 oplog entries enabled and disabled.
+ */
+class PipelineExecutorTest : public UpdateNodeTest {
+public:
+ void resetApplyParams() override {
+ UpdateNodeTest::resetApplyParams();
+ }
+
+ UpdateExecutor::ApplyParams getApplyParams(mutablebson::Element element) override {
+ auto applyParams = UpdateNodeTest::getApplyParams(element);
+
+        // Use the same parameters as the parent test fixture, but set the log mode according to
+        // whether delta ($v: 2) oplog entries are currently allowed.
+ applyParams.logMode = _allowDeltaOplogEntries
+ ? ApplyParams::LogMode::kGenerateOplogEntry
+ : ApplyParams::LogMode::kGenerateOnlyV1OplogEntry;
+ return applyParams;
+ }
+
+ void run() {
+ _allowDeltaOplogEntries = false;
+ UpdateNodeTest::run();
+ _allowDeltaOplogEntries = true;
+ UpdateNodeTest::run();
+ }
+
+ bool deltaOplogEntryAllowed() const {
+ return _allowDeltaOplogEntries;
+ }
+
+private:
+ bool _allowDeltaOplogEntries = false;
+};
TEST_F(PipelineExecutorTest, Noop) {
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
@@ -59,7 +91,7 @@ TEST_F(PipelineExecutorTest, Noop) {
ASSERT_FALSE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
ASSERT_TRUE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{}"), getLogDoc());
+ ASSERT_TRUE(result.oplogEntry.isEmpty());
}
TEST_F(PipelineExecutorTest, ShouldNotCreateIdIfNoIdExistsAndNoneIsSpecified) {
@@ -74,7 +106,11 @@ TEST_F(PipelineExecutorTest, ShouldNotCreateIdIfNoIdExistsAndNoneIsSpecified) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{c: 1, d: 2, a: 1, b: 2}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{c: 1, d: 2, a: 1, b: 2}"), getLogDoc());
+ if (deltaOplogEntryAllowed()) {
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{$v: 2, diff: {i: {a: 1, b: 2}}}"), result.oplogEntry);
+ } else {
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{c: 1, d: 2, a: 1, b: 2}"), result.oplogEntry);
+ }
}
TEST_F(PipelineExecutorTest, ShouldPreserveIdOfExistingDocumentIfIdNotReplaced) {
@@ -90,7 +126,7 @@ TEST_F(PipelineExecutorTest, ShouldPreserveIdOfExistingDocumentIfIdNotReplaced)
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0, a: 1, b: 2}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0, a: 1, b: 2}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0, a: 1, b: 2}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) {
@@ -106,7 +142,11 @@ TEST_F(PipelineExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0, c: 1, d: 2, a: 1, b: 2}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0, c: 1, d: 2, a: 1, b: 2}"), getLogDoc());
+ if (deltaOplogEntryAllowed()) {
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{$v: 2, diff: {i: {a: 1, b: 2 }}}"), result.oplogEntry);
+ } else {
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0, c: 1, d: 2, a: 1, b: 2}"), result.oplogEntry);
+ }
}
TEST_F(PipelineExecutorTest, ComplexDoc) {
@@ -121,7 +161,7 @@ TEST_F(PipelineExecutorTest, ComplexDoc) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: [0, 1, 2], e: [], c: {d: 1}}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: 1, b: [0, 1, 2], e: [], c: {d: 1}}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: 1, b: [0, 1, 2], e: [], c: {d: 1}}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, CannotRemoveImmutablePath) {
@@ -153,7 +193,7 @@ TEST_F(PipelineExecutorTest, IdFieldIsNotRemoved) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{_id: 0}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{_id: 0}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, CannotReplaceImmutablePathWithArrayField) {
@@ -229,7 +269,7 @@ TEST_F(PipelineExecutorTest, CanAddImmutableField) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{c: 1, a: {b: 1}}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{c: 1, a: {b: 1}}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{c: 1, a: {b: 1}}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, CanAddImmutableId) {
@@ -245,7 +285,7 @@ TEST_F(PipelineExecutorTest, CanAddImmutableId) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{c: 1, _id: 0}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{c: 1, _id: 0}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{c: 1, _id: 0}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, CannotCreateDollarPrefixedName) {
@@ -315,7 +355,7 @@ TEST_F(PipelineExecutorTest, CanUseConstants) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: 10, c : {x: 1, y: 2}}"), doc);
ASSERT_FALSE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: 1, b: 10, c : {x: 1, y: 2}}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: 1, b: 10, c : {x: 1, y: 2}}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, CanUseConstantsAcrossMultipleUpdates) {
@@ -333,7 +373,7 @@ TEST_F(PipelineExecutorTest, CanUseConstantsAcrossMultipleUpdates) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: 'foo'}"), doc1);
ASSERT_FALSE(doc1.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: 1, b: 'foo'}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: 1, b: 'foo'}"), result.oplogEntry);
// Update second doc.
mutablebson::Document doc2(fromjson("{a: 2}"));
@@ -343,7 +383,7 @@ TEST_F(PipelineExecutorTest, CanUseConstantsAcrossMultipleUpdates) {
ASSERT_TRUE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 2, b: 'foo'}"), doc2);
ASSERT_FALSE(doc2.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{a: 2, b: 'foo'}"), getLogDoc());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: 2, b: 'foo'}"), result.oplogEntry);
}
TEST_F(PipelineExecutorTest, NoopWithConstants) {
@@ -359,7 +399,7 @@ TEST_F(PipelineExecutorTest, NoopWithConstants) {
ASSERT_FALSE(result.indexesAffected);
ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
ASSERT_TRUE(doc.isInPlaceModeEnabled());
- ASSERT_EQUALS(fromjson("{}"), getLogDoc());
+ ASSERT_TRUE(result.oplogEntry.isEmpty());
}
} // namespace
diff --git a/src/mongo/db/update/rename_node.cpp b/src/mongo/db/update/rename_node.cpp
index c4a7f896e6d..6f9119d41ac 100644
--- a/src/mongo/db/update/rename_node.cpp
+++ b/src/mongo/db/update/rename_node.cpp
@@ -240,9 +240,8 @@ UpdateExecutor::ApplyResult RenameNode::apply(ApplyParams applyParams,
ApplyParams unsetParams(applyParams);
unsetParams.element = fromElement;
- UpdateNodeApplyParams unsetUpdateNodeApplyParams;
- unsetUpdateNodeApplyParams.pathToCreate = std::make_shared<FieldRef>();
- unsetUpdateNodeApplyParams.pathTaken = fromFieldRef;
+ UpdateNodeApplyParams unsetUpdateNodeApplyParams{
+ std::make_shared<FieldRef>(), fromFieldRef, updateNodeApplyParams.logBuilder};
UnsetNode unsetElement;
auto unsetElementApplyResult = unsetElement.apply(unsetParams, unsetUpdateNodeApplyParams);
diff --git a/src/mongo/db/update/update_array_node.cpp b/src/mongo/db/update/update_array_node.cpp
index 53a4070e0ed..46d4f88828d 100644
--- a/src/mongo/db/update/update_array_node.cpp
+++ b/src/mongo/db/update/update_array_node.cpp
@@ -139,10 +139,11 @@ UpdateExecutor::ApplyResult UpdateArrayNode::apply(
auto childApplyParams = applyParams;
childApplyParams.element = childElement;
+ auto childUpdateNodeApplyParams = updateNodeApplyParams;
if (!childrenShouldLogThemselves) {
- childApplyParams.logBuilder = nullptr;
+ childApplyParams.logMode = ApplyParams::LogMode::kDoNotGenerateOplogEntry;
+ childUpdateNodeApplyParams.logBuilder = nullptr;
}
- auto childUpdateNodeApplyParams = updateNodeApplyParams;
auto childApplyResult =
mergedChild->apply(childApplyParams, childUpdateNodeApplyParams);
@@ -165,24 +166,25 @@ UpdateExecutor::ApplyResult UpdateArrayNode::apply(
}
// If the child updates have not been logged, log the updated array elements.
- if (!childrenShouldLogThemselves && applyParams.logBuilder) {
+ auto* const logBuilder = updateNodeApplyParams.logBuilder;
+ if (!childrenShouldLogThemselves && logBuilder) {
if (nModified > 1) {
// Log the entire array.
- auto logElement = applyParams.logBuilder->getDocument().makeElementWithNewFieldName(
+ auto logElement = logBuilder->getDocument().makeElementWithNewFieldName(
updateNodeApplyParams.pathTaken->dottedField(), applyParams.element);
invariant(logElement.ok());
- uassertStatusOK(applyParams.logBuilder->addToSets(logElement));
+ uassertStatusOK(logBuilder->addToSets(logElement));
} else if (nModified == 1) {
// Log the modified array element.
invariant(modifiedElement);
FieldRef::FieldRefTempAppend tempAppend(*(updateNodeApplyParams.pathTaken),
modifiedElement->getFieldName());
- auto logElement = applyParams.logBuilder->getDocument().makeElementWithNewFieldName(
+ auto logElement = logBuilder->getDocument().makeElementWithNewFieldName(
updateNodeApplyParams.pathTaken->dottedField(), *modifiedElement);
invariant(logElement.ok());
- uassertStatusOK(applyParams.logBuilder->addToSets(logElement));
+ uassertStatusOK(logBuilder->addToSets(logElement));
}
}
diff --git a/src/mongo/db/update/update_driver.cpp b/src/mongo/db/update/update_driver.cpp
index 9899fd44366..92dca56582f 100644
--- a/src/mongo/db/update/update_driver.cpp
+++ b/src/mongo/db/update/update_driver.cpp
@@ -39,37 +39,21 @@
#include "mongo/db/matcher/expression_leaf.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/server_options.h"
-#include "mongo/db/update/log_builder.h"
+#include "mongo/db/update/delta_executor.h"
#include "mongo/db/update/modifier_table.h"
#include "mongo/db/update/object_replace_executor.h"
#include "mongo/db/update/path_support.h"
#include "mongo/db/update/storage_validation.h"
+#include "mongo/stdx/variant.h"
#include "mongo/util/embedded_builder.h"
#include "mongo/util/str.h"
+#include "mongo/util/visit_helper.h"
namespace mongo {
using pathsupport::EqualityMatches;
namespace {
-
-StatusWith<UpdateSemantics> updateSemanticsFromElement(BSONElement element) {
- if (element.type() != BSONType::NumberInt && element.type() != BSONType::NumberLong) {
- return {ErrorCodes::BadValue, "'$v' (UpdateSemantics) field must be an integer."};
- }
-
- auto updateSemantics = element.numberLong();
-
- // As of 3.7, we only support one version of the update language.
- if (updateSemantics != static_cast<int>(UpdateSemantics::kUpdateNode)) {
- return {ErrorCodes::Error(40682),
- str::stream() << "Unrecognized value for '$v' (UpdateSemantics) field: "
- << updateSemantics};
- }
-
- return static_cast<UpdateSemantics>(updateSemantics);
-}
-
modifiertable::ModifierType validateMod(BSONElement mod) {
auto modType = modifiertable::getType(mod.fieldName());
@@ -104,16 +88,16 @@ bool parseUpdateExpression(
const std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>>& arrayFilters) {
bool positional = false;
std::set<std::string> foundIdentifiers;
- bool foundUpdateSemanticsField = false;
+ bool foundVersionField = false;
for (auto&& mod : updateExpr) {
// If there is a "$v" field among the modifiers, it should have already been used by the
// caller to determine that this is the correct parsing function.
- if (mod.fieldNameStringData() == LogBuilder::kUpdateSemanticsFieldName) {
- uassert(ErrorCodes::BadValue,
- "Duplicate $v in oplog update document",
- !foundUpdateSemanticsField);
- foundUpdateSemanticsField = true;
- invariant(mod.numberLong() == static_cast<long long>(UpdateSemantics::kUpdateNode));
+ if (mod.fieldNameStringData() == kUpdateOplogEntryVersionFieldName) {
+ uassert(
+ ErrorCodes::BadValue, "Duplicate $v in oplog update document", !foundVersionField);
+ foundVersionField = true;
+ invariant(mod.numberLong() ==
+ static_cast<long long>(UpdateOplogEntryVersion::kUpdateNodeV1));
continue;
}
@@ -158,6 +142,16 @@ void UpdateDriver::parse(
return;
}
+ if (updateMod.type() == write_ops::UpdateModification::Type::kDelta) {
+ uassert(4772603,
+                "arrayFilters may not be specified for delta-style updates",
+ arrayFilters.empty());
+
+ _updateType = UpdateType::kDelta;
+ _updateExecutor = std::make_unique<DeltaExecutor>(updateMod.getDiff());
+ return;
+ }
+
uassert(51198, "Constant values may only be specified for pipeline updates", !constants);
// Check if the update expression is a full object replacement.
@@ -175,18 +169,21 @@ void UpdateDriver::parse(
invariant(_updateType == UpdateType::kOperator);
- // Some versions of mongod support more than one version of the update language and look for a
- // $v "UpdateSemantics" field when applying an oplog entry, in order to know which version of
- // the update language to apply with. We currently only support the 'kUpdateNode' version, but
- // we parse $v and check its value for compatibility.
+ // By this point we are expecting a "classic" update. This version of mongod only supports $v:
+ // 1 (modifier language) and $v: 2 (delta) (older versions support $v: 0). We've already
+ // checked whether this is a delta update so we check that the $v field isn't present, or has a
+ // value of 1.
+
auto updateExpr = updateMod.getUpdateClassic();
- BSONElement updateSemanticsElement = updateExpr[LogBuilder::kUpdateSemanticsFieldName];
- if (updateSemanticsElement) {
+ BSONElement versionElement = updateExpr[kUpdateOplogEntryVersionFieldName];
+ if (versionElement) {
uassert(ErrorCodes::FailedToParse,
"The $v update field is only recognized internally",
_fromOplogApplication);
- uassertStatusOK(updateSemanticsFromElement(updateSemanticsElement));
+ // The UpdateModification should have verified that the value of $v is valid.
+ invariant(versionElement.numberInt() ==
+ static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1));
}
auto root = std::make_unique<UpdateObjectNode>();
@@ -253,12 +250,9 @@ Status UpdateDriver::update(StringData matchedField,
FieldRefSetWithStorage* modifiedPaths) {
// TODO: assert that update() is called at most once in a !_multi case.
- _affectIndices =
- ((_updateType == UpdateType::kReplacement || _updateType == UpdateType::kPipeline) &&
- (_indexedFields != nullptr));
+ _affectIndices = (_updateType != UpdateType::kOperator && _indexedFields != nullptr);
_logDoc.reset();
- LogBuilder logBuilder(_logDoc.root());
UpdateExecutor::ApplyParams applyParams(doc->root(), immutablePaths);
applyParams.matchedField = matchedField;
@@ -271,7 +265,9 @@ Status UpdateDriver::update(StringData matchedField,
invariant(!modifiedPaths || modifiedPaths->empty());
if (_logOp && logOpRec) {
- applyParams.logBuilder = &logBuilder;
+ applyParams.logMode = internalQueryEnableLoggingV2OplogEntries.load()
+ ? ApplyParams::LogMode::kGenerateOplogEntry
+ : ApplyParams::LogMode::kGenerateOnlyV1OplogEntry;
}
invariant(_updateExecutor);
@@ -283,22 +279,10 @@ Status UpdateDriver::update(StringData matchedField,
if (docWasModified) {
*docWasModified = !applyResult.noop;
}
- if (_updateType == UpdateType::kOperator && _logOp && logOpRec) {
- // If there are binVersion=3.6 mongod nodes in the replica set, they need to be told that
- // this update is using the "kUpdateNode" version of the update semantics and not the older
- // update semantics that could be used by a featureCompatibilityVersion=3.4 node.
- //
- // TODO (SERVER-32240): Once binVersion <= 3.6 nodes are not supported in a replica set, we
- // can safely elide this "$v" UpdateSemantics field from oplog entries, because there will
- // only one supported version, which all nodes will assume is in use.
- //
- // We also don't need to specify the semantics for a full document replacement (and there
- // would be no place to put a "$v" field in the update document).
- invariant(logBuilder.setUpdateSemantics(UpdateSemantics::kUpdateNode));
- }
- if (_logOp && logOpRec)
- *logOpRec = _logDoc.getObject();
+ if (_logOp && logOpRec && !applyResult.noop) {
+ *logOpRec = applyResult.oplogEntry;
+ }
return Status::OK();
}
diff --git a/src/mongo/db/update/update_driver.h b/src/mongo/db/update/update_driver.h
index df549a01c8d..fe035195cfb 100644
--- a/src/mongo/db/update/update_driver.h
+++ b/src/mongo/db/update/update_driver.h
@@ -56,7 +56,7 @@ class OperationContext;
class UpdateDriver {
public:
- enum class UpdateType { kOperator, kReplacement, kPipeline };
+ enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta };
UpdateDriver(const boost::intrusive_ptr<ExpressionContext>& expCtx);
diff --git a/src/mongo/db/update/update_executor.h b/src/mongo/db/update/update_executor.h
index c1143a97f43..43d53575d7b 100644
--- a/src/mongo/db/update/update_executor.h
+++ b/src/mongo/db/update/update_executor.h
@@ -32,7 +32,6 @@
#include "mongo/bson/mutable/element.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/field_ref_set.h"
-#include "mongo/db/update/log_builder.h"
#include "mongo/db/update/update_node_visitor.h"
#include "mongo/db/update_index_data.h"
@@ -50,6 +49,23 @@ public:
* The parameters required by UpdateExecutor::applyUpdate.
*/
struct ApplyParams {
+ /**
+ * Enum indicating whether/what kind of oplog entry should be returned in the ApplyResult
+ * by the update executor.
+ */
+ enum class LogMode {
+ // Indicates that no oplog entry should be produced.
+ kDoNotGenerateOplogEntry,
+
+ // Indicates that the update executor should produce an oplog entry. Only the $v: 1
+ // format or replacement-style format may be used, however.
+ kGenerateOnlyV1OplogEntry,
+
+ // Indicates that the update executor should produce an oplog entry, and may use any
+ // format.
+ kGenerateOplogEntry
+ };
+
ApplyParams(mutablebson::Element element, const FieldRefSet& immutablePaths)
: element(element), immutablePaths(immutablePaths) {}
@@ -77,8 +93,10 @@ public:
// Used to determine whether indexes are affected.
const UpdateIndexData* indexData = nullptr;
- // If provided, UpdateNode::apply will log the update here.
- LogBuilder* logBuilder = nullptr;
+ // Indicates whether/what type of oplog entry should be produced by the update executor.
+ // If 'logMode' indicates an oplog entry should be produced but the update turns out to be
+ // a noop, an oplog entry may not be produced.
+ LogMode logMode = LogMode::kDoNotGenerateOplogEntry;
// If provided, UpdateNode::apply will populate this with a path to each modified field.
FieldRefSetWithStorage* modifiedPaths = nullptr;
@@ -97,6 +115,11 @@ public:
bool indexesAffected = true;
bool noop = false;
+
+ // The oplog entry to log. This is only populated if the operation is not considered a
+ // noop and if the 'logMode' provided in ApplyParams indicates that an oplog entry should
+ // be generated.
+ BSONObj oplogEntry;
};
diff --git a/src/mongo/db/update/update_node.h b/src/mongo/db/update/update_node.h
index cca2bea0105..59b8e513902 100644
--- a/src/mongo/db/update/update_node.h
+++ b/src/mongo/db/update/update_node.h
@@ -82,6 +82,10 @@ public:
// For example, if the update is {$set: {'a.b.c': 5}}, and the document is {a: {}}, then at
// the leaf node, 'pathTaken'="a".
std::shared_ptr<FieldRef> pathTaken = std::make_shared<FieldRef>();
+
+ // Builder object used for constructing an oplog entry. A value of nullptr indicates that
+ // no oplog entry needs to be constructed.
+ LogBuilder* logBuilder = nullptr;
};
explicit UpdateNode(Type type, Context context = Context::kAll)
diff --git a/src/mongo/db/update/update_node_test_fixture.h b/src/mongo/db/update/update_node_test_fixture.h
index e0b448fd9e4..28bc2cb9721 100644
--- a/src/mongo/db/update/update_node_test_fixture.h
+++ b/src/mongo/db/update/update_node_test_fixture.h
@@ -51,7 +51,7 @@ protected:
mongo::LogicalClock::set(service, std::move(logicalClock));
}
- void resetApplyParams() {
+ virtual void resetApplyParams() {
_immutablePathsVector.clear();
_immutablePaths.clear();
_pathToCreate = std::make_shared<FieldRef>();
@@ -66,15 +66,15 @@ protected:
_modifiedPaths.clear();
}
- UpdateExecutor::ApplyParams getApplyParams(mutablebson::Element element) {
+ virtual UpdateExecutor::ApplyParams getApplyParams(mutablebson::Element element) {
UpdateExecutor::ApplyParams applyParams(element, _immutablePaths);
applyParams.matchedField = _matchedField;
applyParams.insert = _insert;
applyParams.fromOplogApplication = _fromOplogApplication;
applyParams.validateForStorage = _validateForStorage;
applyParams.indexData = _indexData.get();
- applyParams.logBuilder = _logBuilder.get();
applyParams.modifiedPaths = &_modifiedPaths;
+ applyParams.logMode = ApplyParams::LogMode::kGenerateOplogEntry;
return applyParams;
}
@@ -82,6 +82,7 @@ protected:
UpdateNode::UpdateNodeApplyParams applyParams;
applyParams.pathToCreate = _pathToCreate;
applyParams.pathTaken = _pathTaken;
+ applyParams.logBuilder = _logBuilder.get();
return applyParams;
}
diff --git a/src/mongo/db/update/update_oplog_entry_serialization.h b/src/mongo/db/update/update_oplog_entry_serialization.h
new file mode 100644
index 00000000000..286c6e3b756
--- /dev/null
+++ b/src/mongo/db/update/update_oplog_entry_serialization.h
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/update/document_diff_serialization.h"
+#include "mongo/db/update/update_oplog_entry_version.h"
+
+/**
+ * This provides helpers for creating oplog entries. To create a $v: 1 modifier-style oplog
+ * entry, a LogBuilder must be used instead.
+ */
+namespace mongo::update_oplog_entry {
+
+/**
+ * Given a diff, produce the contents for the 'o' field of a $v: 2 delta-style oplog entry.
+ */
+inline BSONObj makeDeltaOplogEntry(const doc_diff::Diff& diff) {
+ BSONObjBuilder builder;
+ builder.append("$v", static_cast<int>(UpdateOplogEntryVersion::kDeltaV2));
+ builder.append("diff", diff);
+ return builder.obj();
+}
+
+/**
+ * Produce the contents of the 'o' field of a replacement style oplog entry.
+ */
+inline BSONObj makeReplacementOplogEntry(const BSONObj& replacement) {
+ return replacement;
+}
+} // namespace mongo::update_oplog_entry
diff --git a/src/mongo/db/update/update_oplog_entry_version.h b/src/mongo/db/update/update_oplog_entry_version.h
new file mode 100644
index 00000000000..8ae9ec069fd
--- /dev/null
+++ b/src/mongo/db/update/update_oplog_entry_version.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/string_data.h"
+
+namespace mongo {
+
+static inline constexpr StringData kUpdateOplogEntryVersionFieldName = "$v"_sd;
+
+/**
+ * There are multiple types of 'u' (update) oplog entries. The type of an entry is indicated using
+ * a field called $v.
+ *
+ * The values in this enum *MUST* not change unless we remove support for a type of update.
+ */
+enum class UpdateOplogEntryVersion {
+ // Ancient update system which was deleted in 4.0. We still reserve its version number.
+ kRemovedV0 = 0,
+
+ // The update system introduced in v3.6. When a single update adds multiple fields, those
+ // fields are added in lexicographic order by field name. This system introduces support for
+ // arrayFilters and $[] syntax.
+ kUpdateNodeV1 = 1,
+
+ // Delta style update, introduced in 4.6. When a pipeline based update is executed, the pre and
+ // post images are diffed, producing a delta. The delta is recorded in the oplog. On
+ // secondaries, the delta is applied to the pre-image to recover the post image.
+ //
+ // Delta style updates cannot be executed directly by users.
+ kDeltaV2 = 2,
+
+ // Must be last.
+ kNumVersions
+};
+} // namespace mongo
diff --git a/src/mongo/db/update/update_tree_executor.h b/src/mongo/db/update/update_tree_executor.h
index e10bccc50c1..e04eef0a6d4 100644
--- a/src/mongo/db/update/update_tree_executor.h
+++ b/src/mongo/db/update/update_tree_executor.h
@@ -42,8 +42,37 @@ public:
: _updateTree(std::move(node)) {}
ApplyResult applyUpdate(ApplyParams applyParams) const final {
+ mutablebson::Document logDocument;
+ boost::optional<LogBuilder> optLogBuilder;
+ const bool generateOplogEntry =
+ applyParams.logMode != ApplyParams::LogMode::kDoNotGenerateOplogEntry;
+ if (generateOplogEntry) {
+ optLogBuilder.emplace(logDocument.root());
+ }
+
UpdateNode::UpdateNodeApplyParams updateNodeApplyParams;
- return _updateTree->apply(applyParams, updateNodeApplyParams);
+ updateNodeApplyParams.logBuilder = optLogBuilder.get_ptr();
+
+ auto ret = _updateTree->apply(applyParams, updateNodeApplyParams);
+
+ if (generateOplogEntry) {
+ // In versions since 3.6, the absence of a $v field indicates either a
+ // replacement-style update or a "classic" modifier-style update.
+ //
+ // Since 3.6, the presence of a $v field with value 1 may also indicate that the oplog
+ // entry is a "classic" modifier-style update.
+ //
+ // While we could elide this $v field when providing a value of 1, we continue to log
+ // it because:
+ // (a) It avoids an unnecessary oplog format change.
+ // (b) It is easy to distinguish from $v: 2 delta-style oplog entries.
+ invariant(optLogBuilder->setVersion(UpdateOplogEntryVersion::kUpdateNodeV1));
+
+ invariant(ret.oplogEntry.isEmpty());
+ ret.oplogEntry = logDocument.getObject();
+ }
+
+ return ret;
}
UpdateNode* getUpdateTree() {
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index b72f47ff162..71c7d8349f3 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -1187,6 +1187,11 @@ void BenchRunOp::executeOnce(DBClientBase* conn,
pipelineBuilder.doneFast();
break;
}
+ case write_ops::UpdateModification::Type::kDelta:
+ // It is not possible to run a delta update directly from a client.
+ // Delta updates are only executed on secondaries as part of oplog
+ // application.
+ MONGO_UNREACHABLE;
}
singleUpdate.append("multi", this->multi);
singleUpdate.append("upsert", this->upsert);