diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-01-23 16:32:45 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-28 01:13:38 +0000 |
commit | 25dea813a6f3a63bd0c39c075457379daadc1e32 (patch) | |
tree | 6a53fd3cf6c27ce5ae81224e380470acaaf03874 /jstests/multiVersion/libs | |
parent | a74b2f39025fee2c59aa5437deea6e06f05e18ca (diff) | |
download | mongo-25dea813a6f3a63bd0c39c075457379daadc1e32.tar.gz |
SERVER-38414 Upgrade/Downgrade testing for change stream high water marks
Diffstat (limited to 'jstests/multiVersion/libs')
-rw-r--r-- | jstests/multiVersion/libs/change_stream_hwm_helpers.js | 106 | ||||
-rw-r--r-- | jstests/multiVersion/libs/index_format_downgrade.js | 51 |
2 files changed, 157 insertions, 0 deletions
diff --git a/jstests/multiVersion/libs/change_stream_hwm_helpers.js b/jstests/multiVersion/libs/change_stream_hwm_helpers.js new file mode 100644 index 00000000000..448b000c838 --- /dev/null +++ b/jstests/multiVersion/libs/change_stream_hwm_helpers.js @@ -0,0 +1,106 @@ +/** + * Helper functions and constants to support the change stream high water mark multiversion tests. + */ +const ChangeStreamHWMHelpers = (function() { + /** + * Specifies the exact version to be used in tests which require pre-backport 4.0 binaries. + */ + const preBackport40Version = "4.0.5"; + const latest42Version = "latest"; + + /** + * Opens a stream on the given collection, and confirms that a PBRT is or is not produced based + * on the value of 'expectPBRT'. The 'hwmToResume' argument is a high water mark from an earlier + * upgraded/downgraded incarnation of the cluster, and 'expectResume' indicates whether or not + * we should be able to resume from it. If 'expectPBRT' is true, we also generate a new high + * water mark token, confirm that we can resume from it, and return it to the caller. + */ + function testPostBatchAndHighWaterMarkTokens({coll, expectPBRT, hwmToResume, expectResume}) { + // Log the test options for debugging. + jsTestLog(tojsononeline({ + coll: coll.getFullName(), + conn: coll.getMongo(), + expectPBRT: expectPBRT, + hwmToResume: hwmToResume, + expectResume: expectResume + })); + + // Verify that the command response object has a PBRT if 'expectPBRT' is true. + const csCmdResponse = assert.commandWorked(coll.runCommand( + {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}})); + assert.eq(expectPBRT, csCmdResponse.cursor.hasOwnProperty("postBatchResumeToken")); + + // Open a stream on the collection. If 'expectPBRT' is true then we should have a high water + // mark token immediately. + let csCursor = coll.watch(); + assert(!csCursor.hasNext()); + const newHWM = csCursor.getResumeToken(); + assert.eq(expectPBRT, newHWM != null); + + // Insert 10 documents into the test collection. If this is the sharded collection, we will + // alternate between writing to each shard. + for (let i = 0; i < 10; ++i) { + const id = (i % 2 ? i : (-i)); + assert.commandWorked(coll.insert({_id: id})); + } + + // Confirms that the expected _ids are seen by the change stream. If 'testResume' is true, + // also verifies that we see the correct results when resuming/starting at each event. + function assertChangeStreamEvents(cursor, startId, testResume) { + for (let i = startId; i < 10; ++i) { + const id = (i % 2 ? i : (-i)); + assert.soon(() => cursor.hasNext()); + const curRes = cursor.next(); + assert.eq(curRes.fullDocument._id, id); + if (testResume) { + let resumeCursor = coll.watch([], {resumeAfter: curRes._id}); + assertChangeStreamEvents(resumeCursor, (i + 1), false); + resumeCursor.close(); + resumeCursor = coll.watch([], {startAtOperationTime: curRes.clusterTime}); + assertChangeStreamEvents(resumeCursor, i, false); + resumeCursor.close(); + } + } + } + + // Verify that we can see all events, and can both resume after and start from each. + assertChangeStreamEvents(csCursor, 0, true); + + // If we have a new high water mark token, confirm that we can resume after it. + if (newHWM) { + csCursor = coll.watch([], {resumeAfter: newHWM}); + assertChangeStreamEvents(csCursor, 0, false); + } + + // If we were passed a HWM resume token from before the cluster was upgraded/downgraded, + // verify that the resume behaviour is as expected. + const invalidResumeTokenVersion = 50795; + if (hwmToResume) { + if (!expectResume) { + // If we expect to fail the resume, confirm that we throw an assertion. + assert.commandFailedWithCode(coll.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: hwmToResume}}], + cursor: {} + }), + invalidResumeTokenVersion); + } else { + // If we expect to be able to resume, confirm that we see the expected events. + csCursor = coll.watch([], {resumeAfter: hwmToResume}); + assertChangeStreamEvents(csCursor, 0, false); + } + } + + // Remove all documents in the test collection so we start with a clean slate next time. + assert.commandWorked(coll.remove({})); + + // Return the high water mark token from the start of the stream. + return newHWM; + } + + return { + testPostBatchAndHighWaterMarkTokens: testPostBatchAndHighWaterMarkTokens, + preBackport40Version: preBackport40Version, + latest42Version: latest42Version, + }; +})();
\ No newline at end of file diff --git a/jstests/multiVersion/libs/index_format_downgrade.js b/jstests/multiVersion/libs/index_format_downgrade.js new file mode 100644 index 00000000000..4c30ef7ed00 --- /dev/null +++ b/jstests/multiVersion/libs/index_format_downgrade.js @@ -0,0 +1,51 @@ +/** + * For FCV 4.2, MongoDB uses a new internal format for unique indexes that is incompatible with 4.0. + * The new format applies to both existing unique indexes as well as newly created/rebuilt unique + * indexes. This helper function rebuilds all unique indexes on an instance after downgrading to FCV + * 4.0, for backwards compatibility with binary 4.0. Because this is an internal change, the index + * version is retained through rebuilding. + */ +function downgradeUniqueIndexes(db) { + // Obtain a list of v:1 and v:2 unique indexes. + const unique_idx_v1 = []; + const unique_idx_v2 = []; + db.adminCommand("listDatabases").databases.forEach(function(d) { + let mdb = db.getSiblingDB(d.name); + mdb.getCollectionInfos().forEach(function(c) { + let currentCollection = mdb.getCollection(c.name); + currentCollection.getIndexes().forEach(function(i) { + if (i.unique) { + if (i.v === 1) { + unique_idx_v1.push(i); + } else { + unique_idx_v2.push(i); + } + return; + } + }); + }); + }); + // Drop and recreate all v:1 indexes + for (let idx of unique_idx_v1) { + let [dbName, collName] = idx.ns.split("."); + let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index: idx.name}); + assert.commandWorked(res); + res = db.getSiblingDB(dbName).runCommand({ + createIndexes: collName, + indexes: [{"key": idx.key, "name": idx.name, "unique": true, "v": 1}] + }); + assert.commandWorked(res); + } + + // Drop and recreate all v:2 indexes + for (let idx of unique_idx_v2) { + let [dbName, collName] = idx.ns.split("."); + let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index: idx.name}); + assert.commandWorked(res); + res = db.getSiblingDB(dbName).runCommand({ + createIndexes: collName, + indexes: [{"key": idx.key, "name": idx.name, "unique": true, "v": 2}] + }); + assert.commandWorked(res); + } +} |