diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-12-16 23:16:51 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-01 04:44:06 +0000 |
commit | 689bd54bb22d4ff22f0e280431e6429c2c8bea5d (patch) | |
tree | bdeb8e20e44a95ce5b10c4eda9fb19e34658605d /jstests | |
parent | 8f594961045240524c251ca0bf6b85bb8c099c50 (diff) | |
download | mongo-689bd54bb22d4ff22f0e280431e6429c2c8bea5d.tar.gz |
SERVER-38413 Always set an initial postBatchResumeToken on mongoD
(cherry picked from commit 1fd9bfa5867c3e7d6c1eebda53d5ccd8c637459e)
(cherry picked from commit 5fe1af5f8e845f736f4e76d7a8d47675b6507a6c)
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/change_streams/report_post_batch_resume_token.js | 37 | ||||
-rw-r--r-- | jstests/noPassthrough/change_streams_resume_at_same_clustertime.js | 64 |
2 files changed, 98 insertions(+), 3 deletions(-)
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js index 6a7dd48f8d2..b84cc6189f7 100644 --- a/jstests/change_streams/report_post_batch_resume_token.js +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -16,15 +16,39 @@ let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate. const batchSize = 2; - // Test that postBatchResumeToken is present on empty initial aggregate batch. - let csCursor = testCollection.watch(); + // Test that postBatchResumeToken is present on an initial aggregate of batchSize: 0. + let csCursor = testCollection.watch([], {cursor: {batchSize: 0}}); assert.eq(csCursor.objsLeftInBatch(), 0); let initialAggPBRT = csCursor.getResumeToken(); assert.neq(undefined, initialAggPBRT); - // Test that postBatchResumeToken is present on empty getMore batch. + // Test that the PBRT does not advance beyond its initial value for a change stream whose + // startAtOperationTime is in the future, even as writes are made to the test collection. + const timestampIn2100 = Timestamp(4102444800, 1); + csCursor = testCollection.watch([], {startAtOperationTime: timestampIn2100}); + assert.eq(csCursor.objsLeftInBatch(), 0); + initialAggPBRT = csCursor.getResumeToken(); + assert.neq(undefined, initialAggPBRT); + + // Write some documents to the test collection. + for (let i = 0; i < 5; ++i) { + assert.commandWorked(testCollection.insert({_id: docId++})); + } + + // Verify that no events are returned and the PBRT does not advance or go backwards. assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. let getMorePBRT = csCursor.getResumeToken(); + assert.eq(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); + + // Test that postBatchResumeToken is present on empty initial aggregate batch. 
+ csCursor = testCollection.watch(); + assert.eq(csCursor.objsLeftInBatch(), 0); + initialAggPBRT = csCursor.getResumeToken(); + assert.neq(undefined, initialAggPBRT); + + // Test that postBatchResumeToken is present on empty getMore batch. + assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + getMorePBRT = csCursor.getResumeToken(); assert.neq(undefined, getMorePBRT); assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); @@ -46,6 +70,13 @@ assert.commandWorked(testCollection.insert({_id: docId++})); } + // Test that the PBRT for a resumed stream is the given resume token if no results are returned. + csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: 0}}); + assert.eq(csCursor.objsLeftInBatch(), 0); + initialAggPBRT = csCursor.getResumeToken(); + assert.neq(undefined, initialAggPBRT); + assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0); + // Test that postBatchResumeToken is present on non-empty initial aggregate batch. csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}}); diff --git a/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js new file mode 100644 index 00000000000..627fdf4eb7a --- /dev/null +++ b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js @@ -0,0 +1,64 @@ +/** + * Tests that a change stream can be resumed from the higher of two tokens on separate shards whose + * clusterTime is identical, differing only by documentKey, without causing the PBRT sent to mongoS + * to go back-in-time. 
+ * @tags: [requires_replication, requires_journaling, requires_majority_read_concern] + */ +(function() { + "use strict"; + + const st = + new ShardingTest({shards: 2, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}}); + + const mongosDB = st.s.getDB(jsTestName()); + const mongosColl = mongosDB.test; + + // Enable sharding on the test DB and ensure its primary is shard0. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard on {_id:1}, split at {_id:0}, and move the upper chunk to shard1. + st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName(), true); + + // Write one document to each shard. + assert.commandWorked(mongosColl.insert({_id: -10})); + assert.commandWorked(mongosColl.insert({_id: 10})); + + // Open a change stream cursor to listen for subsequent events. + let csCursor = mongosColl.watch([], {cursor: {batchSize: 1}}); + + // Update both documents in the collection, such that the events will have the same clusterTime. + // We update twice to ensure that the PBRT for both shards moves past the first two updates. + assert.commandWorked(mongosColl.update({}, {$set: {updated: 1}}, {multi: true})); + assert.commandWorked(mongosColl.update({}, {$set: {updatedAgain: 1}}, {multi: true})); + + // Retrieve the first two events, confirm that they are in order with the same clusterTime. + let clusterTime = null, updateEvent = null; + for (let id of[-10, 10]) { + assert.soon(() => csCursor.hasNext()); + updateEvent = csCursor.next(); + assert.eq(updateEvent.documentKey._id, id); + clusterTime = (clusterTime || updateEvent.clusterTime); + assert.eq(updateEvent.clusterTime, clusterTime); + assert.eq(updateEvent.updateDescription.updatedFields.updated, 1); + } + + // Update both documents again, so that we will have something to observe after resuming. 
+ assert.commandWorked(mongosColl.update({}, {$set: {updatedYetAgain: 1}}, {multi: true})); + + // Resume from the second update, and confirm that we only see events starting with the third + // and fourth updates. We use batchSize:1 to induce mongoD to send each individual event to the + // mongoS when resuming, rather than scanning all the way to the most recent point in its oplog. + csCursor = mongosColl.watch([], {resumeAfter: updateEvent._id, cursor: {batchSize: 1}}); + clusterTime = updateEvent = null; + for (let id of[-10, 10]) { + assert.soon(() => csCursor.hasNext()); + updateEvent = csCursor.next(); + assert.eq(updateEvent.documentKey._id, id); + clusterTime = (clusterTime || updateEvent.clusterTime); + assert.eq(updateEvent.clusterTime, clusterTime); + assert.eq(updateEvent.updateDescription.updatedFields.updatedAgain, 1); + } + + st.stop(); +})();
\ No newline at end of file |