diff options
Diffstat (limited to 'jstests/change_streams/report_post_batch_resume_token.js')
-rw-r--r-- | jstests/change_streams/report_post_batch_resume_token.js | 66 |
1 files changed, 30 insertions, 36 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js index b84cc6189f7..1055288a9f5 100644 --- a/jstests/change_streams/report_post_batch_resume_token.js +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -1,5 +1,6 @@ /** - * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. + * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This + * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams. * @tags: [uses_transactions] */ (function() { @@ -55,7 +56,7 @@ // Test that postBatchResumeToken advances with returned events. Insert one document into the // collection and consume the resulting change stream event. assert.commandWorked(testCollection.insert({_id: docId++})); - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert(csCursor.objsLeftInBatch() == 1); // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal @@ -77,28 +78,16 @@ assert.neq(undefined, initialAggPBRT); assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0); - // Test that postBatchResumeToken is present on non-empty initial aggregate batch. - csCursor = - testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}}); - assert.eq(csCursor.objsLeftInBatch(), batchSize); - while (csCursor.objsLeftInBatch()) { - csCursor.next(); - } - // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the - // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it. - initialAggPBRT = csCursor.getResumeToken(); - assert.neq(undefined, initialAggPBRT); - assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); - // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the // observed postBatchResumeToken advanced. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. // The postBatchResumeToken is again equal to the final token in the batch, and greater than the // PBRT from the initial response. + let eventFromCursor = null; while (csCursor.objsLeftInBatch()) { - resumeTokenFromDoc = csCursor.next()._id; + eventFromCursor = csCursor.next(); + resumeTokenFromDoc = eventFromCursor._id; } getMorePBRT = csCursor.getResumeToken(); assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0); @@ -106,15 +95,21 @@ // Test that postBatchResumeToken advances with writes to an unrelated collection. First make // sure there is nothing left in our cursor, and obtain the latest PBRT... - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + while (eventFromCursor.fullDocument._id < (docId - 1)) { + assert.soon(() => csCursor.hasNext()); + eventFromCursor = csCursor.next(); + } + assert(!csCursor.hasNext()); let previousGetMorePBRT = csCursor.getResumeToken(); assert.neq(undefined, previousGetMorePBRT); // ... then test that it advances on an insert to an unrelated collection. assert.commandWorked(otherCollection.insert({})); - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. - getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + assert.soon(() => { + assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + getMorePBRT = csCursor.getResumeToken(); + return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0; + }); // Insert two documents into the collection which are of the maximum BSON object size. const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize; @@ -128,24 +123,25 @@ // Test that we return the correct postBatchResumeToken in the event that the batch hits the // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result, // because the second result cannot fit in the batch. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - // Verify that the postBatchResumeToken matches the last event actually added to the batch. + // Obtain the resume token and the PBRT from the first document. resumeTokenFromDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); - // Now retrieve the second event and confirm that the PBRT matches its resume token. + // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - resumeTokenFromDoc = csCursor.next()._id; + const resumeTokenFromSecondDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); + assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0); + assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0); + assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0); // Test that the PBRT is correctly updated when reading events from within a transaction. + csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}}); const session = db.getMongo().startSession(); const sessionDB = session.getDatabase(db.getName()); @@ -163,7 +159,7 @@ // Grab the next 2 events, which should be the first 2 events in the transaction. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 2); // The clusterTime should be the same on each, but the resume token keeps advancing. @@ -179,7 +175,7 @@ // Now get the next batch. This contains the third of the four transaction operations. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); // The clusterTime of this event is the same as the two events from the previous batch, but its @@ -189,9 +185,7 @@ assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); // Because we wrote to the unrelated collection, the final event in the transaction does not - // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the - // last event in the batch, because the unrelated event is within the same transaction and - // therefore has the same clusterTime. + // appear in the batch. Confirm that the postBatchResumeToken has been set correctly. getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); + assert.gte(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); })(); |