diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-28 17:12:26 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-01 04:44:11 +0000 |
commit | 724db8bcba09de5fa9b86bf0ea5b0626a5be9e24 (patch) | |
tree | a810b9cacb4e9bef100fc729a00f64fce3a520fa | |
parent | 204d63a92d588b9891277caf70a257b42f82ac32 (diff) | |
download | mongo-724db8bcba09de5fa9b86bf0ea5b0626a5be9e24.tar.gz |
SERVER-38414 Upgrade/Downgrade testing for change stream high water marks
9 files changed, 494 insertions, 175 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml index efddcd7fd14..d46ad6a2384 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -1772,7 +1772,7 @@ functions: --edition ${multiversion_edition|base} \ --platform ${multiversion_platform|linux} \ --architecture ${multiversion_architecture|x86_64} \ - --useLatest 3.2 3.4 3.6 3.6.7 + --useLatest 3.2 3.4 3.6 3.6.7 4.0.5 "do snmp setup" : command: shell.exec diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js index 346a03d4b38..40490b5fc6b 100644 --- a/jstests/multiVersion/change_streams_feature_compatibility_version.js +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -40,7 +40,6 @@ // We can't open a change stream on a non-existent database on last-stable, so we insert a dummy // document to create the database. - // TODO BACKPORT-34138 Remove this check once the change has been backported. assert.writeOK(testDB.dummy.insert({_id: "dummy"})); // Open a change stream against a 3.6 binary. We will use the resume token from this stream to @@ -95,8 +94,9 @@ assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 1); - // Test that the stream is still using the old resume token format using BinData. - assertResumeTokenUsesBinDataFormat(change._id); + // From 4.0.7 onwards, we always produce the newer string-format tokens even while in FCV 3.6. + // The stream which was resumed from the 3.6 BinData token should now be producing 4.0 tokens. + assertResumeTokenUsesStringFormat(change._id); // Explicitly set feature compatibility version 4.0. Remember the cluster time from that // response to use later. @@ -116,7 +116,7 @@ assert.writeOK(coll.insert({_id: 2})); - // Test that the stream opened in FCV 3.6 continues to work. + // Test that the stream opened in FCV 3.6 continues to generate tokens in the new format. change = cst.getOneChange(streamOnOldVersion); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 2); @@ -168,23 +168,22 @@ // Set the feature compatibility version to 3.6. assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"})); - // Test that existing streams continue, but switch back to the BinData format after observing - // the change to FCV. + // Test that existing streams continue, but still generate resume tokens in the new format. assert.writeOK(coll.insert({_id: 3})); change = cst.getOneChange(wholeDbCursor); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 3); - assertResumeTokenUsesBinDataFormat(change._id); + assertResumeTokenUsesStringFormat(change._id); change = adminCST.getOneChange(wholeClusterCursor); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 3); - assertResumeTokenUsesBinDataFormat(change._id); + assertResumeTokenUsesStringFormat(change._id); change = cst.getOneChange(cursorStartedWithTime); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 3); - assertResumeTokenUsesBinDataFormat(change._id); + assertResumeTokenUsesStringFormat(change._id); // Creating a new change stream with a 4.0 feature should fail. assert.commandFailedWithCode( @@ -220,7 +219,7 @@ }); assert.soon(() => resumedOnFCV36With40BinaryResumeToken.hasNext()); change = resumedOnFCV36With40BinaryResumeToken.next(); - assertResumeTokenUsesBinDataFormat(change._id); + assertResumeTokenUsesStringFormat(change._id); // Test that resuming a change stream with the original resume token still works. let resumedWith36Token; @@ -229,7 +228,7 @@ }); assert.soon(() => resumedWith36Token.hasNext()); change = resumedWith36Token.next(); - assertResumeTokenUsesBinDataFormat(change._id); + assertResumeTokenUsesStringFormat(change._id); rst.stopSet(); }()); diff --git a/jstests/multiVersion/change_streams_high_water_mark_cluster.js b/jstests/multiVersion/change_streams_high_water_mark_cluster.js new file mode 100644 index 00000000000..b65dd1fefef --- /dev/null +++ b/jstests/multiVersion/change_streams_high_water_mark_cluster.js @@ -0,0 +1,242 @@ +/** + * Tests that high water mark and postBatchResumeTokens are handled correctly during upgrade from + * and downgrade to both 3.6 and a pre-backport version of 4.0 on a sharded cluster. + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // assertCreateCollection + load("jstests/libs/fixture_helpers.js"); // runCommandOnEachPrimary + load("jstests/multiVersion/libs/causal_consistency_helpers.js"); // supportsMajorityReadConcern + load("jstests/multiVersion/libs/change_stream_hwm_helpers.js"); // ChangeStreamHWMHelpers + load("jstests/multiVersion/libs/multi_cluster.js"); // upgradeCluster + load("jstests/multiVersion/libs/multi_rs.js"); // upgradeSet + + if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + const preBackport40Version = ChangeStreamHWMHelpers.preBackport40Version; + const latest40Version = ChangeStreamHWMHelpers.latest40Version; + const latest36Version = ChangeStreamHWMHelpers.latest36Version; + + const st = new ShardingTest({ + shards: 2, + mongos: 1, + rs: {nodes: 3}, + other: { + mongosOptions: {binVersion: latest36Version}, + configOptions: {binVersion: latest36Version}, + rsOptions: { + binVersion: latest36Version, + setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1} + }, + } + }); + + // Obtain references to the test database via mongoS and directly on shard0. + let mongosDB = st.s.getDB(jsTestName()); + let primaryShard = st.rs0.getPrimary(); + + // Names of each of the collections used in the course of this test. + const shardedCollName = "sharded_coll"; + const unshardedCollName = "unsharded_coll"; + + // Updates the specified cluster components and then refreshes our references to each of them. + function refreshCluster(version, components, singleShard) { + if (singleShard) { + singleShard.upgradeSet({binVersion: version}); + } else { + st.upgradeCluster(version, components); + } + + // Wait for the config server and shards to become available, and restart mongoS. + st.configRS.awaitReplication(); + st.rs0.awaitReplication(); + st.rs1.awaitReplication(); + st.restartMongoses(); + + // Having upgraded the cluster, reacquire references to each component. + mongosDB = st.s.getDB(jsTestName()); + primaryShard = st.rs0.getPrimary(); + + // Re-apply the 'writePeriodicNoops' parameter to the up/downgraded shards. + const mongosAdminDB = mongosDB.getSiblingDB("admin"); + FixtureHelpers.runCommandOnEachPrimary( + {db: mongosAdminDB, cmdObj: {setParameter: 1, writePeriodicNoops: true}}); + } + + // Enable sharding on the the test database and ensure that the primary is shard0. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), primaryShard.name); + + // Create an unsharded collection on the primary shard via mongoS. + assertCreateCollection(mongosDB, unshardedCollName); + + // Create a sharded collection on {_id: 1}, split across the shards at {_id: 0}. + const collToShard = assertCreateCollection(mongosDB, shardedCollName); + st.shardColl(collToShard, {_id: 1}, {_id: 0}, {_id: 1}); + + // Maps used to associate collection names with collection objects and HWM tokens. + const collMap = { + [unshardedCollName]: () => st.s.getDB(jsTestName())[shardedCollName], + [shardedCollName]: () => st.s.getDB(jsTestName())[unshardedCollName] + }; + + /** + * Tests the behaviour of the cluster while upgrading from 'oldVersion' to 'latest40Version'. + * The 'oldVersionFCVs' parameter specifies an array of FCVs supported by 'oldVersion'; the + * upgrade/downgrade procedure will be tested with each of these FCVs. + */ + function runUpgradeDowngradeTests(oldVersion, oldVersionFCVs) { + for (let minFCV of oldVersionFCVs) { + // Create a map to record the HWMs returned by each test type. + const hwmTokenMap = {}; + + // We start with the cluster running on 'oldVersion'. Should not see any PBRTs. + jsTestLog(`Testing binary ${oldVersion} mongoS and shards, FCV ${minFCV}`); + refreshCluster(oldVersion); + assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: minFCV})); + for (let collName in collMap) { + const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: collMap[collName](), expectPBRT: false}); + assert.eq(hwmToken, undefined); + } + + // Upgrade a single shard to 'latest40Version' but leave the mongoS on 'oldVersion'. No + // streams produce PBRTs, but the new shard should continue to produce resumable tokens + // and the old-style $sortKey while the cluster is mid-upgrade. + jsTestLog(`Upgrading shard1 to binary ${latest40Version} FCV ${minFCV}`); + refreshCluster(latest40Version, null, st.rs1); + for (let collName in collMap) { + const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: collMap[collName](), expectPBRT: false}); + assert.eq(hwmToken, undefined); + } + + // Upgrade the remaining shard to 'latest40Version', but leave mongoS on 'oldVersion'. + jsTestLog( + `Upgrading to ${oldVersion} mongoS and ${latest40Version} shards, FCV ${minFCV}`); + refreshCluster(latest40Version, + {upgradeMongos: false, upgradeShards: true, upgradeConfigs: true}); + + // The shards are upgraded to 'latest40Version' but mongoS is running 'oldVersion'. The + // mongoS should be able to merge the output from the shards, but neither mongoS stream + // will produce a PBRT. + jsTestLog( + `Testing ${oldVersion} mongoS and ${latest40Version} shards with FCV ${minFCV}`); + for (let collName in collMap) { + const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: collMap[collName](), expectPBRT: false}); + assert.eq(hwmToken, undefined); + } + + // Upgrade the mongoS to 'latest40Version' but leave the cluster in 'minFCV'. + jsTestLog( + `Upgrading to binary ${latest40Version} mongoS and shards with FCV ${minFCV}`); + refreshCluster(latest40Version, + {upgradeMongos: true, upgradeShards: false, upgradeConfigs: false}); + + // All streams should now return PBRTs, and we should obtain a valid HWM from the test. + jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV ${minFCV}`); + for (let collName in collMap) { + hwmTokenMap[collName] = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({ + coll: collMap[collName](), + expectPBRT: true, + hwmToResume: hwmTokenMap[collName], + expectResume: true + }); + assert.neq(hwmTokenMap[collName], undefined); + } + + // Set the cluster's FCV to 4.0. + assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: "4.0"})); + + // All streams return PBRTs, and we can resume with the HWMs from the previous test. + jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV 4.0`); + for (let collName in collMap) { + hwmTokenMap[collName] = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({ + coll: collMap[collName](), + expectPBRT: true, + hwmToResume: hwmTokenMap[collName], + expectResume: true + }); + assert.neq(hwmTokenMap[collName], undefined); + } + + // Downgrade the cluster to 'minFCV'. We should continue to produce PBRTs and can still + // resume from the tokens that we generated in the preceding tests. + jsTestLog(`Downgrading to FCV ${minFCV} shards`); + assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: minFCV})); + + jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV ${minFCV}`); + for (let collName in collMap) { + hwmTokenMap[collName] = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({ + coll: collMap[collName](), + expectPBRT: true, + hwmToResume: hwmTokenMap[collName], + expectResume: true + }); + assert.neq(hwmTokenMap[collName], undefined); + } + + // Downgrade the mongoS to 'oldVersion'. We should be able to create new streams and + // resume from their tokens, but cannot resume from the previously-generated v1 tokens. + jsTestLog(`Downgrading to binary ${oldVersion} mongoS with FCV ${minFCV} shards`); + refreshCluster(oldVersion, + {upgradeMongos: true, upgradeShards: false, upgradeConfigs: false}); + + // We should no longer receive any PBRTs via mongoS, and we cannot resume from the HWMs. + jsTestLog(`Testing binary ${oldVersion} mongoS and binary ${latest40Version} shards`); + for (let collName in collMap) { + const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({ + coll: collMap[collName](), + expectPBRT: false, + hwmToResume: hwmTokenMap[collName], + expectResume: false + }); + assert.eq(hwmToken, undefined); + } + + // Downgrade a single shard to 'oldVersion'. We should continue to observe the same + // behaviour as we did in the previous test. + jsTestLog(`Downgrading shard1 to binary ${oldVersion} FCV ${minFCV}`); + refreshCluster(oldVersion, null, st.rs1); + for (let collName in collMap) { + const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({ + coll: collMap[collName](), + expectPBRT: false, + hwmToResume: hwmTokenMap[collName], + expectResume: false + }); + assert.eq(hwmToken, undefined); + } + + // Downgrade the remainder of the cluster to binary 'oldVersion'. + jsTestLog(`Downgrading to binary ${oldVersion} shards`); + refreshCluster(oldVersion, + {upgradeShards: true, upgradeConfigs: false, upgradeMongos: false}); + refreshCluster(oldVersion, + {upgradeConfigs: true, upgradeShards: false, upgradeMongos: false}); + + // We should no longer receive any PBRTs, and we cannot resume from the HWM tokens. + jsTestLog(`Testing downgraded binary ${oldVersion} mongoS and shards`); + for (let collName in collMap) { + const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({ + coll: collMap[collName](), + expectPBRT: false, + hwmToResume: hwmTokenMap[collName], + expectResume: false + }); + assert.eq(hwmToken, undefined); + } + } + } + + // Run the upgrade/downgrade tests from both 3.6 and pre-backport 4.0 versions. + runUpgradeDowngradeTests(latest36Version, ["3.6"]); + runUpgradeDowngradeTests(preBackport40Version, ["3.6", "4.0"]); + + st.stop(); +})();
\ No newline at end of file diff --git a/jstests/multiVersion/change_streams_high_water_mark_replset.js b/jstests/multiVersion/change_streams_high_water_mark_replset.js new file mode 100644 index 00000000000..c06f5770e1f --- /dev/null +++ b/jstests/multiVersion/change_streams_high_water_mark_replset.js @@ -0,0 +1,107 @@ +/** + * Tests that high water mark and postBatchResumeTokens are handled correctly during upgrade from + * and downgrade to both 3.6 and a pre-backport version of 4.0 on a single replica set. + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assertCreateCollection. + load("jstests/multiVersion/libs/change_stream_hwm_helpers.js"); // For ChangeStreamHWMHelpers. + load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + const preBackport40Version = ChangeStreamHWMHelpers.preBackport40Version; + const latest40Version = ChangeStreamHWMHelpers.latest40Version; + const latest36Version = ChangeStreamHWMHelpers.latest36Version; + + const rst = new ReplSetTest({ + nodes: 2, + nodeOptions: {binVersion: latest36Version}, + }); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + rst.initiate(); + + // Obtain references to the test database and create the test collection. + let testDB = rst.getPrimary().getDB(jsTestName()); + assert.commandWorked(testDB.createCollection("test")); + let testColl = testDB.test; + + // Up- or downgrades the replset and then refreshes our references to the test collection. + function refreshReplSet(version) { + // Upgrade the set and wait for it to become available again. + rst.upgradeSet({binVersion: version}); + rst.awaitReplication(); + + // Having upgraded the replset, reacquire references to the db and collection. + testDB = rst.getPrimary().getDB(jsTestName()); + testColl = testDB.test; + } + + /** + * Tests the behaviour of the replset while upgrading from 'oldVersion' to 'latest40Version'. + * The 'oldVersionFCVs' parameter specifies an array of FCVs supported by 'oldVersion'; the + * upgrade/downgrade procedure will be tested with each of these FCVs. + */ + function runUpgradeDowngradeTests(oldVersion, oldVersionFCVs) { + for (let minFCV of oldVersionFCVs) { + // Stores a high water mark generated by the most recent test. + let hwmToken = null; + + // We start with the replset running on 'oldVersion'. No streams should produce PBRTs. + jsTestLog(`Testing binary ${oldVersion}`); + refreshReplSet(oldVersion); + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: minFCV})); + hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: testColl, expectPBRT: false}); + assert.eq(hwmToken, undefined); + + // Upgrade the replset to 'latest40Version' but leave it in FCV 'minFCV'. + jsTestLog(`Upgrading to binary ${latest40Version} with FCV ${minFCV}`); + refreshReplSet(latest40Version); + + // All streams should now return PBRTs, including high water marks. + jsTestLog(`Testing binary ${latest40Version} with FCV ${minFCV}`); + hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: testColl, expectPBRT: true, hwmToResume: hwmToken, expectResume: true}); + assert.neq(hwmToken, undefined); + + // Set the replset's FCV to 4.0. + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"})); + + // All streams should return PBRTs. We can resume with the HWM from the previous test. + jsTestLog(`Testing binary ${latest40Version} with FCV 4.0`); + hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: testColl, expectPBRT: true, hwmToResume: hwmToken, expectResume: true}); + assert.neq(hwmToken, undefined); + + // Downgrade the replset to FCV 'minFCV'. + jsTestLog(`Downgrading to FCV ${minFCV}`); + assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: minFCV})); + + // All streams should return PBRTs and we can still resume from the last HWM token. + jsTestLog(`Testing binary ${latest40Version} with FCV ${minFCV}`); + hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: testColl, expectPBRT: true, hwmToResume: hwmToken, expectResume: true}); + assert.neq(hwmToken, undefined); + + // Downgrade the replset to 'oldVersion'. + jsTestLog(`Downgrading to binary ${oldVersion}`); + refreshReplSet(oldVersion); + + // We no longer receive PBRTs, and we cannot resume from the HWM token we generated. + jsTestLog(`Testing downgraded binary ${oldVersion}`); + ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens( + {coll: testColl, expectPBRT: false, hwmToResume: hwmToken, expectResume: false}); + } + } + + // Run the upgrade/downgrade tests from both 3.6 and pre-backport 4.0 versions. + runUpgradeDowngradeTests(latest36Version, ["3.6"]); + runUpgradeDowngradeTests(preBackport40Version, ["3.6", "4.0"]); + + rst.stopSet(); +})();
\ No newline at end of file diff --git a/jstests/multiVersion/libs/change_stream_hwm_helpers.js b/jstests/multiVersion/libs/change_stream_hwm_helpers.js new file mode 100644 index 00000000000..8fbc1c4a5c1 --- /dev/null +++ b/jstests/multiVersion/libs/change_stream_hwm_helpers.js @@ -0,0 +1,119 @@ +/** + * Helper functions and constants to support the change stream high water mark multiversion tests. + */ +const ChangeStreamHWMHelpers = (function() { + /** + * Specifies the exact version to be used in tests which require pre-backport 4.0 binaries. + */ + const preBackport40Version = "4.0.5"; + const latest40Version = "latest"; + const latest36Version = "3.6.7"; + + /** + * Opens a stream on the given collection, and confirms that a PBRT is or is not produced based + * on the value of 'expectPBRT'. The 'hwmToResume' argument is a high water mark from the last + * upgraded/downgraded incarnation of the cluster, and 'expectResume' indicates whether or not + * we should be able to resume using it. If 'expectPBRT' is true, we also verify that responses + * from the server have a postBatchResumeToken field, use it to obtain a high water mark token, + * and confirm that we can resume from this high water mark. Finally, if we generated a new HWM + * during this test, we return it. + */ + function testPostBatchAndHighWaterMarkTokens({coll, expectPBRT, hwmToResume, expectResume}) { + // Log the test options for debugging. + jsTestLog(tojsononeline({ + coll: coll.getFullName(), + conn: coll.getMongo(), + expectPBRT: expectPBRT, + hwmToResume: hwmToResume, + expectResume: expectResume + })); + + // Verify that the command response object has a PBRT if 'expectPBRT' is true. + const csCmdResponse = assert.commandWorked(coll.runCommand( + {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}})); + assert.eq(expectPBRT, csCmdResponse.cursor.hasOwnProperty("postBatchResumeToken")); + + // Open a stream on the collection. If 'expectPBRT' is true then we should have a high water + // mark token immediately. + let csCursor = coll.watch(); + assert(!csCursor.hasNext()); + const newHWM = csCursor.getResumeToken(); + assert.eq(expectPBRT, newHWM != null); + + // Insert 10 documents into the test collection. If this is the sharded collection, we will + // alternate between writing to each shard. + for (let i = 0; i < 10; ++i) { + const id = (i % 2 ? i : (-i)); + assert.commandWorked(coll.insert({_id: id})); + } + + // Confirms that the expected _ids are seen by the change stream. If 'testResume' is true, + // also verifies that we see the correct results when resuming after or starting from each + // event. + function assertChangeStreamEvents(cursor, startId, testResume) { + for (let i = startId; i < 10; ++i) { + const id = (i % 2 ? i : (-i)); + assert.soon(() => cursor.hasNext()); + const curRes = cursor.next(); + assert.eq(curRes.fullDocument._id, id); + if (testResume) { + let resumeCursor = coll.watch([], {resumeAfter: curRes._id}); + assertChangeStreamEvents(resumeCursor, (i + 1), false); + resumeCursor.close(); + // The 'startAtOperationTime' parameter does not exist on 3.6. + const startAtCmdRes = coll.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {startAtOperationTime: curRes.clusterTime}}], + cursor: {} + }); + if (startAtCmdRes.ok) { + resumeCursor = new DBCommandCursor(coll.getDB(), startAtCmdRes); + assertChangeStreamEvents(resumeCursor, i, false); + resumeCursor.close(); + } + } + } + } + + // Verify that we can see all events, and can both resume after and start from each. + assertChangeStreamEvents(csCursor, 0, true); + + // If we have a new high water mark token, confirm that we can resume after it. + if (newHWM) { + csCursor = coll.watch([], {resumeAfter: newHWM}); + assertChangeStreamEvents(csCursor, 0, false); + } + + // If we were passed a HWM resume token from before the cluster was upgraded/downgraded, + // verify that the resume behaviour is as expected. + const invalidResumeTokenVersion = [40647, 50795]; + if (hwmToResume) { + if (!expectResume) { + // If we expect to fail the resume, confirm that we throw an assertion. + assert.commandFailedWithCode(coll.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: hwmToResume}}], + cursor: {} + }), + invalidResumeTokenVersion); + } else { + // If we expect to be able to resume, confirm that we see the expected events. + csCursor = coll.watch([], {resumeAfter: hwmToResume}); + assertChangeStreamEvents(csCursor, 0, false); + } + } + + // Remove all documents in the test collection so we start with a clean slate next time. + assert.commandWorked(coll.remove({})); + + // Return the high water mark token from the start of the stream. + return newHWM; + } + + return { + testPostBatchAndHighWaterMarkTokens: testPostBatchAndHighWaterMarkTokens, + preBackport40Version: preBackport40Version, + latest40Version: latest40Version, + latest36Version: latest36Version, + }; +})();
\ No newline at end of file diff --git a/jstests/noPassthrough/change_fcv_during_change_stream.js b/jstests/noPassthrough/change_fcv_during_change_stream.js deleted file mode 100644 index fc51ff35ebe..00000000000 --- a/jstests/noPassthrough/change_fcv_during_change_stream.js +++ /dev/null @@ -1,109 +0,0 @@ -// Tests that a change stream's resume token format will adapt as the server's feature compatibility -// version changes. -// @tags: [requires_replication, uses_transactions] -(function() { - "use strict"; - - const rst = new ReplSetTest({nodes: 1}); - rst.startSet(); - rst.initiate(); - - const primary = rst.getPrimary(); - const testDB = primary.getDB("test"); - - if (!testDB.serverStatus().storageEngine.supportsCommittedReads) { - print( - "Skipping change_fcv_during_change_stream.js since storageEngine doesn't support it."); - rst.stopSet(); - return; - } - - assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "3.6"})); - - const coll = testDB.change_fcv_during_change_stream; - const changeStream36 = coll.watch([], {cursor: {batchSize: 1}}); - - assert.commandWorked(coll.insert({_id: "first in 3.6"})); - assert.commandWorked(coll.insert({_id: "second in 3.6"})); - - assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"})); - - assert.commandWorked(coll.insert({_id: "first in 4.0"})); - - const session = testDB.getMongo().startSession({causalConsistency: false}); - const sessionDb = session.getDatabase("test"); - const sessionColl = sessionDb.change_fcv_during_change_stream; - - session.startTransaction(); - - assert.commandWorked(sessionColl.insert({_id: "first in transaction"})); - assert.commandWorked(sessionColl.insert({_id: "second in transaction"})); - assert.commandWorked(sessionColl.remove({_id: "second in transaction"})); - assert.commandWorked(sessionColl.insert({_id: "second in transaction"})); - assert.commandWorked(sessionColl.insert({_id: "third in transaction"})); - - session.commitTransaction(); - session.endSession(); - - function assertResumeTokenUsesStringFormat(resumeToken) { - assert.neq(resumeToken._data, undefined); - assert.eq(typeof resumeToken._data, - "string", - () => "Resume token: " + tojson(resumeToken) + ", oplog contents: " + - tojson(testDB.getSiblingDB("local").oplog.rs.find().toArray())); - } - - function assertResumeTokenUsesBinDataFormat(resumeToken) { - assert.neq(resumeToken._data, undefined); - assert(resumeToken._data instanceof BinData, tojson(resumeToken)); - } - - // Start reading the change stream. The stream has been opened in 3.6 so make sure that the - // resumeToken is in the appropriate format. - assert.soon(() => changeStream36.hasNext()); - let nextChange = changeStream36.next(); - assert.eq(nextChange.documentKey._id, "first in 3.6"); - assertResumeTokenUsesBinDataFormat(nextChange._id); - - assert.soon(() => changeStream36.hasNext()); - nextChange = changeStream36.next(); - assert.eq(nextChange.documentKey._id, "second in 3.6"); - assertResumeTokenUsesBinDataFormat(nextChange._id); - - // At this point the server is switched to 4.0 and the resumeToken format should switch in 4.0 - // on the fly too. - assert.soon(() => changeStream36.hasNext()); - nextChange = changeStream36.next(); - assert.eq(nextChange.documentKey._id, "first in 4.0"); - assertResumeTokenUsesStringFormat(nextChange._id); - - assert.soon(() => changeStream36.hasNext()); - nextChange = changeStream36.next(); - assert.eq(nextChange.documentKey._id, "first in transaction"); - assertResumeTokenUsesStringFormat(nextChange._id); - - assert.soon(() => changeStream36.hasNext()); - nextChange = changeStream36.next(); - assert.eq(nextChange.documentKey._id, "second in transaction"); - assertResumeTokenUsesStringFormat(nextChange._id); - - // Open a new change stream and position it in the middle of a transaction. Only the new 4.0 - // format resumeTokens are able to position correctly. - const changeStream40 = coll.watch([], {resumeAfter: nextChange._id, cursor: {batchSize: 1}}); - assert.soon(() => changeStream40.hasNext()); - nextChange = changeStream40.next(); - assert.eq(nextChange.documentKey._id, "second in transaction"); - assertResumeTokenUsesStringFormat(nextChange._id); - - assert.soon(() => changeStream40.hasNext()); - nextChange = changeStream40.next(); - assert.eq(nextChange.documentKey._id, "second in transaction"); - assertResumeTokenUsesStringFormat(nextChange._id); - - assert.soon(() => changeStream40.hasNext()); - nextChange = changeStream40.next(); - assert.eq(nextChange.documentKey._id, "third in transaction"); - assertResumeTokenUsesStringFormat(nextChange._id); - - rst.stopSet(); -}()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 5f2679e16a1..819a510a763 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -274,21 +274,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // 3) Look for 'applyOps' which were created as part of a transaction. BSONObj applyOps = getTxnApplyOpsFilter(nsMatch["ns"], nss); - // 4) Look for changes to the feature compatibility version. These show up as updates to the - // admin.system.version collection. - BSONObj fcvChange = BSON( - "$and" << BSON_ARRAY( - BSON("ns" << NamespaceString::kServerConfigurationNamespace.ns()) - // Ignore entries which correspond to the beginning of a transition. We only need to - // care about those which actually commit a change to the feature compatibility version. - << BSON("o.targetVersion" << BSON("$exists" << false)) - << BSON("o.version" << BSON("$exists" << true)))); - // Match oplog entries after "start" and are either supported (1) commands or (2) operations, // excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify // it was still present in the oplog. return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) - << BSON(OR(opMatch, commandMatch, applyOps, fcvChange)) + << BSON(OR(opMatch, commandMatch, applyOps)) << BSON("fromMigrate" << NE << true))); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 3265f5ff19a..323f7ec9d75 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -67,11 +67,6 @@ using std::vector; namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; - -bool isFCVChange(const Document& input) { - const auto ns = input[repl::OplogEntry::kNamespaceFieldName]; - return NamespaceString(ns.getStringData()).isServerConfigurationCollection(); -} } // namespace DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( @@ -81,12 +76,16 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( bool isIndependentOfAnyCollection) : DocumentSource(expCtx), _changeStreamSpec(changeStreamSpec.getOwned()), - _resumeTokenFormat( - fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40 - ? ResumeToken::SerializationFormat::kHexString - : ResumeToken::SerializationFormat::kBinData), _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { + // If 'needsMerge' is true and 'mergeByPBRT' is false, then we are running on a sharded cluster + // that is mid-upgrade, and so we generate resume tokens that obey the FCV. Otherwise, we always + // generate v1 resume tokens regardless of whether we are in FCV 3.6 or upgraded to FCV 4.0. + _resumeTokenFormat = pExpCtx->needsMerge && !pExpCtx->mergeByPBRT && + fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40 + ? ResumeToken::SerializationFormat::kBinData + : ResumeToken::SerializationFormat::kHexString; + _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)); auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), @@ -178,6 +177,12 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts, if (!uuid.missing()) resumeTokenData.uuid = uuid.getUuid(); + // If 'needsMerge' is true and 'mergeByPBRT' is false, then we are running on a sharded cluster + // that is mid-upgrade. We therefore always generate v0 resume tokens for compatibility. + if (pExpCtx->needsMerge && !pExpCtx->mergeByPBRT) { + resumeTokenData.version = 0; + } + return resumeTokenData; } @@ -395,27 +400,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document return doc.freeze(); } -void DocumentSourceChangeStreamTransform::switchResumeTokenFormat( - const Document& fcvUpdateOplogEntry) { - checkValueType(fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName], - repl::OplogEntry::kObjectFieldName, - BSONType::Object); - Document opObject = fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName].getDocument(); - // The FCV update is done with a replacement-style update, so there will be no $set. Instead, - // 'opObject' is the entire new document. - Value newVersion = opObject["version"]; - - checkValueType(newVersion, "o.version", BSONType::String); - if (newVersion.getStringData() == "3.6") { - _resumeTokenFormat = ResumeToken::SerializationFormat::kBinData; - } else { - uassert(65537, - "Feature compatibility version updated to an unknown version", - newVersion.getStringData() == "4.0"); - _resumeTokenFormat = ResumeToken::SerializationFormat::kHexString; - } -} - Value DocumentSourceChangeStreamTransform::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { Document changeStreamOptions(_changeStreamSpec); @@ -478,13 +462,6 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { // Get the next input document. auto input = pSource->getNext(); - - // Check for FCV changes. - while (input.isAdvanced() && isFCVChange(input.getDocument())) { - switchResumeTokenFormat(input.getDocument()); - input = pSource->getNext(); - } - if (!input.isAdvanced()) { return input; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 84219ed4f90..11e058544cf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -116,12 +116,6 @@ private: void initializeTransactionContext(const Document& input); /** - * Switches the format used to encode the resume token. This must happen if the stream sees an - * update to the admin.system.version collection. - */ - void switchResumeTokenFormat(const Document& fcvUpdateOplogEntry); - - /** * Gets the next relevant applyOps entry that should be returned. If there is none, returns * empty document. */ |