summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-09-19 10:18:39 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-09-19 10:22:32 -0400
commit84dde9f345606f8794eafc1b244902f8f6846d85 (patch)
treea4acfefdb799d776e00bd7fec9bd9f2b0762c127 /jstests/change_streams
parent3eedd24bb8448e9a491a03d8eaa21999d4ea9405 (diff)
downloadmongo-84dde9f345606f8794eafc1b244902f8f6846d85.tar.gz
SERVER-29929 Report highest observed cluster time from change stream on each shard
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/report_latest_observed_oplog_timestamp.js96
1 files changed, 96 insertions, 0 deletions
diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
new file mode 100644
index 00000000000..7345cfc4667
--- /dev/null
+++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
@@ -0,0 +1,96 @@
// Tests that an aggregate with a $changeStream stage will report the latest optime read in
// the oplog by its cursor. This information is needed in order to correctly merge the results
// from the various shards on mongos.
(function() {
    "use strict";

    const testName = "report_latest_observed_oplog_timestamp";
    const cursorCollection = db.getCollection(testName);
    const otherCollection = db.getCollection("unrelated_" + testName);

    // Drop collections to assure a clean run. Collections may not exist so do not check result.
    cursorCollection.drop();
    otherCollection.drop();

    // Get a resume point.
    jsTestLog("Getting a resume point.");
    const batchSize = 2;
    assert.commandWorked(db.createCollection(cursorCollection.getName()));
    const firstResponse = assert.commandWorked(cursorCollection.runCommand(
        {aggregate: testName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}}));
    assert.eq(0, firstResponse.cursor.firstBatch.length);
    assert.writeOK(cursorCollection.insert({_id: 0}));

    // Issues a getMore against the cursor in 'initialCursorResponse' and returns the command
    // response. The collection name is extracted from the cursor's namespace ("db.collName")
    // rather than assumed, since the cursor's collection need not match 'cursorCollection'.
    function iterateCursor(initialCursorResponse) {
        const getMoreCollName = initialCursorResponse.cursor.ns.substr(
            initialCursorResponse.cursor.ns.indexOf('.') + 1);
        return assert.commandWorked(cursorCollection.runCommand({
            getMore: initialCursorResponse.cursor.id,
            collection: getMoreCollName,
            batchSize: batchSize
        }));
    }
    const resumeResponse = iterateCursor(firstResponse);
    assert.eq(1, resumeResponse.cursor.nextBatch.length);
    // Because needsMerge was not set, the latest oplog timestamp should not be returned.
    assert.eq(undefined, resumeResponse.$_internalLatestOplogTimestamp);
    const resumeToken = resumeResponse.cursor.nextBatch[0]["_id"];

    // Seed the collection with enough documents to fit in one batch.
    // Note the resume document is included; when needsMerge is true, we see the resume token
    // in the stream.
    jsTestLog("Adding documents to collection.");
    for (let i = 1; i < batchSize * 2; i++) {
        assert.writeOK(cursorCollection.insert({_id: i}, {writeConcern: {w: 1}}));
    }

    // TODO: SERVER-29126
    // While change streams still uses read concern level local instead of read concern level
    // majority, we need to use causal consistency to be able to immediately read our own writes
    // out of the oplog. Once change streams read from the majority snapshot, we can remove this
    // synchronization point from this test.
    assert.commandWorked(db.runCommand({
        find: "foo",
        readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
    }));

    // Look at one batch's worth.
    jsTestLog("Testing that operation time is present on initial aggregate command.");
    const cursorResponse = assert.commandWorked(cursorCollection.runCommand({
        aggregate: testName,
        // The latest observed optime is only reported when needsMerge is set, and needsMerge
        // requires fromMongos be set.
        needsMerge: true,
        fromMongos: true,
        pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
        cursor: {batchSize: batchSize}
    }));
    const firstBatchOplogTimestamp = cursorResponse.$_internalLatestOplogTimestamp;
    assert.neq(undefined, firstBatchOplogTimestamp, tojson(cursorResponse));

    // Iterate the cursor and assert that the observed operation time advanced.
    jsTestLog("Testing that operation time advances with getMore.");
    let getMoreResponse = iterateCursor(cursorResponse);
    const getMoreOplogTimestamp = getMoreResponse.$_internalLatestOplogTimestamp;
    assert.neq(undefined, getMoreOplogTimestamp, tojson(getMoreResponse));
    assert.gt(getMoreOplogTimestamp, firstBatchOplogTimestamp);

    // Now make sure that the reported operation time advances if there are writes to an unrelated
    // collection.
    jsTestLog("Testing that operation time advances with writes to an unrelated collection.");

    // First make sure there is nothing left in our cursor.
    getMoreResponse = iterateCursor(cursorResponse);
    assert.eq(getMoreResponse.cursor.nextBatch, []);

    // Record that operation time, then test that the reported time advances on an insert to an
    // unrelated collection.
    const oplogTimeAtExhaust = getMoreResponse.$_internalLatestOplogTimestamp;
    assert.neq(undefined, oplogTimeAtExhaust, tojson(getMoreResponse));
    assert.writeOK(otherCollection.insert({}));

    getMoreResponse = iterateCursor(cursorResponse);
    const oplogTimeAfterUnrelatedInsert = getMoreResponse.$_internalLatestOplogTimestamp;
    // Fixed: the original asserted on 'oplogTimeAtExhaust' here (already checked above), which
    // would let an undefined 'oplogTimeAfterUnrelatedInsert' reach the assert.gt below with a
    // confusing failure message instead of being caught by this check.
    assert.neq(undefined, oplogTimeAfterUnrelatedInsert, tojson(getMoreResponse));
    assert.gt(oplogTimeAfterUnrelatedInsert, oplogTimeAtExhaust);
})();