author    Bernard Gorman <bernard.gorman@gmail.com>  2019-02-28 17:12:26 +0000
committer Bernard Gorman <bernard.gorman@gmail.com>  2019-03-01 04:44:11 +0000
commit    724db8bcba09de5fa9b86bf0ea5b0626a5be9e24 (patch)
tree      a810b9cacb4e9bef100fc729a00f64fce3a520fa
parent    204d63a92d588b9891277caf70a257b42f82ac32 (diff)
download  mongo-724db8bcba09de5fa9b86bf0ea5b0626a5be9e24.tar.gz
SERVER-38414 Upgrade/Downgrade testing for change stream high water marks
-rw-r--r--  etc/evergreen.yml  2
-rw-r--r--  jstests/multiVersion/change_streams_feature_compatibility_version.js  21
-rw-r--r--  jstests/multiVersion/change_streams_high_water_mark_cluster.js  242
-rw-r--r--  jstests/multiVersion/change_streams_high_water_mark_replset.js  107
-rw-r--r--  jstests/multiVersion/libs/change_stream_hwm_helpers.js  119
-rw-r--r--  jstests/noPassthrough/change_fcv_during_change_stream.js  109
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp  12
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp  51
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.h  6
9 files changed, 494 insertions, 175 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index efddcd7fd14..d46ad6a2384 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -1772,7 +1772,7 @@ functions:
--edition ${multiversion_edition|base} \
--platform ${multiversion_platform|linux} \
--architecture ${multiversion_architecture|x86_64} \
- --useLatest 3.2 3.4 3.6 3.6.7
+ --useLatest 3.2 3.4 3.6 3.6.7 4.0.5
"do snmp setup" :
command: shell.exec
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
index 346a03d4b38..40490b5fc6b 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -40,7 +40,6 @@
// We can't open a change stream on a non-existent database on last-stable, so we insert a dummy
// document to create the database.
- // TODO BACKPORT-34138 Remove this check once the change has been backported.
assert.writeOK(testDB.dummy.insert({_id: "dummy"}));
// Open a change stream against a 3.6 binary. We will use the resume token from this stream to
@@ -95,8 +94,9 @@
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 1);
- // Test that the stream is still using the old resume token format using BinData.
- assertResumeTokenUsesBinDataFormat(change._id);
+ // From 4.0.7 onwards, we always produce the newer string-format tokens even while in FCV 3.6.
+ // The stream which was resumed from the 3.6 BinData token should now be producing 4.0 tokens.
+ assertResumeTokenUsesStringFormat(change._id);
// Explicitly set feature compatibility version 4.0. Remember the cluster time from that
// response to use later.
@@ -116,7 +116,7 @@
assert.writeOK(coll.insert({_id: 2}));
- // Test that the stream opened in FCV 3.6 continues to work.
+ // Test that the stream opened in FCV 3.6 continues to generate tokens in the new format.
change = cst.getOneChange(streamOnOldVersion);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 2);
@@ -168,23 +168,22 @@
// Set the feature compatibility version to 3.6.
assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"}));
- // Test that existing streams continue, but switch back to the BinData format after observing
- // the change to FCV.
+ // Test that existing streams continue, but still generate resume tokens in the new format.
assert.writeOK(coll.insert({_id: 3}));
change = cst.getOneChange(wholeDbCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 3);
- assertResumeTokenUsesBinDataFormat(change._id);
+ assertResumeTokenUsesStringFormat(change._id);
change = adminCST.getOneChange(wholeClusterCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 3);
- assertResumeTokenUsesBinDataFormat(change._id);
+ assertResumeTokenUsesStringFormat(change._id);
change = cst.getOneChange(cursorStartedWithTime);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, 3);
- assertResumeTokenUsesBinDataFormat(change._id);
+ assertResumeTokenUsesStringFormat(change._id);
// Creating a new change stream with a 4.0 feature should fail.
assert.commandFailedWithCode(
@@ -220,7 +219,7 @@
});
assert.soon(() => resumedOnFCV36With40BinaryResumeToken.hasNext());
change = resumedOnFCV36With40BinaryResumeToken.next();
- assertResumeTokenUsesBinDataFormat(change._id);
+ assertResumeTokenUsesStringFormat(change._id);
// Test that resuming a change stream with the original resume token still works.
let resumedWith36Token;
@@ -229,7 +228,7 @@
});
assert.soon(() => resumedWith36Token.hasNext());
change = resumedWith36Token.next();
- assertResumeTokenUsesBinDataFormat(change._id);
+ assertResumeTokenUsesStringFormat(change._id);
rst.stopSet();
}());
diff --git a/jstests/multiVersion/change_streams_high_water_mark_cluster.js b/jstests/multiVersion/change_streams_high_water_mark_cluster.js
new file mode 100644
index 00000000000..b65dd1fefef
--- /dev/null
+++ b/jstests/multiVersion/change_streams_high_water_mark_cluster.js
@@ -0,0 +1,242 @@
+/**
+ * Tests that high water mark and postBatchResumeTokens are handled correctly during upgrade from
+ * and downgrade to both 3.6 and a pre-backport version of 4.0 on a sharded cluster.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // assertCreateCollection
+ load("jstests/libs/fixture_helpers.js"); // runCommandOnEachPrimary
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js"); // supportsMajorityReadConcern
+ load("jstests/multiVersion/libs/change_stream_hwm_helpers.js"); // ChangeStreamHWMHelpers
+ load("jstests/multiVersion/libs/multi_cluster.js"); // upgradeCluster
+ load("jstests/multiVersion/libs/multi_rs.js"); // upgradeSet
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const preBackport40Version = ChangeStreamHWMHelpers.preBackport40Version;
+ const latest40Version = ChangeStreamHWMHelpers.latest40Version;
+ const latest36Version = ChangeStreamHWMHelpers.latest36Version;
+
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ rs: {nodes: 3},
+ other: {
+ mongosOptions: {binVersion: latest36Version},
+ configOptions: {binVersion: latest36Version},
+ rsOptions: {
+ binVersion: latest36Version,
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+ },
+ }
+ });
+
+ // Obtain references to the test database via mongoS and directly on shard0.
+ let mongosDB = st.s.getDB(jsTestName());
+ let primaryShard = st.rs0.getPrimary();
+
+ // Names of each of the collections used in the course of this test.
+ const shardedCollName = "sharded_coll";
+ const unshardedCollName = "unsharded_coll";
+
+ // Updates the specified cluster components and then refreshes our references to each of them.
+ function refreshCluster(version, components, singleShard) {
+ if (singleShard) {
+ singleShard.upgradeSet({binVersion: version});
+ } else {
+ st.upgradeCluster(version, components);
+ }
+
+ // Wait for the config server and shards to become available, and restart mongoS.
+ st.configRS.awaitReplication();
+ st.rs0.awaitReplication();
+ st.rs1.awaitReplication();
+ st.restartMongoses();
+
+ // Having upgraded the cluster, reacquire references to each component.
+ mongosDB = st.s.getDB(jsTestName());
+ primaryShard = st.rs0.getPrimary();
+
+ // Re-apply the 'writePeriodicNoops' parameter to the up/downgraded shards.
+ const mongosAdminDB = mongosDB.getSiblingDB("admin");
+ FixtureHelpers.runCommandOnEachPrimary(
+ {db: mongosAdminDB, cmdObj: {setParameter: 1, writePeriodicNoops: true}});
+ }
+
+ // Enable sharding on the test database and ensure that the primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), primaryShard.name);
+
+ // Create an unsharded collection on the primary shard via mongoS.
+ assertCreateCollection(mongosDB, unshardedCollName);
+
+ // Create a sharded collection on {_id: 1}, split across the shards at {_id: 0}.
+ const collToShard = assertCreateCollection(mongosDB, shardedCollName);
+ st.shardColl(collToShard, {_id: 1}, {_id: 0}, {_id: 1});
+
+ // Maps used to associate collection names with collection objects and HWM tokens.
+ const collMap = {
+ [unshardedCollName]: () => st.s.getDB(jsTestName())[unshardedCollName],
+ [shardedCollName]: () => st.s.getDB(jsTestName())[shardedCollName]
+ };
+
+ /**
+ * Tests the behaviour of the cluster while upgrading from 'oldVersion' to 'latest40Version'.
+ * The 'oldVersionFCVs' parameter specifies an array of FCVs supported by 'oldVersion'; the
+ * upgrade/downgrade procedure will be tested with each of these FCVs.
+ */
+ function runUpgradeDowngradeTests(oldVersion, oldVersionFCVs) {
+ for (let minFCV of oldVersionFCVs) {
+ // Create a map to record the HWMs returned by each test type.
+ const hwmTokenMap = {};
+
+ // We start with the cluster running on 'oldVersion'. Should not see any PBRTs.
+ jsTestLog(`Testing binary ${oldVersion} mongoS and shards, FCV ${minFCV}`);
+ refreshCluster(oldVersion);
+ assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: minFCV}));
+ for (let collName in collMap) {
+ const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: collMap[collName](), expectPBRT: false});
+ assert.eq(hwmToken, undefined);
+ }
+
+ // Upgrade a single shard to 'latest40Version' but leave the mongoS on 'oldVersion'. No
+ // streams produce PBRTs, but the new shard should continue to produce resumable tokens
+ // and the old-style $sortKey while the cluster is mid-upgrade.
+ jsTestLog(`Upgrading shard1 to binary ${latest40Version} FCV ${minFCV}`);
+ refreshCluster(latest40Version, null, st.rs1);
+ for (let collName in collMap) {
+ const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: collMap[collName](), expectPBRT: false});
+ assert.eq(hwmToken, undefined);
+ }
+
+ // Upgrade the remaining shard to 'latest40Version', but leave mongoS on 'oldVersion'.
+ jsTestLog(
+ `Upgrading to ${oldVersion} mongoS and ${latest40Version} shards, FCV ${minFCV}`);
+ refreshCluster(latest40Version,
+ {upgradeMongos: false, upgradeShards: true, upgradeConfigs: true});
+
+ // The shards are upgraded to 'latest40Version' but mongoS is running 'oldVersion'. The
+ // mongoS should be able to merge the output from the shards, but neither mongoS stream
+ // will produce a PBRT.
+ jsTestLog(
+ `Testing ${oldVersion} mongoS and ${latest40Version} shards with FCV ${minFCV}`);
+ for (let collName in collMap) {
+ const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: collMap[collName](), expectPBRT: false});
+ assert.eq(hwmToken, undefined);
+ }
+
+ // Upgrade the mongoS to 'latest40Version' but leave the cluster in 'minFCV'.
+ jsTestLog(
+ `Upgrading to binary ${latest40Version} mongoS and shards with FCV ${minFCV}`);
+ refreshCluster(latest40Version,
+ {upgradeMongos: true, upgradeShards: false, upgradeConfigs: false});
+
+ // All streams should now return PBRTs, and we should obtain a valid HWM from the test.
+ jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV ${minFCV}`);
+ for (let collName in collMap) {
+ hwmTokenMap[collName] = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({
+ coll: collMap[collName](),
+ expectPBRT: true,
+ hwmToResume: hwmTokenMap[collName],
+ expectResume: true
+ });
+ assert.neq(hwmTokenMap[collName], undefined);
+ }
+
+ // Set the cluster's FCV to 4.0.
+ assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: "4.0"}));
+
+ // All streams return PBRTs, and we can resume with the HWMs from the previous test.
+ jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV 4.0`);
+ for (let collName in collMap) {
+ hwmTokenMap[collName] = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({
+ coll: collMap[collName](),
+ expectPBRT: true,
+ hwmToResume: hwmTokenMap[collName],
+ expectResume: true
+ });
+ assert.neq(hwmTokenMap[collName], undefined);
+ }
+
+ // Downgrade the cluster to 'minFCV'. We should continue to produce PBRTs and can still
+ // resume from the tokens that we generated in the preceding tests.
+ jsTestLog(`Downgrading to FCV ${minFCV} shards`);
+ assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: minFCV}));
+
+ jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV ${minFCV}`);
+ for (let collName in collMap) {
+ hwmTokenMap[collName] = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({
+ coll: collMap[collName](),
+ expectPBRT: true,
+ hwmToResume: hwmTokenMap[collName],
+ expectResume: true
+ });
+ assert.neq(hwmTokenMap[collName], undefined);
+ }
+
+ // Downgrade the mongoS to 'oldVersion'. We should be able to create new streams and
+ // resume from their tokens, but cannot resume from the previously-generated v1 tokens.
+ jsTestLog(`Downgrading to binary ${oldVersion} mongoS with FCV ${minFCV} shards`);
+ refreshCluster(oldVersion,
+ {upgradeMongos: true, upgradeShards: false, upgradeConfigs: false});
+
+ // We should no longer receive any PBRTs via mongoS, and we cannot resume from the HWMs.
+ jsTestLog(`Testing binary ${oldVersion} mongoS and binary ${latest40Version} shards`);
+ for (let collName in collMap) {
+ const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({
+ coll: collMap[collName](),
+ expectPBRT: false,
+ hwmToResume: hwmTokenMap[collName],
+ expectResume: false
+ });
+ assert.eq(hwmToken, undefined);
+ }
+
+ // Downgrade a single shard to 'oldVersion'. We should continue to observe the same
+ // behaviour as we did in the previous test.
+ jsTestLog(`Downgrading shard1 to binary ${oldVersion} FCV ${minFCV}`);
+ refreshCluster(oldVersion, null, st.rs1);
+ for (let collName in collMap) {
+ const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({
+ coll: collMap[collName](),
+ expectPBRT: false,
+ hwmToResume: hwmTokenMap[collName],
+ expectResume: false
+ });
+ assert.eq(hwmToken, undefined);
+ }
+
+ // Downgrade the remainder of the cluster to binary 'oldVersion'.
+ jsTestLog(`Downgrading to binary ${oldVersion} shards`);
+ refreshCluster(oldVersion,
+ {upgradeShards: true, upgradeConfigs: false, upgradeMongos: false});
+ refreshCluster(oldVersion,
+ {upgradeConfigs: true, upgradeShards: false, upgradeMongos: false});
+
+ // We should no longer receive any PBRTs, and we cannot resume from the HWM tokens.
+ jsTestLog(`Testing downgraded binary ${oldVersion} mongoS and shards`);
+ for (let collName in collMap) {
+ const hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens({
+ coll: collMap[collName](),
+ expectPBRT: false,
+ hwmToResume: hwmTokenMap[collName],
+ expectResume: false
+ });
+ assert.eq(hwmToken, undefined);
+ }
+ }
+ }
+
+ // Run the upgrade/downgrade tests from both 3.6 and pre-backport 4.0 versions.
+ runUpgradeDowngradeTests(latest36Version, ["3.6"]);
+ runUpgradeDowngradeTests(preBackport40Version, ["3.6", "4.0"]);
+
+ st.stop();
+})();
\ No newline at end of file
diff --git a/jstests/multiVersion/change_streams_high_water_mark_replset.js b/jstests/multiVersion/change_streams_high_water_mark_replset.js
new file mode 100644
index 00000000000..c06f5770e1f
--- /dev/null
+++ b/jstests/multiVersion/change_streams_high_water_mark_replset.js
@@ -0,0 +1,107 @@
+/**
+ * Tests that high water mark and postBatchResumeTokens are handled correctly during upgrade from
+ * and downgrade to both 3.6 and a pre-backport version of 4.0 on a single replica set.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assertCreateCollection.
+ load("jstests/multiVersion/libs/change_stream_hwm_helpers.js"); // For ChangeStreamHWMHelpers.
+ load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ const preBackport40Version = ChangeStreamHWMHelpers.preBackport40Version;
+ const latest40Version = ChangeStreamHWMHelpers.latest40Version;
+ const latest36Version = ChangeStreamHWMHelpers.latest36Version;
+
+ const rst = new ReplSetTest({
+ nodes: 2,
+ nodeOptions: {binVersion: latest36Version},
+ });
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+ rst.initiate();
+
+ // Obtain references to the test database and create the test collection.
+ let testDB = rst.getPrimary().getDB(jsTestName());
+ assert.commandWorked(testDB.createCollection("test"));
+ let testColl = testDB.test;
+
+ // Up- or downgrades the replset and then refreshes our references to the test collection.
+ function refreshReplSet(version) {
+ // Upgrade the set and wait for it to become available again.
+ rst.upgradeSet({binVersion: version});
+ rst.awaitReplication();
+
+ // Having upgraded the replset, reacquire references to the db and collection.
+ testDB = rst.getPrimary().getDB(jsTestName());
+ testColl = testDB.test;
+ }
+
+ /**
+ * Tests the behaviour of the replset while upgrading from 'oldVersion' to 'latest40Version'.
+ * The 'oldVersionFCVs' parameter specifies an array of FCVs supported by 'oldVersion'; the
+ * upgrade/downgrade procedure will be tested with each of these FCVs.
+ */
+ function runUpgradeDowngradeTests(oldVersion, oldVersionFCVs) {
+ for (let minFCV of oldVersionFCVs) {
+ // Stores a high water mark generated by the most recent test.
+ let hwmToken = null;
+
+ // We start with the replset running on 'oldVersion'. No streams should produce PBRTs.
+ jsTestLog(`Testing binary ${oldVersion}`);
+ refreshReplSet(oldVersion);
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: minFCV}));
+ hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: testColl, expectPBRT: false});
+ assert.eq(hwmToken, undefined);
+
+ // Upgrade the replset to 'latest40Version' but leave it in FCV 'minFCV'.
+ jsTestLog(`Upgrading to binary ${latest40Version} with FCV ${minFCV}`);
+ refreshReplSet(latest40Version);
+
+ // All streams should now return PBRTs, including high water marks.
+ jsTestLog(`Testing binary ${latest40Version} with FCV ${minFCV}`);
+ hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: testColl, expectPBRT: true, hwmToResume: hwmToken, expectResume: true});
+ assert.neq(hwmToken, undefined);
+
+ // Set the replset's FCV to 4.0.
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"}));
+
+ // All streams should return PBRTs. We can resume with the HWM from the previous test.
+ jsTestLog(`Testing binary ${latest40Version} with FCV 4.0`);
+ hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: testColl, expectPBRT: true, hwmToResume: hwmToken, expectResume: true});
+ assert.neq(hwmToken, undefined);
+
+ // Downgrade the replset to FCV 'minFCV'.
+ jsTestLog(`Downgrading to FCV ${minFCV}`);
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: minFCV}));
+
+ // All streams should return PBRTs and we can still resume from the last HWM token.
+ jsTestLog(`Testing binary ${latest40Version} with FCV ${minFCV}`);
+ hwmToken = ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: testColl, expectPBRT: true, hwmToResume: hwmToken, expectResume: true});
+ assert.neq(hwmToken, undefined);
+
+ // Downgrade the replset to 'oldVersion'.
+ jsTestLog(`Downgrading to binary ${oldVersion}`);
+ refreshReplSet(oldVersion);
+
+ // We no longer receive PBRTs, and we cannot resume from the HWM token we generated.
+ jsTestLog(`Testing downgraded binary ${oldVersion}`);
+ ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
+ {coll: testColl, expectPBRT: false, hwmToResume: hwmToken, expectResume: false});
+ }
+ }
+
+ // Run the upgrade/downgrade tests from both 3.6 and pre-backport 4.0 versions.
+ runUpgradeDowngradeTests(latest36Version, ["3.6"]);
+ runUpgradeDowngradeTests(preBackport40Version, ["3.6", "4.0"]);
+
+ rst.stopSet();
+})();
\ No newline at end of file
diff --git a/jstests/multiVersion/libs/change_stream_hwm_helpers.js b/jstests/multiVersion/libs/change_stream_hwm_helpers.js
new file mode 100644
index 00000000000..8fbc1c4a5c1
--- /dev/null
+++ b/jstests/multiVersion/libs/change_stream_hwm_helpers.js
@@ -0,0 +1,119 @@
+/**
+ * Helper functions and constants to support the change stream high water mark multiversion tests.
+ */
+const ChangeStreamHWMHelpers = (function() {
+ /**
+ * Specifies the exact version to be used in tests which require pre-backport 4.0 binaries.
+ */
+ const preBackport40Version = "4.0.5";
+ const latest40Version = "latest";
+ const latest36Version = "3.6.7";
+
+ /**
+ * Opens a stream on the given collection, and confirms that a PBRT is or is not produced based
+ * on the value of 'expectPBRT'. The 'hwmToResume' argument is a high water mark from the last
+ * upgraded/downgraded incarnation of the cluster, and 'expectResume' indicates whether or not
+ * we should be able to resume using it. If 'expectPBRT' is true, we also verify that responses
+ * from the server have a postBatchResumeToken field, use it to obtain a high water mark token,
+ * and confirm that we can resume from this high water mark. Finally, if we generated a new HWM
+ * during this test, we return it.
+ */
+ function testPostBatchAndHighWaterMarkTokens({coll, expectPBRT, hwmToResume, expectResume}) {
+ // Log the test options for debugging.
+ jsTestLog(tojsononeline({
+ coll: coll.getFullName(),
+ conn: coll.getMongo(),
+ expectPBRT: expectPBRT,
+ hwmToResume: hwmToResume,
+ expectResume: expectResume
+ }));
+
+ // Verify that the command response object has a PBRT if 'expectPBRT' is true.
+ const csCmdResponse = assert.commandWorked(coll.runCommand(
+ {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}));
+ assert.eq(expectPBRT, csCmdResponse.cursor.hasOwnProperty("postBatchResumeToken"));
+
+ // Open a stream on the collection. If 'expectPBRT' is true then we should have a high water
+ // mark token immediately.
+ let csCursor = coll.watch();
+ assert(!csCursor.hasNext());
+ const newHWM = csCursor.getResumeToken();
+ assert.eq(expectPBRT, newHWM != null);
+
+ // Insert 10 documents into the test collection. If this is the sharded collection, we will
+ // alternate between writing to each shard.
+ for (let i = 0; i < 10; ++i) {
+ const id = (i % 2 ? i : (-i));
+ assert.commandWorked(coll.insert({_id: id}));
+ }
+
+ // Confirms that the expected _ids are seen by the change stream. If 'testResume' is true,
+ // also verifies that we see the correct results when resuming after or starting from each
+ // event.
+ function assertChangeStreamEvents(cursor, startId, testResume) {
+ for (let i = startId; i < 10; ++i) {
+ const id = (i % 2 ? i : (-i));
+ assert.soon(() => cursor.hasNext());
+ const curRes = cursor.next();
+ assert.eq(curRes.fullDocument._id, id);
+ if (testResume) {
+ let resumeCursor = coll.watch([], {resumeAfter: curRes._id});
+ assertChangeStreamEvents(resumeCursor, (i + 1), false);
+ resumeCursor.close();
+ // The 'startAtOperationTime' parameter does not exist on 3.6.
+ const startAtCmdRes = coll.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {startAtOperationTime: curRes.clusterTime}}],
+ cursor: {}
+ });
+ if (startAtCmdRes.ok) {
+ resumeCursor = new DBCommandCursor(coll.getDB(), startAtCmdRes);
+ assertChangeStreamEvents(resumeCursor, i, false);
+ resumeCursor.close();
+ }
+ }
+ }
+ }
+
+ // Verify that we can see all events, and can both resume after and start from each.
+ assertChangeStreamEvents(csCursor, 0, true);
+
+ // If we have a new high water mark token, confirm that we can resume after it.
+ if (newHWM) {
+ csCursor = coll.watch([], {resumeAfter: newHWM});
+ assertChangeStreamEvents(csCursor, 0, false);
+ }
+
+ // If we were passed a HWM resume token from before the cluster was upgraded/downgraded,
+ // verify that the resume behaviour is as expected.
+ const invalidResumeTokenVersion = [40647, 50795];
+ if (hwmToResume) {
+ if (!expectResume) {
+ // If we expect the resume to fail, confirm that it fails with one of the expected codes.
+ assert.commandFailedWithCode(coll.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: hwmToResume}}],
+ cursor: {}
+ }),
+ invalidResumeTokenVersion);
+ } else {
+ // If we expect to be able to resume, confirm that we see the expected events.
+ csCursor = coll.watch([], {resumeAfter: hwmToResume});
+ assertChangeStreamEvents(csCursor, 0, false);
+ }
+ }
+
+ // Remove all documents in the test collection so we start with a clean slate next time.
+ assert.commandWorked(coll.remove({}));
+
+ // Return the high water mark token from the start of the stream.
+ return newHWM;
+ }
+
+ return {
+ testPostBatchAndHighWaterMarkTokens: testPostBatchAndHighWaterMarkTokens,
+ preBackport40Version: preBackport40Version,
+ latest40Version: latest40Version,
+ latest36Version: latest36Version,
+ };
+})();
\ No newline at end of file
diff --git a/jstests/noPassthrough/change_fcv_during_change_stream.js b/jstests/noPassthrough/change_fcv_during_change_stream.js
deleted file mode 100644
index fc51ff35ebe..00000000000
--- a/jstests/noPassthrough/change_fcv_during_change_stream.js
+++ /dev/null
@@ -1,109 +0,0 @@
-// Tests that a change stream's resume token format will adapt as the server's feature compatibility
-// version changes.
-// @tags: [requires_replication, uses_transactions]
-(function() {
- "use strict";
-
- const rst = new ReplSetTest({nodes: 1});
- rst.startSet();
- rst.initiate();
-
- const primary = rst.getPrimary();
- const testDB = primary.getDB("test");
-
- if (!testDB.serverStatus().storageEngine.supportsCommittedReads) {
- print(
- "Skipping change_fcv_during_change_stream.js since storageEngine doesn't support it.");
- rst.stopSet();
- return;
- }
-
- assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "3.6"}));
-
- const coll = testDB.change_fcv_during_change_stream;
- const changeStream36 = coll.watch([], {cursor: {batchSize: 1}});
-
- assert.commandWorked(coll.insert({_id: "first in 3.6"}));
- assert.commandWorked(coll.insert({_id: "second in 3.6"}));
-
- assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.0"}));
-
- assert.commandWorked(coll.insert({_id: "first in 4.0"}));
-
- const session = testDB.getMongo().startSession({causalConsistency: false});
- const sessionDb = session.getDatabase("test");
- const sessionColl = sessionDb.change_fcv_during_change_stream;
-
- session.startTransaction();
-
- assert.commandWorked(sessionColl.insert({_id: "first in transaction"}));
- assert.commandWorked(sessionColl.insert({_id: "second in transaction"}));
- assert.commandWorked(sessionColl.remove({_id: "second in transaction"}));
- assert.commandWorked(sessionColl.insert({_id: "second in transaction"}));
- assert.commandWorked(sessionColl.insert({_id: "third in transaction"}));
-
- session.commitTransaction();
- session.endSession();
-
- function assertResumeTokenUsesStringFormat(resumeToken) {
- assert.neq(resumeToken._data, undefined);
- assert.eq(typeof resumeToken._data,
- "string",
- () => "Resume token: " + tojson(resumeToken) + ", oplog contents: " +
- tojson(testDB.getSiblingDB("local").oplog.rs.find().toArray()));
- }
-
- function assertResumeTokenUsesBinDataFormat(resumeToken) {
- assert.neq(resumeToken._data, undefined);
- assert(resumeToken._data instanceof BinData, tojson(resumeToken));
- }
-
- // Start reading the change stream. The stream has been opened in 3.6 so make sure that the
- // resumeToken is in the appropriate format.
- assert.soon(() => changeStream36.hasNext());
- let nextChange = changeStream36.next();
- assert.eq(nextChange.documentKey._id, "first in 3.6");
- assertResumeTokenUsesBinDataFormat(nextChange._id);
-
- assert.soon(() => changeStream36.hasNext());
- nextChange = changeStream36.next();
- assert.eq(nextChange.documentKey._id, "second in 3.6");
- assertResumeTokenUsesBinDataFormat(nextChange._id);
-
- // At this point the server is switched to 4.0 and the resumeToken format should switch in 4.0
- // on the fly too.
- assert.soon(() => changeStream36.hasNext());
- nextChange = changeStream36.next();
- assert.eq(nextChange.documentKey._id, "first in 4.0");
- assertResumeTokenUsesStringFormat(nextChange._id);
-
- assert.soon(() => changeStream36.hasNext());
- nextChange = changeStream36.next();
- assert.eq(nextChange.documentKey._id, "first in transaction");
- assertResumeTokenUsesStringFormat(nextChange._id);
-
- assert.soon(() => changeStream36.hasNext());
- nextChange = changeStream36.next();
- assert.eq(nextChange.documentKey._id, "second in transaction");
- assertResumeTokenUsesStringFormat(nextChange._id);
-
- // Open a new change stream and position it in the middle of a transaction. Only the new 4.0
- // format resumeTokens are able to position correctly.
- const changeStream40 = coll.watch([], {resumeAfter: nextChange._id, cursor: {batchSize: 1}});
- assert.soon(() => changeStream40.hasNext());
- nextChange = changeStream40.next();
- assert.eq(nextChange.documentKey._id, "second in transaction");
- assertResumeTokenUsesStringFormat(nextChange._id);
-
- assert.soon(() => changeStream40.hasNext());
- nextChange = changeStream40.next();
- assert.eq(nextChange.documentKey._id, "second in transaction");
- assertResumeTokenUsesStringFormat(nextChange._id);
-
- assert.soon(() => changeStream40.hasNext());
- nextChange = changeStream40.next();
- assert.eq(nextChange.documentKey._id, "third in transaction");
- assertResumeTokenUsesStringFormat(nextChange._id);
-
- rst.stopSet();
-}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 5f2679e16a1..819a510a763 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -274,21 +274,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// 3) Look for 'applyOps' which were created as part of a transaction.
BSONObj applyOps = getTxnApplyOpsFilter(nsMatch["ns"], nss);
- // 4) Look for changes to the feature compatibility version. These show up as updates to the
- // admin.system.version collection.
- BSONObj fcvChange = BSON(
- "$and" << BSON_ARRAY(
- BSON("ns" << NamespaceString::kServerConfigurationNamespace.ns())
- // Ignore entries which correspond to the beginning of a transition. We only need to
- // care about those which actually commit a change to the feature compatibility version.
- << BSON("o.targetVersion" << BSON("$exists" << false))
- << BSON("o.version" << BSON("$exists" << true))));
-
// Match oplog entries after "start" and are either supported (1) commands or (2) operations,
// excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify
// it was still present in the oplog.
return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
- << BSON(OR(opMatch, commandMatch, applyOps, fcvChange))
+ << BSON(OR(opMatch, commandMatch, applyOps))
<< BSON("fromMigrate" << NE << true)));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 3265f5ff19a..323f7ec9d75 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -67,11 +67,6 @@ using std::vector;
namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
-
-bool isFCVChange(const Document& input) {
- const auto ns = input[repl::OplogEntry::kNamespaceFieldName];
- return NamespaceString(ns.getStringData()).isServerConfigurationCollection();
-}
} // namespace
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
@@ -81,12 +76,16 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
bool isIndependentOfAnyCollection)
: DocumentSource(expCtx),
_changeStreamSpec(changeStreamSpec.getOwned()),
- _resumeTokenFormat(
- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40
- ? ResumeToken::SerializationFormat::kHexString
- : ResumeToken::SerializationFormat::kBinData),
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
+ // If 'needsMerge' is true and 'mergeByPBRT' is false, then we are running on a sharded cluster
+ // that is mid-upgrade, and so we generate resume tokens that obey the FCV. Otherwise, we always
+ // generate v1 resume tokens regardless of whether we are in FCV 3.6 or upgraded to FCV 4.0.
+ _resumeTokenFormat = pExpCtx->needsMerge && !pExpCtx->mergeByPBRT &&
+ fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40
+ ? ResumeToken::SerializationFormat::kBinData
+ : ResumeToken::SerializationFormat::kHexString;
+
_nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns));
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
@@ -178,6 +177,12 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
if (!uuid.missing())
resumeTokenData.uuid = uuid.getUuid();
+ // If 'needsMerge' is true and 'mergeByPBRT' is false, then we are running on a sharded cluster
+ // that is mid-upgrade. We therefore always generate v0 resume tokens for compatibility.
+ if (pExpCtx->needsMerge && !pExpCtx->mergeByPBRT) {
+ resumeTokenData.version = 0;
+ }
+
return resumeTokenData;
}
@@ -395,27 +400,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
return doc.freeze();
}
-void DocumentSourceChangeStreamTransform::switchResumeTokenFormat(
- const Document& fcvUpdateOplogEntry) {
- checkValueType(fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName],
- repl::OplogEntry::kObjectFieldName,
- BSONType::Object);
- Document opObject = fcvUpdateOplogEntry[repl::OplogEntry::kObjectFieldName].getDocument();
- // The FCV update is done with a replacement-style update, so there will be no $set. Instead,
- // 'opObject' is the entire new document.
- Value newVersion = opObject["version"];
-
- checkValueType(newVersion, "o.version", BSONType::String);
- if (newVersion.getStringData() == "3.6") {
- _resumeTokenFormat = ResumeToken::SerializationFormat::kBinData;
- } else {
- uassert(65537,
- "Feature compatibility version updated to an unknown version",
- newVersion.getStringData() == "4.0");
- _resumeTokenFormat = ResumeToken::SerializationFormat::kHexString;
- }
-}
-
Value DocumentSourceChangeStreamTransform::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
Document changeStreamOptions(_changeStreamSpec);
@@ -478,13 +462,6 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
// Get the next input document.
auto input = pSource->getNext();
-
- // Check for FCV changes.
- while (input.isAdvanced() && isFCVChange(input.getDocument())) {
- switchResumeTokenFormat(input.getDocument());
- input = pSource->getNext();
- }
-
if (!input.isAdvanced()) {
return input;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 84219ed4f90..11e058544cf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -116,12 +116,6 @@ private:
void initializeTransactionContext(const Document& input);
/**
- * Switches the format used to encode the resume token. This must happen if the stream sees an
- * update to the admin.system.version collection.
- */
- void switchResumeTokenFormat(const Document& fcvUpdateOplogEntry);
-
- /**
* Gets the next relevant applyOps entry that should be returned. If there is none, returns
* empty document.
*/