summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-11-27 03:50:42 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-01-09 07:16:40 +0000
commit560a0c3a3e20924b362fc2b159c30255d62e81d2 (patch)
tree18a082ebad4c1e50f988025e8a53036836175400 /jstests/change_streams
parentec104311f774165b5b77b41b78c89e4f29baaca9 (diff)
downloadmongo-560a0c3a3e20924b362fc2b159c30255d62e81d2.tar.gz
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/report_post_batch_resume_token.js66
-rw-r--r--jstests/change_streams/shell_helper_resume_token.js79
2 files changed, 30 insertions, 115 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
index b84cc6189f7..1055288a9f5 100644
--- a/jstests/change_streams/report_post_batch_resume_token.js
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -1,5 +1,6 @@
/**
- * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken.
+ * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This
+ * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams.
* @tags: [uses_transactions]
*/
(function() {
@@ -55,7 +56,7 @@
// Test that postBatchResumeToken advances with returned events. Insert one document into the
// collection and consume the resulting change stream event.
assert.commandWorked(testCollection.insert({_id: docId++}));
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert(csCursor.objsLeftInBatch() == 1);
// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
@@ -77,28 +78,16 @@
assert.neq(undefined, initialAggPBRT);
assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0);
- // Test that postBatchResumeToken is present on non-empty initial aggregate batch.
- csCursor =
- testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}});
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
- while (csCursor.objsLeftInBatch()) {
- csCursor.next();
- }
- // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the
- // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it.
- initialAggPBRT = csCursor.getResumeToken();
- assert.neq(undefined, initialAggPBRT);
- assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
-
// Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
// observed postBatchResumeToken advanced.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
// The postBatchResumeToken is again equal to the final token in the batch, and greater than the
// PBRT from the initial response.
+ let eventFromCursor = null;
while (csCursor.objsLeftInBatch()) {
- resumeTokenFromDoc = csCursor.next()._id;
+ eventFromCursor = csCursor.next();
+ resumeTokenFromDoc = eventFromCursor._id;
}
getMorePBRT = csCursor.getResumeToken();
assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
@@ -106,15 +95,21 @@
// Test that postBatchResumeToken advances with writes to an unrelated collection. First make
// sure there is nothing left in our cursor, and obtain the latest PBRT...
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ while (eventFromCursor.fullDocument._id < (docId - 1)) {
+ assert.soon(() => csCursor.hasNext());
+ eventFromCursor = csCursor.next();
+ }
+ assert(!csCursor.hasNext());
let previousGetMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, previousGetMorePBRT);
// ... then test that it advances on an insert to an unrelated collection.
assert.commandWorked(otherCollection.insert({}));
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
- getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+ assert.soon(() => {
+ assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ getMorePBRT = csCursor.getResumeToken();
+ return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0;
+ });
// Insert two documents into the collection which are of the maximum BSON object size.
const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
@@ -128,24 +123,25 @@
// Test that we return the correct postBatchResumeToken in the event that the batch hits the
// byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
// because the second result cannot fit in the batch.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- // Verify that the postBatchResumeToken matches the last event actually added to the batch.
+ // Obtain the resume token and the PBRT from the first document.
resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
- // Now retrieve the second event and confirm that the PBRT matches its resume token.
+ // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- resumeTokenFromDoc = csCursor.next()._id;
+ const resumeTokenFromSecondDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
+ assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0);
+ assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0);
+ assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0);
// Test that the PBRT is correctly updated when reading events from within a transaction.
+ csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}});
const session = db.getMongo().startSession();
const sessionDB = session.getDatabase(db.getName());
@@ -163,7 +159,7 @@
// Grab the next 2 events, which should be the first 2 events in the transaction.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 2);
// The clusterTime should be the same on each, but the resume token keeps advancing.
@@ -179,7 +175,7 @@
// Now get the next batch. This contains the third of the four transaction operations.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
// The clusterTime of this event is the same as the two events from the previous batch, but its
@@ -189,9 +185,7 @@
assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
// Because we wrote to the unrelated collection, the final event in the transaction does not
- // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
- // last event in the batch, because the unrelated event is within the same transaction and
- // therefore has the same clusterTime.
+ // appear in the batch. Confirm that the postBatchResumeToken has been set correctly.
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
+ assert.gte(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
})();
diff --git a/jstests/change_streams/shell_helper_resume_token.js b/jstests/change_streams/shell_helper_resume_token.js
deleted file mode 100644
index 8cfd2328451..00000000000
--- a/jstests/change_streams/shell_helper_resume_token.js
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume
- * token with each document and returning the postBatchResumeToken as soon as each batch is
- * exhausted.
- */
-(function() {
- "use strict";
-
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
-
- // Drop and recreate collections to assure a clean run.
- const collName = "change_stream_shell_helper_resume_token";
- const csCollection = assertDropAndRecreateCollection(db, collName);
- const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
-
- const batchSize = 5;
-
- // Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received.
- const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}});
- assert(!csCursor.hasNext());
- let curResumeToken = csCursor.getResumeToken();
- assert.neq(undefined, curResumeToken);
-
- // Test that advancing the oplog time updates the postBatchResumeToken, even with no results.
- assert.commandWorked(otherCollection.insert({}));
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
- let prevResumeToken = curResumeToken;
- curResumeToken = csCursor.getResumeToken();
- assert.neq(undefined, curResumeToken);
- assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
-
- // Insert 9 documents into the collection, followed by a write to the unrelated collection.
- for (let i = 0; i < 9; ++i) {
- assert.commandWorked(csCollection.insert({_id: i}));
- }
- assert.commandWorked(otherCollection.insert({}));
-
- // Retrieve the first batch of 5 events.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
-
- // We have not yet iterated any of the events. Verify that the resume token is unchanged.
- assert.docEq(curResumeToken, csCursor.getResumeToken());
-
- // For each event in the first batch, the resume token should match the document's _id.
- while (csCursor.objsLeftInBatch()) {
- const nextDoc = csCursor.next();
- prevResumeToken = curResumeToken;
- curResumeToken = csCursor.getResumeToken();
- assert.docEq(curResumeToken, nextDoc._id);
- assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
- }
-
- // Retrieve the second batch. This should be 4 documents.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 4);
-
- // We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged.
- assert.docEq(curResumeToken, csCursor.getResumeToken());
-
- // For each of the first 3 events, the resume token should match the document's _id.
- while (csCursor.objsLeftInBatch() > 1) {
- const nextDoc = csCursor.next();
- prevResumeToken = curResumeToken;
- curResumeToken = csCursor.getResumeToken();
- assert.docEq(curResumeToken, nextDoc._id);
- assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
- }
-
- // When we pull the final document out of the cursor, the resume token should become the
- // postBatchResumeToken rather than the document's _id. Because we inserted an item into the
- // unrelated collection to push the oplog past the final event returned by the change stream,
- // this will be strictly greater than the final document's _id.
- const finalDoc = csCursor.next();
- prevResumeToken = curResumeToken;
- curResumeToken = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(finalDoc._id, prevResumeToken), 0);
- assert.gt(bsonWoCompare(curResumeToken, finalDoc._id), 0);
-}());