summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Banala <arun.banala@mongodb.com>2020-04-28 14:39:06 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-06-10 12:54:40 +0000
commitbacc717789965d83d5253a4591c5f3171c741c39 (patch)
tree13e8fa0c0baccc41949fba2b022805d684ba1bf8
parent05aacade41bf56bdb7a2ab9abba72cff89fc1ac5 (diff)
downloadmongo-bacc717789965d83d5253a4591c5f3171c741c39.tar.gz
SERVER-45708 Add automated test for downgrading with change streams
-rw-r--r--buildscripts/resmokeconfig/suites/multiversion.yml6
-rw-r--r--buildscripts/resmokeconfig/suites/multiversion_auth.yml6
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js (renamed from jstests/multiVersion/change_streams_feature_compatibility_version.js)0
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js (renamed from jstests/multiVersion/change_streams_multi_version_cluster.js)0
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js (renamed from jstests/multiVersion/change_streams_multi_version_transaction.js)0
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js247
6 files changed, 255 insertions, 4 deletions
diff --git a/buildscripts/resmokeconfig/suites/multiversion.yml b/buildscripts/resmokeconfig/suites/multiversion.yml
index ec081c2cafc..c76d0faaeb3 100644
--- a/buildscripts/resmokeconfig/suites/multiversion.yml
+++ b/buildscripts/resmokeconfig/suites/multiversion.yml
@@ -2,10 +2,12 @@ test_kind: js_test
selector:
roots:
- - jstests/multiVersion/*.js
- - jstests/multiVersion/genericSetFCVUsage/*.js
+ - jstests/multiVersion/**/*.js
- src/mongo/db/modules/*/jstests/hot_backups/*last_stable*.js
exclude_files:
+ # Do not execute files with helper functions.
+ - jstests/multiVersion/libs/*.js
+
# TODO: SERVER-21578
- jstests/multiVersion/balancer_multiVersion_detect.js
diff --git a/buildscripts/resmokeconfig/suites/multiversion_auth.yml b/buildscripts/resmokeconfig/suites/multiversion_auth.yml
index 77aea334781..11118423485 100644
--- a/buildscripts/resmokeconfig/suites/multiversion_auth.yml
+++ b/buildscripts/resmokeconfig/suites/multiversion_auth.yml
@@ -7,9 +7,11 @@ test_kind: js_test
selector:
roots:
- - jstests/multiVersion/*.js
- - jstests/multiVersion/genericSetFCVUsage/*.js
+ - jstests/multiVersion/**/*.js
exclude_files:
+ # Do not execute files with helper functions.
+ - jstests/multiVersion/libs/*.js
+
# TODO: SERVER-21578
- jstests/multiVersion/balancer_multiVersion_detect.js
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js
index 01ac768ac55..01ac768ac55 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js
diff --git a/jstests/multiVersion/change_streams_multi_version_cluster.js b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js
index 0237bf22dff..0237bf22dff 100644
--- a/jstests/multiVersion/change_streams_multi_version_cluster.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js
diff --git a/jstests/multiVersion/change_streams_multi_version_transaction.js b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js
index 591233bc374..591233bc374 100644
--- a/jstests/multiVersion/change_streams_multi_version_transaction.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
new file mode 100644
index 00000000000..c2eb2520c60
--- /dev/null
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
@@ -0,0 +1,247 @@
+/**
+ * Verifies that a change stream which is resumed on a downgraded last-stable binary does not crash
+ * the server, even when reading oplog entries which the last-stable binary may not understand.
+ *
+ * @tags: [uses_change_streams, requires_replication]
+ */
+(function() {
+"use strict";
+
+load("jstests/multiVersion/libs/multi_cluster.js"); // For upgradeCluster.
+
+// Checking UUID consistency uses cached connections, which are not valid across restarts or
+// stepdowns.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+const dbName = jsTestName();
+const collName = "coll";
+
+// Start a sharded cluster with latest binaries.
+const st = new ShardingTest({
+ shards: 1,
+ rs: {nodes: 2, binVersion: "latest"},
+ other: {mongosOptions: {binVersion: "latest"}}
+});
+
+let shardedColl = st.s.getDB(dbName)[collName];
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+assert.commandWorked(st.s.adminCommand({shardCollection: shardedColl.getFullName(), key: {sk: 1}}));
+
+// Define a set of standard write tests. These tests will be run for every new version and should
+// not be modified. Each test case should have a function with field name 'generateOpLogEntry'
+// which takes a collection object as input.
+const standardTestCases = [
+ // Basic insert case.
+ {
+ testName: "StandardInsert",
+ generateOpLogEntry: function(coll) {
+ assert.commandWorked(coll.runCommand({
+ insert: coll.getName(),
+ documents: [{sk: 1}, {sk: 2}, {sk: -1}, {sk: -2}],
+ ordered: false
+ }));
+ }
+ },
+ // Op-style update.
+ {
+ testName: "OpStyleUpdate",
+ generateOpLogEntry: function(coll) {
+ assert.commandWorked(coll.runCommand({
+ update: shardedColl.getName(),
+ updates: [
+ {q: {sk: 1}, u: {$set: {a: 1}}},
+ {q: {sk: -1}, u: {$set: {a: 1}}},
+ {q: {sk: 2}, u: {$set: {a: 1}}}
+ ],
+ ordered: false
+ }));
+ }
+ },
+ // Replacement style update.
+ {
+ testName: "ReplacementStyleUpdate",
+ generateOpLogEntry: function(coll) {
+ assert.commandWorked(coll.runCommand({
+ update: shardedColl.getName(),
+ updates: [{q: {sk: 1}, u: {sk: 1, a: 2}}, {q: {sk: -1}, u: {sk: -1, a: 2}}]
+ }));
+ }
+
+ },
+ // Pipeline style update.
+ {
+ testName: "PipelineStyleUpdate",
+ generateOpLogEntry: function(coll) {
+ assert.commandWorked(coll.runCommand({
+ update: shardedColl.getName(),
+ updates: [{q: {sk: 2}, u: [{$set: {a: 3}}]}, {q: {sk: -2}, u: [{$set: {a: 3}}]}]
+ }));
+ }
+ },
+ // Basic delete.
+ {
+ testName: "Delete",
+ generateOpLogEntry: function(coll) {
+ assert.commandWorked(coll.runCommand({
+ delete: shardedColl.getName(),
+ deletes: [{q: {sk: 1}, limit: 0}, {q: {sk: -2}, limit: 0}, {q: {sk: -1}, limit: 0}]
+ }));
+ }
+ }
+];
+
+// The list of test cases against which to test the downgraded change stream. Any time a change is
+// made to the existing oplog format, or whenever a new oplog entry type is created, a test-case
+// should be added here which generates an example of the new or modified entry.
+const latestVersionTestCases = [];
+
+// Concatenate the standard tests with the custom latest-version tests to produce the final set of
+// test-cases that will be run.
+const testCases = standardTestCases.concat(latestVersionTestCases);
+
+// The list of all the change stream variations against which the above test cases need to be run.
+// Each entry should have function with field name 'watch' which opens a new change stream.
+const changeStreamsVariants = [
+ {
+ watch: function(options) {
+ return shardedColl.watch([], options);
+ }
+ },
+ {
+ watch: function(options) {
+ return shardedColl.getDB().watch([], options);
+ }
+ },
+ {
+ watch: function(options) {
+ return st.s.watch([], options);
+ }
+ },
+ {
+ watch: function(options) {
+ return shardedColl.watch([], Object.assign(options, {fullDocument: "updateLookup"}));
+ }
+ },
+ {
+ watch: function(options) {
+ return shardedColl.getDB().watch(
+ [], Object.assign(options, {fullDocument: "updateLookup"}));
+ }
+ },
+ {
+ watch: function(options) {
+ return st.s.watch([], Object.assign(options, {fullDocument: "updateLookup"}));
+ }
+ }
+];
+
+/**
+ * For each test case and change stream variation, generates the oplog entries to be tested and
+ * creates an augmented test-case containing a resume token which marks the start of the test, and a
+ * sentinel entry that marks the end of the test. These will be used post-downgrade to run each of
+ * the test cases in isolation.
+ */
+function writeOplogEntriesAndCreateResumePointsOnLatestVersion() {
+ function createSentinelEntry(testNum) {
+ return assert
+ .commandWorked(shardedColl.runCommand(
+ {insert: shardedColl.getName(), documents: [{_id: "sentinel_entry_" + testNum}]}))
+ .$clusterTime.clusterTime;
+ }
+
+ // We write a sentinel entry before each test case so that the resumed changestreams will have a
+ // known point at which to stop while running each test.
+ let testNum = 0;
+ let testStartTime = createSentinelEntry(testNum);
+ const outputChangeStreams = [];
+ for (let testCase of testCases) {
+ // Capture the 'resumeToken' when the sentinel entry is found. We use the token to resume
+ // the stream rather than the 'testStartTime' because resuming from a token adds more stages
+ // to the $changeStream pipeline, which increases our coverage.
+ let resumeToken;
+ const csCursor = changeStreamsVariants[0].watch({startAtOperationTime: testStartTime});
+ assert.soon(() => {
+ if (!csCursor.hasNext()) {
+ return false;
+ }
+ const nextEvent = csCursor.next();
+ resumeToken = nextEvent._id;
+ return (nextEvent.documentKey._id == "sentinel_entry_" + testNum);
+ });
+
+ for (let changeStreamVariant of changeStreamsVariants) {
+ // Start a change stream on the sentinel entry for each test case.
+ const outputChangeStream = {
+ watch: changeStreamVariant.watch,
+ resumeToken: resumeToken,
+ // The termination for the change stream of the current test case will be the
+ // sentinel entry for the next test case.
+ endSentinelEntry: "sentinel_entry_" + (testNum + 1),
+ // We copy this for debugging purposes only.
+ testName: testCases.testName
+ };
+ outputChangeStreams.push(outputChangeStream);
+ }
+
+ // Run the test case's 'generateOpLogEntry' function, which will create the actual oplog
+ // entry to be tested.
+ testCase.generateOpLogEntry(shardedColl);
+
+ // Insert a sentinel to separate this test-case from the next.
+ testStartTime = createSentinelEntry(++testNum);
+ }
+ return outputChangeStreams;
+}
+
+/**
+ * Validates that resuming each of the change stream will not crash the server. The 'changeStreams'
+ * should be an array and each entry should have fields 'watch', 'resumeToken' and
+ * 'endSentinelEntry'.
+ */
+function resumeStreamsOnLastStableVersion(changeStreams) {
+ for (let changeStream of changeStreams) {
+ jsTestLog("Validating change stream for " + tojson(changeStream));
+ const csCursor = changeStream.watch({resumeAfter: changeStream.resumeToken});
+
+ // Keep calling 'getmore' until the sentinal entry for the next test is found or until the
+ // change stream throws an error.
+ assert.soon(() => {
+ if (!csCursor.hasNext()) {
+ return false;
+ }
+ try {
+ const nextEvent = csCursor.next();
+ return (nextEvent.documentKey._id == changeStream.endSentinelEntry);
+ } catch (e) {
+ jsTestLog("Error occurred while reading change stream. " + tojson(e));
+
+ // Validate that the error returned was not a consequence of server crash.
+ assert.commandWorked(shardedColl.runCommand({ping: 1}));
+ assert.commandWorked(st.rs0.getPrimary().getDB("admin").runCommand({ping: 1}));
+ assert.commandWorked(st.configRS.getPrimary().getDB("admin").runCommand({ping: 1}));
+
+ return true;
+ }
+ });
+ }
+}
+
+// Obtain the list of change stream tests and the associated tokens from which to resume after the
+// cluster has been downgraded.
+const changeStreamsToBeValidated = writeOplogEntriesAndCreateResumePointsOnLatestVersion();
+
+// Downgrade the entire cluster to 'last-stable' binVersion.
+assert.commandWorked(
+ st.s.getDB(dbName).adminCommand({setFeatureCompatibilityVersion: lastStableFCV}));
+st.upgradeCluster("last-stable");
+
+// Refresh our reference to the sharded collection post-downgrade.
+shardedColl = st.s.getDB(dbName)[collName];
+
+// Resume all the change streams that were created on latest version and validate that the change
+// stream doesn't crash the server after downgrade.
+resumeStreamsOnLastStableVersion(changeStreamsToBeValidated);
+
+st.stop();
+}());