diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-03-20 17:30:14 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-04 14:42:00 -0400 |
commit | 527386807d3fc2897542e90539e3f5230922c865 (patch) | |
tree | b12ccf54e0408d88e6f76319e50cdd5e5dd478ca /jstests/change_streams | |
parent | ea14c46d8a5b8d33412ef2c62b09475fe8533976 (diff) | |
download | mongo-527386807d3fc2897542e90539e3f5230922c865.tar.gz |
SERVER-33818: Add 'resumeWithClusterTime' as an alias for '$_resumeAfterClusterTime' in $changeStream
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/change_stream_start_at_cluster_time.js | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/jstests/change_streams/change_stream_start_at_cluster_time.js b/jstests/change_streams/change_stream_start_at_cluster_time.js new file mode 100644 index 00000000000..ee9e9698583 --- /dev/null +++ b/jstests/change_streams/change_stream_start_at_cluster_time.js @@ -0,0 +1,88 @@ +// Tests resuming change streams based on cluster time. +(function() { + "use strict"; + + const coll = db[jsTestName()]; + coll.drop(); + + assert.commandWorked(db.createCollection(coll.getName())); + + const testStartTime = db.runCommand({isMaster: 1}).$clusterTime.clusterTime; + + // Write a document to each chunk, and wait for replication. + assert.writeOK(coll.insert({_id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + + // Perform two updates, capturing the cluster time of the first to be resumed from. + const res = coll.runCommand("update", {updates: [{q: {_id: -1}, u: {$set: {updated: true}}}]}); + const resumeTime = res.$clusterTime.clusterTime; + + assert.writeOK(coll.update({_id: 1}, {$set: {updated: true}})); + + let changeStream = coll.watch([], {startAtClusterTime: {ts: resumeTime}}); + + // Test that we see the two updates. + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "update", tojson(next)); + assert.eq(next.documentKey._id, -1, tojson(next)); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update", tojson(next)); + assert.eq(next.documentKey._id, 1, tojson(next)); + + // Test that startAtClusterTime is not allowed alongside resumeAfter or + // $_resumeAfterClusterTime. + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTime}, resumeAfter: next._id}}], + cursor: {} + }), + 40674); + + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{ + $changeStream: { + startAtClusterTime: {ts: resumeTime}, + $_resumeAfterClusterTime: {ts: resumeTime} + } + }], + cursor: {} + }), + 50573); + + // Test that resuming from a time in the future will wait for that time to come. + let resumeTimeFarFuture = db.runCommand({isMaster: 1}).$clusterTime.clusterTime; + resumeTimeFarFuture = + new Timestamp(resumeTimeFarFuture.getTime() + 60 * 60 * 6, 1); // 6 hours in the future + + let changeStreamFuture = coll.watch([], {startAtClusterTime: {ts: resumeTimeFarFuture}}); + + // Resume the change stream from the start of the test and verify it picks up the changes to the + // collection. Namely, it should see two inserts followed by two updates. + changeStream = coll.watch([], {startAtClusterTime: {ts: testStartTime}}); + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "insert", tojson(next)); + assert.eq(next.documentKey._id, -1, tojson(next)); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "insert", tojson(next)); + assert.eq(next.documentKey._id, 1, tojson(next)); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update", tojson(next)); + assert.eq(next.documentKey._id, -1, tojson(next)); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update", tojson(next)); + assert.eq(next.documentKey._id, 1, tojson(next)); + + // Verify that the change stream resumed from far into the future does not see any changes. + assert(!changeStreamFuture.hasNext()); +})(); |