summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-03-20 17:30:14 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-04 14:42:00 -0400
commit527386807d3fc2897542e90539e3f5230922c865 (patch)
treeb12ccf54e0408d88e6f76319e50cdd5e5dd478ca /jstests/change_streams
parentea14c46d8a5b8d33412ef2c62b09475fe8533976 (diff)
downloadmongo-527386807d3fc2897542e90539e3f5230922c865.tar.gz
SERVER-33818: Add 'resumeWithClusterTime' as an alias for '$_resumeAfterClusterTime' in $changeStream
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/change_stream_start_at_cluster_time.js88
1 files changed, 88 insertions, 0 deletions
diff --git a/jstests/change_streams/change_stream_start_at_cluster_time.js b/jstests/change_streams/change_stream_start_at_cluster_time.js
new file mode 100644
index 00000000000..ee9e9698583
--- /dev/null
+++ b/jstests/change_streams/change_stream_start_at_cluster_time.js
@@ -0,0 +1,88 @@
+// Tests resuming change streams based on cluster time.
+(function() {
+ "use strict";
+
+ const coll = db[jsTestName()];
+ coll.drop();
+
+ assert.commandWorked(db.createCollection(coll.getName()));
+
+ const testStartTime = db.runCommand({isMaster: 1}).$clusterTime.clusterTime;
+
+ // Write a document to each chunk, and wait for replication.
+ assert.writeOK(coll.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ // Perform two updates, capturing the cluster time of the first to be resumed from.
+ const res = coll.runCommand("update", {updates: [{q: {_id: -1}, u: {$set: {updated: true}}}]});
+ const resumeTime = res.$clusterTime.clusterTime;
+
+ assert.writeOK(coll.update({_id: 1}, {$set: {updated: true}}));
+
+ let changeStream = coll.watch([], {startAtClusterTime: {ts: resumeTime}});
+
+ // Test that we see the two updates.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update", tojson(next));
+ assert.eq(next.documentKey._id, -1, tojson(next));
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update", tojson(next));
+ assert.eq(next.documentKey._id, 1, tojson(next));
+
+ // Test that startAtClusterTime is not allowed alongside resumeAfter or
+ // $_resumeAfterClusterTime.
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTime}, resumeAfter: next._id}}],
+ cursor: {}
+ }),
+ 40674);
+
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{
+ $changeStream: {
+ startAtClusterTime: {ts: resumeTime},
+ $_resumeAfterClusterTime: {ts: resumeTime}
+ }
+ }],
+ cursor: {}
+ }),
+ 50573);
+
+ // Test that resuming from a time in the future will wait for that time to come.
+ let resumeTimeFarFuture = db.runCommand({isMaster: 1}).$clusterTime.clusterTime;
+ resumeTimeFarFuture =
+ new Timestamp(resumeTimeFarFuture.getTime() + 60 * 60 * 6, 1); // 6 hours in the future
+
+ let changeStreamFuture = coll.watch([], {startAtClusterTime: {ts: resumeTimeFarFuture}});
+
+ // Resume the change stream from the start of the test and verify it picks up the changes to the
+ // collection. Namely, it should see two inserts followed by two updates.
+ changeStream = coll.watch([], {startAtClusterTime: {ts: testStartTime}});
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "insert", tojson(next));
+ assert.eq(next.documentKey._id, -1, tojson(next));
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "insert", tojson(next));
+ assert.eq(next.documentKey._id, 1, tojson(next));
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update", tojson(next));
+ assert.eq(next.documentKey._id, -1, tojson(next));
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update", tojson(next));
+ assert.eq(next.documentKey._id, 1, tojson(next));
+
+ // Verify that the change stream resumed from far into the future does not see any changes.
+ assert(!changeStreamFuture.hasNext());
+})();