diff options
-rw-r--r-- | etc/backports_required_for_multiversion_tests.yml | 2 | ||||
-rw-r--r-- | jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js | 98 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 14 |
3 files changed, 109 insertions, 5 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 9cb2282e9f0..e1810f67d19 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -61,6 +61,8 @@ replica_sets_multiversion: test_file: jstests/replsets/reconfig_avoids_diverging_configs.js - ticket: SERVER-45081 test_file: jstests/replsets/reconfig_waits_for_oplog_commitment_condition.js +- ticket: SERVER-44500 + test_file: jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js sharding_multiversion: diff --git a/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js new file mode 100644 index 00000000000..344c7e119f9 --- /dev/null +++ b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js @@ -0,0 +1,98 @@ +/** + * Tests that insert oplog entries created by applyOps commands do not contain the 'fromMigrate' + * field. Additionally tests that non-atomic applyOps inserts should be returned by changeStreams. + */ +(function() { +'use strict'; +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + +const rst = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]}); +rst.startSet(); +rst.initiate(); + +function nss(dbName, collName) { + return `${dbName}.${collName}`; +} + +const dbName = 'foo'; +const collName = 'coll'; +const primary = rst.getPrimary(); +const secondary = rst.getSecondary(); +const primaryDB = primary.getDB(dbName); + +const primaryCST = new ChangeStreamTest(primary.getDB("admin")); +const primaryChangeStream = primaryCST.startWatchingAllChangesForCluster(); +const secondaryCST = new ChangeStreamTest(secondary.getDB("admin")); +const secondaryChangeStream = secondaryCST.startWatchingAllChangesForCluster(); + +primaryDB.createCollection(collName); + +// Test non-atomic applyOps inserts. +assert.commandWorked(primaryDB.runCommand( + {applyOps: [{op: "i", ns: nss(dbName, collName), o: {_id: 0}}], allowAtomic: false})); +assert.commandWorked(primaryDB.runCommand({ + applyOps: [ + {op: "i", ns: nss(dbName, collName), o: {_id: 1}}, + {op: "c", ns: nss(dbName, "$cmd"), o: {create: "other"}} + ] +})); + +// Test non-atomic applyOps upserts. These will be logged as insert oplog entries. +assert.commandWorked(primaryDB.runCommand({ + applyOps: [{op: "u", ns: nss(dbName, collName), o2: {_id: 2}, o: {$set: {x: 2}}}], + allowAtomic: false +})); + +assert.commandWorked(primaryDB.runCommand({ + applyOps: [ + {op: "u", ns: nss(dbName, collName), o2: {_id: 3}, o: {$set: {x: 3}}}, + {op: "c", ns: nss(dbName, "$cmd"), o: {create: "other2"}} + ] +})); + +// Test atomic applyOps inserts. +assert.commandWorked( + primaryDB.runCommand({applyOps: [{op: "i", ns: nss(dbName, collName), o: {_id: 4}}]})); +assert.commandWorked(primaryDB.runCommand({ + applyOps: [ + {op: "i", ns: nss(dbName, collName), o: {_id: 5}}, + {op: "i", ns: nss(dbName, collName), o: {_id: 6}}, + ] +})); +rst.awaitReplication(); + +assert.eq(7, primaryDB[collName].find().toArray().length); + +let expectedCount = 0; +const oplog = rst.getPrimary().getDB("local").getCollection("oplog.rs"); +const nonAtomicResults = oplog.find({ns: nss(dbName, collName)}).toArray(); +assert.eq(nonAtomicResults.length, 4, nonAtomicResults); +nonAtomicResults.forEach(function(op) { + // We expect non-atomic applyOps inserts to be picked up by changeStreams. + const primaryChange = primaryCST.getOneChange(primaryChangeStream); + assert.eq(primaryChange.documentKey._id, expectedCount, primaryChange); + const secondaryChange = secondaryCST.getOneChange(secondaryChangeStream); + assert.eq(secondaryChange.documentKey._id, expectedCount, secondaryChange); + + assert.eq(op.o._id, expectedCount++, nonAtomicResults); + assert(!op.hasOwnProperty("fromMigrate"), nonAtomicResults); +}); + +// TODO (SERVER-33182): Remove the atomic applyOps testing once atomic applyOps are removed. +// Atomic applyOps inserts are not expected to be picked up by changeStreams. +primaryCST.assertNoChange(primaryChangeStream); +secondaryCST.assertNoChange(secondaryChangeStream); +// We expect the operations from an atomic applyOps command to be nested in an applyOps oplog entry. +const atomicResults = oplog.find({"o.applyOps": {$exists: true}}).toArray(); +assert.eq(atomicResults.length, 2, atomicResults); +for (let i = 0; i < atomicResults.length; i++) { + let ops = atomicResults[i].o.applyOps; + ops.forEach(function(op) { + assert.eq(op.o._id, expectedCount++, atomicResults); + assert(!op.hasOwnProperty("fromMigrate"), atomicResults); + }); +} +assert.eq(7, expectedCount); + +rst.stopSet(); +})(); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index eb9b8953415..7e10d3762d7 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1081,7 +1081,6 @@ Status applyOperation_inlock(OperationContext* opCtx, str::stream() << "Failed to apply insert due to missing collection: " << redact(opOrGroupedInserts.toBSON()), collection); - if (opOrGroupedInserts.isGroupedInserts()) { // Grouped inserts. @@ -1101,8 +1100,11 @@ Status applyOperation_inlock(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); OpDebug* const nullOpDebug = nullptr; - Status status = collection->insertDocuments( - opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true); + Status status = collection->insertDocuments(opCtx, + insertObjs.begin(), + insertObjs.end(), + nullOpDebug, + false /* fromMigrate */); if (!status.isOK()) { return status; } @@ -1177,8 +1179,10 @@ Status applyOperation_inlock(OperationContext* opCtx, } OpDebug* const nullOpDebug = nullptr; - Status status = collection->insertDocument( - opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true); + Status status = collection->insertDocument(opCtx, + InsertStatement(o, timestamp, term), + nullOpDebug, + false /* fromMigrate */); if (status.isOK()) { wuow.commit(); |