summaryrefslogtreecommitdiff
path: root/jstests/replsets/change_stream_speculative_majority_rollback.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/replsets/change_stream_speculative_majority_rollback.js')
-rw-r--r--jstests/replsets/change_stream_speculative_majority_rollback.js194
1 files changed, 96 insertions, 98 deletions
diff --git a/jstests/replsets/change_stream_speculative_majority_rollback.js b/jstests/replsets/change_stream_speculative_majority_rollback.js
index 2c8aa9492af..06e4fccc51d 100644
--- a/jstests/replsets/change_stream_speculative_majority_rollback.js
+++ b/jstests/replsets/change_stream_speculative_majority_rollback.js
@@ -4,102 +4,100 @@
* @tags: [uses_speculative_majority]
*/
(function() {
- 'use strict';
-
- load("jstests/replsets/libs/rollback_test.js"); // for RollbackTest.
-
- // Disable implicit sessions so it's easy to run commands from different threads.
- TestData.disableImplicitSessions = true;
-
- const name = "change_stream_speculative_majority_rollback";
- const dbName = name;
- const collName = "coll";
-
- // Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we
- // will utilize speculative majority reads for change streams.
- const replTest = new ReplSetTest({
- name,
- nodes: 3,
- useBridge: true,
- settings: {chainingAllowed: false},
- nodeOptions: {enableMajorityReadConcern: "false"}
- });
- replTest.startSet();
- let config = replTest.getReplSetConfig();
- config.members[2].priority = 0;
- replTest.initiate(config);
-
- const rollbackTest = new RollbackTest(name, replTest);
- const primary = rollbackTest.getPrimary();
- const primaryDB = primary.getDB(dbName);
- let coll = primaryDB[collName];
-
- // Create a collection.
- assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
-
- // Open a change stream on the initial primary.
- let res =
- primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
- assert.commandWorked(res);
- let cursorId = res.cursor.id;
-
- // Receive an initial change event and save the resume token.
- assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
- res = primaryDB.runCommand({getMore: cursorId, collection: collName});
- let changes = res.cursor.nextBatch;
- assert.eq(changes.length, 1);
- assert.eq(changes[0]["fullDocument"], {_id: 1});
- assert.eq(changes[0]["operationType"], "insert");
- let resumeToken = changes[0]["_id"];
-
- let rollbackNode = rollbackTest.transitionToRollbackOperations();
- assert.eq(rollbackNode, primary);
-
- // Insert a few items that will be rolled back.
- assert.commandWorked(coll.insert({_id: 2}));
- assert.commandWorked(coll.insert({_id: 3}));
- assert.commandWorked(coll.insert({_id: 4}));
-
- let getChangeEvent = new ScopedThread(function(host, cursorId, dbName, collName) {
- jsTestLog("Trying to receive change event from divergent primary.");
- const nodeDB = new Mongo(host).getDB(dbName);
- try {
- return nodeDB.runCommand({getMore: eval(cursorId), collection: collName});
- } catch (e) {
- return isNetworkError(e);
- }
- }, rollbackNode.host, tojson(cursorId), dbName, collName);
- getChangeEvent.start();
-
- // Make sure the change stream query started.
- assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1);
-
- // Do some operations on the new primary that we can receive in a resumed stream.
- let syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
- coll = syncSource.getDB(dbName)[collName];
- assert.commandWorked(coll.insert({_id: 5}));
- assert.commandWorked(coll.insert({_id: 6}));
- assert.commandWorked(coll.insert({_id: 7}));
-
- // Let rollback begin and complete.
- rollbackTest.transitionToSyncSourceOperationsDuringRollback();
- rollbackTest.transitionToSteadyStateOperations();
-
- // The change stream query should have failed when the node entered rollback.
- assert(getChangeEvent.returnData());
-
- jsTestLog("Resuming change stream against new primary.");
- res = syncSource.getDB(dbName).runCommand(
- {aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}});
- changes = res.cursor.firstBatch;
- assert.eq(changes.length, 3);
- assert.eq(changes[0]["fullDocument"], {_id: 5});
- assert.eq(changes[0]["operationType"], "insert");
- assert.eq(changes[1]["fullDocument"], {_id: 6});
- assert.eq(changes[1]["operationType"], "insert");
- assert.eq(changes[2]["fullDocument"], {_id: 7});
- assert.eq(changes[2]["operationType"], "insert");
-
- rollbackTest.stop();
-
+'use strict';
+
+load("jstests/replsets/libs/rollback_test.js"); // for RollbackTest.
+
+// Disable implicit sessions so it's easy to run commands from different threads.
+TestData.disableImplicitSessions = true;
+
+const name = "change_stream_speculative_majority_rollback";
+const dbName = name;
+const collName = "coll";
+
+// Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we
+// will utilize speculative majority reads for change streams.
+const replTest = new ReplSetTest({
+ name,
+ nodes: 3,
+ useBridge: true,
+ settings: {chainingAllowed: false},
+ nodeOptions: {enableMajorityReadConcern: "false"}
+});
+replTest.startSet();
+let config = replTest.getReplSetConfig();
+config.members[2].priority = 0;
+replTest.initiate(config);
+
+const rollbackTest = new RollbackTest(name, replTest);
+const primary = rollbackTest.getPrimary();
+const primaryDB = primary.getDB(dbName);
+let coll = primaryDB[collName];
+
+// Create a collection.
+assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+
+// Open a change stream on the initial primary.
+let res = primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
+assert.commandWorked(res);
+let cursorId = res.cursor.id;
+
+// Receive an initial change event and save the resume token.
+assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+res = primaryDB.runCommand({getMore: cursorId, collection: collName});
+let changes = res.cursor.nextBatch;
+assert.eq(changes.length, 1);
+assert.eq(changes[0]["fullDocument"], {_id: 1});
+assert.eq(changes[0]["operationType"], "insert");
+let resumeToken = changes[0]["_id"];
+
+let rollbackNode = rollbackTest.transitionToRollbackOperations();
+assert.eq(rollbackNode, primary);
+
+// Insert a few items that will be rolled back.
+assert.commandWorked(coll.insert({_id: 2}));
+assert.commandWorked(coll.insert({_id: 3}));
+assert.commandWorked(coll.insert({_id: 4}));
+
+let getChangeEvent = new ScopedThread(function(host, cursorId, dbName, collName) {
+ jsTestLog("Trying to receive change event from divergent primary.");
+ const nodeDB = new Mongo(host).getDB(dbName);
+ try {
+ return nodeDB.runCommand({getMore: eval(cursorId), collection: collName});
+ } catch (e) {
+ return isNetworkError(e);
+ }
+}, rollbackNode.host, tojson(cursorId), dbName, collName);
+getChangeEvent.start();
+
+// Make sure the change stream query started.
+assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1);
+
+// Do some operations on the new primary that we can receive in a resumed stream.
+let syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
+coll = syncSource.getDB(dbName)[collName];
+assert.commandWorked(coll.insert({_id: 5}));
+assert.commandWorked(coll.insert({_id: 6}));
+assert.commandWorked(coll.insert({_id: 7}));
+
+// Let rollback begin and complete.
+rollbackTest.transitionToSyncSourceOperationsDuringRollback();
+rollbackTest.transitionToSteadyStateOperations();
+
+// The change stream query should have failed when the node entered rollback.
+assert(getChangeEvent.returnData());
+
+jsTestLog("Resuming change stream against new primary.");
+res = syncSource.getDB(dbName).runCommand(
+ {aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}});
+changes = res.cursor.firstBatch;
+assert.eq(changes.length, 3);
+assert.eq(changes[0]["fullDocument"], {_id: 5});
+assert.eq(changes[0]["operationType"], "insert");
+assert.eq(changes[1]["fullDocument"], {_id: 6});
+assert.eq(changes[1]["operationType"], "insert");
+assert.eq(changes[2]["fullDocument"], {_id: 7});
+assert.eq(changes[2]["operationType"], "insert");
+
+rollbackTest.stop();
})();