summaryrefslogtreecommitdiff
path: root/jstests/change_streams/report_post_batch_resume_token.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/report_post_batch_resume_token.js')
-rw-r--r--jstests/change_streams/report_post_batch_resume_token.js66
1 files changed, 30 insertions, 36 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
index b84cc6189f7..1055288a9f5 100644
--- a/jstests/change_streams/report_post_batch_resume_token.js
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -1,5 +1,6 @@
/**
- * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken.
+ * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This
+ * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams.
* @tags: [uses_transactions]
*/
(function() {
@@ -55,7 +56,7 @@
// Test that postBatchResumeToken advances with returned events. Insert one document into the
// collection and consume the resulting change stream event.
assert.commandWorked(testCollection.insert({_id: docId++}));
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert(csCursor.objsLeftInBatch() == 1);
// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
@@ -77,28 +78,16 @@
assert.neq(undefined, initialAggPBRT);
assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0);
- // Test that postBatchResumeToken is present on non-empty initial aggregate batch.
- csCursor =
- testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}});
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
- while (csCursor.objsLeftInBatch()) {
- csCursor.next();
- }
- // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the
- // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it.
- initialAggPBRT = csCursor.getResumeToken();
- assert.neq(undefined, initialAggPBRT);
- assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
-
// Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
// observed postBatchResumeToken advanced.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
// The postBatchResumeToken is again equal to the final token in the batch, and greater than the
// PBRT from the initial response.
+ let eventFromCursor = null;
while (csCursor.objsLeftInBatch()) {
- resumeTokenFromDoc = csCursor.next()._id;
+ eventFromCursor = csCursor.next();
+ resumeTokenFromDoc = eventFromCursor._id;
}
getMorePBRT = csCursor.getResumeToken();
assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
@@ -106,15 +95,21 @@
// Test that postBatchResumeToken advances with writes to an unrelated collection. First make
// sure there is nothing left in our cursor, and obtain the latest PBRT...
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ while (eventFromCursor.fullDocument._id < (docId - 1)) {
+ assert.soon(() => csCursor.hasNext());
+ eventFromCursor = csCursor.next();
+ }
+ assert(!csCursor.hasNext());
let previousGetMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, previousGetMorePBRT);
// ... then test that it advances on an insert to an unrelated collection.
assert.commandWorked(otherCollection.insert({}));
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
- getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+ assert.soon(() => {
+ assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ getMorePBRT = csCursor.getResumeToken();
+ return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0;
+ });
// Insert two documents into the collection which are of the maximum BSON object size.
const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
@@ -128,24 +123,25 @@
// Test that we return the correct postBatchResumeToken in the event that the batch hits the
// byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
// because the second result cannot fit in the batch.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- // Verify that the postBatchResumeToken matches the last event actually added to the batch.
+ // Obtain the resume token and the PBRT from the first document.
resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
- // Now retrieve the second event and confirm that the PBRT matches its resume token.
+ // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- resumeTokenFromDoc = csCursor.next()._id;
+ const resumeTokenFromSecondDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
+ assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0);
+ assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0);
+ assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0);
// Test that the PBRT is correctly updated when reading events from within a transaction.
+ csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}});
const session = db.getMongo().startSession();
const sessionDB = session.getDatabase(db.getName());
@@ -163,7 +159,7 @@
// Grab the next 2 events, which should be the first 2 events in the transaction.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 2);
// The clusterTime should be the same on each, but the resume token keeps advancing.
@@ -179,7 +175,7 @@
// Now get the next batch. This contains the third of the four transaction operations.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
// The clusterTime of this event is the same as the two events from the previous batch, but its
@@ -189,9 +185,7 @@
assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
// Because we wrote to the unrelated collection, the final event in the transaction does not
- // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
- // last event in the batch, because the unrelated event is within the same transaction and
- // therefore has the same clusterTime.
+ // appear in the batch. Confirm that the postBatchResumeToken has been set correctly.
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
+ assert.gte(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
})();