summaryrefslogtreecommitdiff
path: root/jstests/replsets/change_stream_speculative_majority_rollback.js
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2018-12-21 16:33:36 -0500
committerWilliam Schultz <william.schultz@mongodb.com>2018-12-21 16:43:58 -0500
commit435200964a1fc9de566e74bd579e99c308551670 (patch)
treef6d4e2723f794200b900c914818c8d9645df50f0 /jstests/replsets/change_stream_speculative_majority_rollback.js
parentf15556ae1ba4f78d2823d54e38d7025c7e9ca4fb (diff)
downloadmongo-435200964a1fc9de566e74bd579e99c308551670.tar.gz
SERVER-37560 Allow change streams to work with speculative majority reads
This patch allows change stream queries to use speculative majority reads so that they can be used even when enableMajorityReadConcern:false. Change stream aggregation commands are allowed to use speculative majority reads as well as 'find' commands that specify a special flag. This commit also enables all change streams tests and suites on the enableMajorityReadConcern:false Evergreen variant. No optimizations for speculative majority change streams are included in this commit.
Diffstat (limited to 'jstests/replsets/change_stream_speculative_majority_rollback.js')
-rw-r--r--jstests/replsets/change_stream_speculative_majority_rollback.js104
1 files changed, 104 insertions, 0 deletions
diff --git a/jstests/replsets/change_stream_speculative_majority_rollback.js b/jstests/replsets/change_stream_speculative_majority_rollback.js
new file mode 100644
index 00000000000..104269ff8a3
--- /dev/null
+++ b/jstests/replsets/change_stream_speculative_majority_rollback.js
@@ -0,0 +1,104 @@
+/**
+ * Test change stream behavior with speculative majority reads in the face of replication rollback.
+ */
+(function() {
+ 'use strict';
+
+ load("jstests/replsets/libs/rollback_test.js"); // for RollbackTest.
+
+ // Disable implicit sessions so it's easy to run commands from different threads.
+ TestData.disableImplicitSessions = true;
+
+ const name = "change_stream_speculative_majority_rollback";
+ const dbName = name;
+ const collName = "coll";
+
+ // Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we
+ // will utilize speculative majority reads for change streams.
+ const replTest = new ReplSetTest(
+ {name, nodes: 3, useBridge: true, nodeOptions: {enableMajorityReadConcern: "false"}});
+ replTest.startSet();
+ const nodes = replTest.nodeList();
+ replTest.initiate({
+ _id: name,
+ members: [
+ {_id: 0, host: nodes[0]},
+ {_id: 1, host: nodes[1]},
+ {_id: 2, host: nodes[2], arbiterOnly: true}
+ ]
+ });
+
+ const rollbackTest = new RollbackTest(name, replTest);
+ const primary = rollbackTest.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+ let coll = primaryDB[collName];
+
+ // Create a collection.
+ assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+
+ // Open a change stream on the initial primary.
+ let res =
+ primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
+ assert.commandWorked(res);
+ let cursorId = res.cursor.id;
+
+ // Receive an initial change event and save the resume token.
+ assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ res = primaryDB.runCommand({getMore: cursorId, collection: collName});
+ let changes = res.cursor.nextBatch;
+ assert.eq(changes.length, 1);
+ assert.eq(changes[0]["fullDocument"], {_id: 1});
+ assert.eq(changes[0]["operationType"], "insert");
+ let resumeToken = changes[0]["_id"];
+
+ let rollbackNode = rollbackTest.transitionToRollbackOperations();
+ assert.eq(rollbackNode, primary);
+
+ // Insert a few items that will be rolled back.
+ assert.commandWorked(coll.insert({_id: 2}));
+ assert.commandWorked(coll.insert({_id: 3}));
+ assert.commandWorked(coll.insert({_id: 4}));
+
+ let getChangeEvent = new ScopedThread(function(host, cursorId, dbName, collName) {
+ jsTestLog("Trying to receive change event from divergent primary.");
+ const nodeDB = new Mongo(host).getDB(dbName);
+ try {
+ return nodeDB.runCommand({getMore: eval(cursorId), collection: collName});
+ } catch (e) {
+ return isNetworkError(e);
+ }
+ }, rollbackNode.host, tojson(cursorId), dbName, collName);
+ getChangeEvent.start();
+
+ // Make sure the change stream query started.
+ assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1);
+
+ // Do some operations on the new primary that we can receive in a resumed stream.
+ let syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
+ coll = syncSource.getDB(dbName)[collName];
+ assert.commandWorked(coll.insert({_id: 5}));
+ assert.commandWorked(coll.insert({_id: 6}));
+ assert.commandWorked(coll.insert({_id: 7}));
+
+ // Let rollback begin and complete.
+ rollbackTest.transitionToSyncSourceOperationsDuringRollback();
+ rollbackTest.transitionToSteadyStateOperations();
+
+ // The change stream query should have failed when the node entered rollback.
+ assert(getChangeEvent.returnData());
+
+ jsTestLog("Resuming change stream against new primary.");
+ res = syncSource.getDB(dbName).runCommand(
+ {aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}});
+ changes = res.cursor.firstBatch;
+ assert.eq(changes.length, 3);
+ assert.eq(changes[0]["fullDocument"], {_id: 5});
+ assert.eq(changes[0]["operationType"], "insert");
+ assert.eq(changes[1]["fullDocument"], {_id: 6});
+ assert.eq(changes[1]["operationType"], "insert");
+ assert.eq(changes[2]["fullDocument"], {_id: 7});
+ assert.eq(changes[2]["operationType"], "insert");
+
+ rollbackTest.stop();
+
+})();