summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Chan <jason.chan@10gen.com>2020-03-19 20:37:48 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-27 19:55:48 +0000
commit7e12da83457ca5a8e76c7d6f39d6b9bbfad3be92 (patch)
tree125c895fff514d206b2bea83016eda92cc37982d
parent6dc4c266ee09b1d6a5a73ebb30dbdb5b5be3d588 (diff)
downloadmongo-7e12da83457ca5a8e76c7d6f39d6b9bbfad3be92.tar.gz
SERVER-44450 Do not add fromMigrate field to applyOps insert oplog entries
-rw-r--r--etc/backports_required_for_multiversion_tests.yml2
-rw-r--r--jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js98
-rw-r--r--src/mongo/db/repl/oplog.cpp14
3 files changed, 109 insertions, 5 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 9cb2282e9f0..e1810f67d19 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -61,6 +61,8 @@ replica_sets_multiversion:
test_file: jstests/replsets/reconfig_avoids_diverging_configs.js
- ticket: SERVER-45081
test_file: jstests/replsets/reconfig_waits_for_oplog_commitment_condition.js
+- ticket: SERVER-44500
+ test_file: jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js
sharding_multiversion:
diff --git a/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js
new file mode 100644
index 00000000000..344c7e119f9
--- /dev/null
+++ b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js
@@ -0,0 +1,98 @@
+/**
+ * Tests that insert oplog entries created by applyOps commands do not contain the 'fromMigrate'
+ * field. Additionally tests that non-atomic applyOps inserts should be returned by changeStreams.
+ */
+(function() {
+'use strict';
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+
+const rst = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]});
+rst.startSet();
+rst.initiate();
+
+function nss(dbName, collName) {
+ return `${dbName}.${collName}`;
+}
+
+const dbName = 'foo';
+const collName = 'coll';
+const primary = rst.getPrimary();
+const secondary = rst.getSecondary();
+const primaryDB = primary.getDB(dbName);
+
+const primaryCST = new ChangeStreamTest(primary.getDB("admin"));
+const primaryChangeStream = primaryCST.startWatchingAllChangesForCluster();
+const secondaryCST = new ChangeStreamTest(secondary.getDB("admin"));
+const secondaryChangeStream = secondaryCST.startWatchingAllChangesForCluster();
+
+primaryDB.createCollection(collName);
+
+// Test non-atomic applyOps inserts.
+assert.commandWorked(primaryDB.runCommand(
+ {applyOps: [{op: "i", ns: nss(dbName, collName), o: {_id: 0}}], allowAtomic: false}));
+assert.commandWorked(primaryDB.runCommand({
+ applyOps: [
+ {op: "i", ns: nss(dbName, collName), o: {_id: 1}},
+ {op: "c", ns: nss(dbName, "$cmd"), o: {create: "other"}}
+ ]
+}));
+
+// Test non-atomic applyOps upserts. These will be logged as insert oplog entries.
+assert.commandWorked(primaryDB.runCommand({
+ applyOps: [{op: "u", ns: nss(dbName, collName), o2: {_id: 2}, o: {$set: {x: 2}}}],
+ allowAtomic: false
+}));
+
+assert.commandWorked(primaryDB.runCommand({
+ applyOps: [
+ {op: "u", ns: nss(dbName, collName), o2: {_id: 3}, o: {$set: {x: 3}}},
+ {op: "c", ns: nss(dbName, "$cmd"), o: {create: "other2"}}
+ ]
+}));
+
+// Test atomic applyOps inserts.
+assert.commandWorked(
+ primaryDB.runCommand({applyOps: [{op: "i", ns: nss(dbName, collName), o: {_id: 4}}]}));
+assert.commandWorked(primaryDB.runCommand({
+ applyOps: [
+ {op: "i", ns: nss(dbName, collName), o: {_id: 5}},
+ {op: "i", ns: nss(dbName, collName), o: {_id: 6}},
+ ]
+}));
+rst.awaitReplication();
+
+assert.eq(7, primaryDB[collName].find().toArray().length);
+
+let expectedCount = 0;
+const oplog = rst.getPrimary().getDB("local").getCollection("oplog.rs");
+const nonAtomicResults = oplog.find({ns: nss(dbName, collName)}).toArray();
+assert.eq(nonAtomicResults.length, 4, nonAtomicResults);
+nonAtomicResults.forEach(function(op) {
+ // We expect non-atomic applyOps inserts to be picked up by changeStreams.
+ const primaryChange = primaryCST.getOneChange(primaryChangeStream);
+ assert.eq(primaryChange.documentKey._id, expectedCount, primaryChange);
+ const secondaryChange = secondaryCST.getOneChange(secondaryChangeStream);
+ assert.eq(secondaryChange.documentKey._id, expectedCount, secondaryChange);
+
+ assert.eq(op.o._id, expectedCount++, nonAtomicResults);
+ assert(!op.hasOwnProperty("fromMigrate"), nonAtomicResults);
+});
+
+// TODO (SERVER-33182): Remove the atomic applyOps testing once atomic applyOps are removed.
+// Atomic applyOps inserts are not expected to be picked up by changeStreams.
+primaryCST.assertNoChange(primaryChangeStream);
+secondaryCST.assertNoChange(secondaryChangeStream);
+// We expect the operations from an atomic applyOps command to be nested in an applyOps oplog entry.
+const atomicResults = oplog.find({"o.applyOps": {$exists: true}}).toArray();
+assert.eq(atomicResults.length, 2, atomicResults);
+for (let i = 0; i < atomicResults.length; i++) {
+ let ops = atomicResults[i].o.applyOps;
+ ops.forEach(function(op) {
+ assert.eq(op.o._id, expectedCount++, atomicResults);
+ assert(!op.hasOwnProperty("fromMigrate"), atomicResults);
+ });
+}
+assert.eq(7, expectedCount);
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index eb9b8953415..7e10d3762d7 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1081,7 +1081,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
str::stream() << "Failed to apply insert due to missing collection: "
<< redact(opOrGroupedInserts.toBSON()),
collection);
-
if (opOrGroupedInserts.isGroupedInserts()) {
// Grouped inserts.
@@ -1101,8 +1100,11 @@ Status applyOperation_inlock(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
OpDebug* const nullOpDebug = nullptr;
- Status status = collection->insertDocuments(
- opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true);
+ Status status = collection->insertDocuments(opCtx,
+ insertObjs.begin(),
+ insertObjs.end(),
+ nullOpDebug,
+ false /* fromMigrate */);
if (!status.isOK()) {
return status;
}
@@ -1177,8 +1179,10 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
OpDebug* const nullOpDebug = nullptr;
- Status status = collection->insertDocument(
- opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true);
+ Status status = collection->insertDocument(opCtx,
+ InsertStatement(o, timestamp, term),
+ nullOpDebug,
+ false /* fromMigrate */);
if (status.isOK()) {
wuow.commit();