path: root/jstests/multiVersion/libs
diff options
authorBernard Gorman <>2019-01-23 16:32:45 +0000
committerBernard Gorman <>2019-02-28 01:13:38 +0000
commit25dea813a6f3a63bd0c39c075457379daadc1e32 (patch)
tree6a53fd3cf6c27ce5ae81224e380470acaaf03874 /jstests/multiVersion/libs
parenta74b2f39025fee2c59aa5437deea6e06f05e18ca (diff)
SERVER-38414 Upgrade/Downgrade testing for change stream high water marks
Diffstat (limited to 'jstests/multiVersion/libs')
2 files changed, 157 insertions, 0 deletions
diff --git a/jstests/multiVersion/libs/change_stream_hwm_helpers.js b/jstests/multiVersion/libs/change_stream_hwm_helpers.js
new file mode 100644
index 00000000000..448b000c838
--- /dev/null
+++ b/jstests/multiVersion/libs/change_stream_hwm_helpers.js
@@ -0,0 +1,106 @@
+ * Helper functions and constants to support the change stream high water mark multiversion tests.
+ */
+const ChangeStreamHWMHelpers = (function() {
+ /**
+ * Specifies the exact version to be used in tests which require pre-backport 4.0 binaries.
+ */
+ const preBackport40Version = "4.0.5";
+ const latest42Version = "latest";
+ /**
+ * Opens a stream on the given collection, and confirms that a PBRT is or is not produced based
+ * on the value of 'expectPBRT'. The 'hwmToResume' argument is a high water mark from an earlier
+ * upgraded/downgraded incarnation of the cluster, and 'expectResume' indicates whether or not
+ * we should be able to resume from it. If 'expectPBRT' is true, we also generate a new high
+ * water mark token, confirm that we can resume from it, and return it to the caller.
+ */
+ function testPostBatchAndHighWaterMarkTokens({coll, expectPBRT, hwmToResume, expectResume}) {
+ // Log the test options for debugging.
+ jsTestLog(tojsononeline({
+ coll: coll.getFullName(),
+ conn: coll.getMongo(),
+ expectPBRT: expectPBRT,
+ hwmToResume: hwmToResume,
+ expectResume: expectResume
+ }));
+ // Verify that the command response object has a PBRT if 'expectPBRT' is true.
+ const csCmdResponse = assert.commandWorked(coll.runCommand(
+ {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}));
+ assert.eq(expectPBRT, csCmdResponse.cursor.hasOwnProperty("postBatchResumeToken"));
+ // Open a stream on the collection. If 'expectPBRT' is true then we should have a high water
+ // mark token immediately.
+ let csCursor =;
+ assert(!csCursor.hasNext());
+ const newHWM = csCursor.getResumeToken();
+ assert.eq(expectPBRT, newHWM != null);
+ // Insert 10 documents into the test collection. If this is the sharded collection, we will
+ // alternate between writing to each shard.
+ for (let i = 0; i < 10; ++i) {
+ const id = (i % 2 ? i : (-i));
+ assert.commandWorked(coll.insert({_id: id}));
+ }
+ // Confirms that the expected _ids are seen by the change stream. If 'testResume' is true,
+ // also verifies that we see the correct results when resuming/starting at each event.
+ function assertChangeStreamEvents(cursor, startId, testResume) {
+ for (let i = startId; i < 10; ++i) {
+ const id = (i % 2 ? i : (-i));
+ assert.soon(() => cursor.hasNext());
+ const curRes =;
+ assert.eq(curRes.fullDocument._id, id);
+ if (testResume) {
+ let resumeCursor =[], {resumeAfter: curRes._id});
+ assertChangeStreamEvents(resumeCursor, (i + 1), false);
+ resumeCursor.close();
+ resumeCursor =[], {startAtOperationTime: curRes.clusterTime});
+ assertChangeStreamEvents(resumeCursor, i, false);
+ resumeCursor.close();
+ }
+ }
+ }
+ // Verify that we can see all events, and can both resume after and start from each.
+ assertChangeStreamEvents(csCursor, 0, true);
+ // If we have a new high water mark token, confirm that we can resume after it.
+ if (newHWM) {
+ csCursor =[], {resumeAfter: newHWM});
+ assertChangeStreamEvents(csCursor, 0, false);
+ }
+ // If we were passed a HWM resume token from before the cluster was upgraded/downgraded,
+ // verify that the resume behaviour is as expected.
+ const invalidResumeTokenVersion = 50795;
+ if (hwmToResume) {
+ if (!expectResume) {
+ // If we expect to fail the resume, confirm that we throw an assertion.
+ assert.commandFailedWithCode(coll.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: hwmToResume}}],
+ cursor: {}
+ }),
+ invalidResumeTokenVersion);
+ } else {
+ // If we expect to be able to resume, confirm that we see the expected events.
+ csCursor =[], {resumeAfter: hwmToResume});
+ assertChangeStreamEvents(csCursor, 0, false);
+ }
+ }
+ // Remove all documents in the test collection so we start with a clean slate next time.
+ assert.commandWorked(coll.remove({}));
+ // Return the high water mark token from the start of the stream.
+ return newHWM;
+ }
+ return {
+ testPostBatchAndHighWaterMarkTokens: testPostBatchAndHighWaterMarkTokens,
+ preBackport40Version: preBackport40Version,
+ latest42Version: latest42Version,
+ };
+})(); \ No newline at end of file
diff --git a/jstests/multiVersion/libs/index_format_downgrade.js b/jstests/multiVersion/libs/index_format_downgrade.js
new file mode 100644
index 00000000000..4c30ef7ed00
--- /dev/null
+++ b/jstests/multiVersion/libs/index_format_downgrade.js
@@ -0,0 +1,51 @@
+ * For FCV 4.2, MongoDB uses a new internal format for unique indexes that is incompatible with 4.0.
+ * The new format applies to both existing unique indexes as well as newly created/rebuilt unique
+ * indexes. This helper function rebuilds all unique indexes on an instance after downgrading to FCV
+ * 4.0, for backwards compatibility with binary 4.0. Because this is an internal change, the index
+ * version is retained through rebuilding.
+ */
+function downgradeUniqueIndexes(db) {
+ // Obtain a list of v:1 and v:2 unique indexes.
+ const unique_idx_v1 = [];
+ const unique_idx_v2 = [];
+ db.adminCommand("listDatabases").databases.forEach(function(d) {
+ let mdb = db.getSiblingDB(;
+ mdb.getCollectionInfos().forEach(function(c) {
+ let currentCollection = mdb.getCollection(;
+ currentCollection.getIndexes().forEach(function(i) {
+ if (i.unique) {
+ if (i.v === 1) {
+ unique_idx_v1.push(i);
+ } else {
+ unique_idx_v2.push(i);
+ }
+ return;
+ }
+ });
+ });
+ });
+ // Drop and recreate all v:1 indexes
+ for (let idx of unique_idx_v1) {
+ let [dbName, collName] = idx.ns.split(".");
+ let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index:});
+ assert.commandWorked(res);
+ res = db.getSiblingDB(dbName).runCommand({
+ createIndexes: collName,
+ indexes: [{"key": idx.key, "name":, "unique": true, "v": 1}]
+ });
+ assert.commandWorked(res);
+ }
+ // Drop and recreate all v:2 indexes
+ for (let idx of unique_idx_v2) {
+ let [dbName, collName] = idx.ns.split(".");
+ let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index:});
+ assert.commandWorked(res);
+ res = db.getSiblingDB(dbName).runCommand({
+ createIndexes: collName,
+ indexes: [{"key": idx.key, "name":, "unique": true, "v": 2}]
+ });
+ assert.commandWorked(res);
+ }