diff options
author | William Schultz <william.schultz@mongodb.com> | 2018-12-21 16:33:36 -0500 |
---|---|---|
committer | William Schultz <william.schultz@mongodb.com> | 2018-12-21 16:43:58 -0500 |
commit | 435200964a1fc9de566e74bd579e99c308551670 (patch) | |
tree | f6d4e2723f794200b900c914818c8d9645df50f0 /jstests/replsets/change_stream_speculative_majority_rollback.js | |
parent | f15556ae1ba4f78d2823d54e38d7025c7e9ca4fb (diff) | |
download | mongo-435200964a1fc9de566e74bd579e99c308551670.tar.gz |
SERVER-37560 Allow change streams to work with speculative majority reads
This patch allows change stream queries to use speculative majority reads so that they can be used even when enableMajorityReadConcern:false. Change stream aggregation commands are allowed to use speculative majority reads as well as 'find' commands that specify a special flag. This commit also enables all change streams tests and suites on the enableMajorityReadConcern:false Evergreen variant. No optimizations for speculative majority change streams are included in this commit.
Diffstat (limited to 'jstests/replsets/change_stream_speculative_majority_rollback.js')
-rw-r--r-- | jstests/replsets/change_stream_speculative_majority_rollback.js | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/jstests/replsets/change_stream_speculative_majority_rollback.js b/jstests/replsets/change_stream_speculative_majority_rollback.js new file mode 100644 index 00000000000..104269ff8a3 --- /dev/null +++ b/jstests/replsets/change_stream_speculative_majority_rollback.js @@ -0,0 +1,104 @@ +/** + * Test change stream behavior with speculative majority reads in the face of replication rollback. + */ +(function() { + 'use strict'; + + load("jstests/replsets/libs/rollback_test.js"); // for RollbackTest. + + // Disable implicit sessions so it's easy to run commands from different threads. + TestData.disableImplicitSessions = true; + + const name = "change_stream_speculative_majority_rollback"; + const dbName = name; + const collName = "coll"; + + // Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we + // will utilize speculative majority reads for change streams. + const replTest = new ReplSetTest( + {name, nodes: 3, useBridge: true, nodeOptions: {enableMajorityReadConcern: "false"}}); + replTest.startSet(); + const nodes = replTest.nodeList(); + replTest.initiate({ + _id: name, + members: [ + {_id: 0, host: nodes[0]}, + {_id: 1, host: nodes[1]}, + {_id: 2, host: nodes[2], arbiterOnly: true} + ] + }); + + const rollbackTest = new RollbackTest(name, replTest); + const primary = rollbackTest.getPrimary(); + const primaryDB = primary.getDB(dbName); + let coll = primaryDB[collName]; + + // Create a collection. + assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}})); + + // Open a change stream on the initial primary. + let res = + primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}}); + assert.commandWorked(res); + let cursorId = res.cursor.id; + + // Receive an initial change event and save the resume token. + assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + res = primaryDB.runCommand({getMore: cursorId, collection: collName}); + let changes = res.cursor.nextBatch; + assert.eq(changes.length, 1); + assert.eq(changes[0]["fullDocument"], {_id: 1}); + assert.eq(changes[0]["operationType"], "insert"); + let resumeToken = changes[0]["_id"]; + + let rollbackNode = rollbackTest.transitionToRollbackOperations(); + assert.eq(rollbackNode, primary); + + // Insert a few items that will be rolled back. + assert.commandWorked(coll.insert({_id: 2})); + assert.commandWorked(coll.insert({_id: 3})); + assert.commandWorked(coll.insert({_id: 4})); + + let getChangeEvent = new ScopedThread(function(host, cursorId, dbName, collName) { + jsTestLog("Trying to receive change event from divergent primary."); + const nodeDB = new Mongo(host).getDB(dbName); + try { + return nodeDB.runCommand({getMore: eval(cursorId), collection: collName}); + } catch (e) { + return isNetworkError(e); + } + }, rollbackNode.host, tojson(cursorId), dbName, collName); + getChangeEvent.start(); + + // Make sure the change stream query started. + assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1); + + // Do some operations on the new primary that we can receive in a resumed stream. + let syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback(); + coll = syncSource.getDB(dbName)[collName]; + assert.commandWorked(coll.insert({_id: 5})); + assert.commandWorked(coll.insert({_id: 6})); + assert.commandWorked(coll.insert({_id: 7})); + + // Let rollback begin and complete. + rollbackTest.transitionToSyncSourceOperationsDuringRollback(); + rollbackTest.transitionToSteadyStateOperations(); + + // The change stream query should have failed when the node entered rollback. + assert(getChangeEvent.returnData()); + + jsTestLog("Resuming change stream against new primary."); + res = syncSource.getDB(dbName).runCommand( + {aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}); + changes = res.cursor.firstBatch; + assert.eq(changes.length, 3); + assert.eq(changes[0]["fullDocument"], {_id: 5}); + assert.eq(changes[0]["operationType"], "insert"); + assert.eq(changes[1]["fullDocument"], {_id: 6}); + assert.eq(changes[1]["operationType"], "insert"); + assert.eq(changes[2]["fullDocument"], {_id: 7}); + assert.eq(changes[2]["operationType"], "insert"); + + rollbackTest.stop(); + +})(); |