// Tests that the $changeStream requires read concern majority.
//
// Outline:
//   1. Start a two-node replica set with majority read concern enabled (skipping the test when
//      the configuration or storage engine cannot support it).
//   2. Verify that $changeStream rejects the "local" and "linearizable" read concerns and accepts
//      an explicit "majority".
//   3. Verify that omitting readConcern behaves like "majority": a write that has only reached
//      the primary is not returned until replication resumes and the write is committed.
(function() {
    "use strict";

    load("jstests/replsets/rslib.js");           // For startSetIfSupportsReadMajority.
    load("jstests/libs/change_stream_util.js");  // For ChangeStreamTest.
    load("jstests/libs/write_concern_util.js");  // For stopReplicationOnSecondaries.

    const rst = new ReplSetTest({nodes: 2, nodeOptions: {enableMajorityReadConcern: ""}});

    // Skip this test if running with --nojournal and WiredTiger. The set has not been started
    // at this point, so there is nothing to shut down before returning.
    if (jsTest.options().noJournal &&
        (!jsTest.options().storageEngine || jsTest.options().storageEngine === "wiredTiger")) {
        print("Skipping test because running WiredTiger without journaling isn't a valid" +
              " replica set configuration");
        return;
    }

    if (!startSetIfSupportsReadMajority(rst)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        rst.stopSet();
        return;
    }

    rst.initiate();

    const name = "change_stream_require_majority_read_concern";
    const db = rst.getPrimary().getDB(name);

    /**
     * Returns everything after the first "." of a full namespace string, i.e. the collection
     * part of "<db>.<collection>". The capture group makes split() keep the remainder
     * (including any further dots) as element 1 of the result.
     */
    function getCollectionNameFromFullNamespace(ns) {
        return ns.split(/\.(.+)/)[1];
    }

    // Use ChangeStreamTest to verify that the pipeline returns expected results.
    const cst = new ChangeStreamTest(db);

    /**
     * Issues a single getMore on 'cursor' while the "disableAwaitDataForGetMoreCmd" fail point is
     * enabled, and asserts that the returned batch is empty.
     *
     * The fail point is switched off in a 'finally' block so that a failed getMore or a failed
     * emptiness assertion does not leave it enabled on the node.
     *
     * @param {Object} cursor - cursor description exposing 'id' and 'ns' (full namespace).
     */
    function assertNextBatchIsEmpty(cursor) {
        assert.commandWorked(db.adminCommand(
            {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
        try {
            const getMoreRes = assert.commandWorked(db.runCommand({
                getMore: cursor.id,
                collection: getCollectionNameFromFullNamespace(cursor.ns),
                batchSize: 1
            }));
            assert.eq(getMoreRes.cursor.nextBatch.length, 0);
        } finally {
            assert.commandWorked(db.adminCommand(
                {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
        }
    }

    // Test read concerns other than "majority" are not supported.
    const primaryColl = db.foo;
    assert.writeOK(primaryColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
    let res = primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: "local"},
    });
    assert.commandFailedWithCode(res, ErrorCodes.InvalidOptions);
    res = primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: "linearizable"},
    });
    assert.commandFailedWithCode(res, ErrorCodes.InvalidOptions);

    // Test that explicit read concern "majority" works.
    res = primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: "majority"},
    });
    assert.commandWorked(res);

    // Test not specifying readConcern defaults to "majority" read concern.
    stopReplicationOnSecondaries(rst);
    // Verify that the document just inserted cannot be returned.
    let cursor =
        cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: primaryColl});
    assert.eq(cursor.firstBatch.length, 0);

    // Insert a document on the primary only.
    assert.writeOK(primaryColl.insert({_id: 2}, {writeConcern: {w: 1}}));
    assertNextBatchIsEmpty(cursor);

    // Restart data replication and wait until the new write becomes visible.
    restartReplicationOnSecondaries(rst);
    rst.awaitLastOpCommitted();

    // Verify that the expected doc is returned because it has been committed.
    let doc = cst.getOneChange(cursor);
    // operationType is a plain string, so compare it with assert.eq rather than assert.docEq.
    assert.eq(doc.operationType, "insert");
    assert.docEq(doc.fullDocument, {_id: 2});

    rst.stopSet();
}());