summaryrefslogtreecommitdiff
path: root/jstests
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-12-16 23:16:51 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-03-01 04:44:06 +0000
commit689bd54bb22d4ff22f0e280431e6429c2c8bea5d (patch)
treebdeb8e20e44a95ce5b10c4eda9fb19e34658605d /jstests
parent8f594961045240524c251ca0bf6b85bb8c099c50 (diff)
downloadmongo-689bd54bb22d4ff22f0e280431e6429c2c8bea5d.tar.gz
SERVER-38413 Always set an initial postBatchResumeToken on mongoD
(cherry picked from commit 1fd9bfa5867c3e7d6c1eebda53d5ccd8c637459e) (cherry picked from commit 5fe1af5f8e845f736f4e76d7a8d47675b6507a6c)
Diffstat (limited to 'jstests')
-rw-r--r--jstests/change_streams/report_post_batch_resume_token.js37
-rw-r--r--jstests/noPassthrough/change_streams_resume_at_same_clustertime.js64
2 files changed, 98 insertions, 3 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
index 6a7dd48f8d2..b84cc6189f7 100644
--- a/jstests/change_streams/report_post_batch_resume_token.js
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -16,15 +16,39 @@
let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
const batchSize = 2;
- // Test that postBatchResumeToken is present on empty initial aggregate batch.
- let csCursor = testCollection.watch();
+ // Test that postBatchResumeToken is present on an initial aggregate of batchSize: 0.
+ let csCursor = testCollection.watch([], {cursor: {batchSize: 0}});
assert.eq(csCursor.objsLeftInBatch(), 0);
let initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);
- // Test that postBatchResumeToken is present on empty getMore batch.
+ // Test that the PBRT does not advance beyond its initial value for a change stream whose
+ // startAtOperationTime is in the future, even as writes are made to the test collection.
+ const timestampIn2100 = Timestamp(4102444800, 1);
+ csCursor = testCollection.watch([], {startAtOperationTime: timestampIn2100});
+ assert.eq(csCursor.objsLeftInBatch(), 0);
+ initialAggPBRT = csCursor.getResumeToken();
+ assert.neq(undefined, initialAggPBRT);
+
+ // Write some documents to the test collection.
+ for (let i = 0; i < 5; ++i) {
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ }
+
+ // Verify that no events are returned and the PBRT does not advance or go backwards.
assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
let getMorePBRT = csCursor.getResumeToken();
+ assert.eq(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
+
+ // Test that postBatchResumeToken is present on empty initial aggregate batch.
+ csCursor = testCollection.watch();
+ assert.eq(csCursor.objsLeftInBatch(), 0);
+ initialAggPBRT = csCursor.getResumeToken();
+ assert.neq(undefined, initialAggPBRT);
+
+ // Test that postBatchResumeToken is present on empty getMore batch.
+ assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ getMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, getMorePBRT);
assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
@@ -46,6 +70,13 @@
assert.commandWorked(testCollection.insert({_id: docId++}));
}
+ // Test that the PBRT for a resumed stream is the given resume token if no result are returned.
+ csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: 0}});
+ assert.eq(csCursor.objsLeftInBatch(), 0);
+ initialAggPBRT = csCursor.getResumeToken();
+ assert.neq(undefined, initialAggPBRT);
+ assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0);
+
// Test that postBatchResumeToken is present on non-empty initial aggregate batch.
csCursor =
testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}});
diff --git a/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js
new file mode 100644
index 00000000000..627fdf4eb7a
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js
@@ -0,0 +1,64 @@
+/**
+ * Tests that a change stream can be resumed from the higher of two tokens on separate shards whose
+ * clusterTime is identical, differing only by documentKey, without causing the PBRT sent to mongoS
+ * to go back-in-time.
+ * @tags: [requires_replication, requires_journaling, requires_majority_read_concern]
+ */
+(function() {
+ "use strict";
+
+ const st =
+ new ShardingTest({shards: 2, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}});
+
+ const mongosDB = st.s.getDB(jsTestName());
+ const mongosColl = mongosDB.test;
+
+ // Enable sharding on the test DB and ensure its primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard on {_id:1}, split at {_id:0}, and move the upper chunk to shard1.
+ st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName(), true);
+
+ // Write one document to each shard.
+ assert.commandWorked(mongosColl.insert({_id: -10}));
+ assert.commandWorked(mongosColl.insert({_id: 10}));
+
+ // Open a change stream cursor to listen for subsequent events.
+ let csCursor = mongosColl.watch([], {cursor: {batchSize: 1}});
+
+ // Update both documents in the collection, such that the events will have the same clusterTime.
+ // We update twice to ensure that the PBRT for both shards moves past the first two updates.
+ assert.commandWorked(mongosColl.update({}, {$set: {updated: 1}}, {multi: true}));
+ assert.commandWorked(mongosColl.update({}, {$set: {updatedAgain: 1}}, {multi: true}));
+
+ // Retrieve the first two events, confirm that they are in order with the same clusterTime.
+ let clusterTime = null, updateEvent = null;
+ for (let id of[-10, 10]) {
+ assert.soon(() => csCursor.hasNext());
+ updateEvent = csCursor.next();
+ assert.eq(updateEvent.documentKey._id, id);
+ clusterTime = (clusterTime || updateEvent.clusterTime);
+ assert.eq(updateEvent.clusterTime, clusterTime);
+ assert.eq(updateEvent.updateDescription.updatedFields.updated, 1);
+ }
+
+ // Update both documents again, so that we will have something to observe after resuming.
+ assert.commandWorked(mongosColl.update({}, {$set: {updatedYetAgain: 1}}, {multi: true}));
+
+ // Resume from the second update, and confirm that we only see events starting with the third
+ // and fourth updates. We use batchSize:1 to induce mongoD to send each individual event to the
+ // mongoS when resuming, rather than scanning all the way to the most recent point in its oplog.
+ csCursor = mongosColl.watch([], {resumeAfter: updateEvent._id, cursor: {batchSize: 1}});
+ clusterTime = updateEvent = null;
+ for (let id of[-10, 10]) {
+ assert.soon(() => csCursor.hasNext());
+ updateEvent = csCursor.next();
+ assert.eq(updateEvent.documentKey._id, id);
+ clusterTime = (clusterTime || updateEvent.clusterTime);
+ assert.eq(updateEvent.clusterTime, clusterTime);
+ assert.eq(updateEvent.updateDescription.updatedFields.updatedAgain, 1);
+ }
+
+ st.stop();
+})(); \ No newline at end of file