diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-27 03:50:42 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-01-09 07:16:40 +0000 |
commit | 560a0c3a3e20924b362fc2b159c30255d62e81d2 (patch) | |
tree | 18a082ebad4c1e50f988025e8a53036836175400 /jstests/change_streams | |
parent | ec104311f774165b5b77b41b78c89e4f29baaca9 (diff) | |
download | mongo-560a0c3a3e20924b362fc2b159c30255d62e81d2.tar.gz |
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/report_post_batch_resume_token.js | 66 | ||||
-rw-r--r-- | jstests/change_streams/shell_helper_resume_token.js | 79 |
2 files changed, 30 insertions, 115 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js index b84cc6189f7..1055288a9f5 100644 --- a/jstests/change_streams/report_post_batch_resume_token.js +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -1,5 +1,6 @@ /** - * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. + * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This + * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams. * @tags: [uses_transactions] */ (function() { @@ -55,7 +56,7 @@ // Test that postBatchResumeToken advances with returned events. Insert one document into the // collection and consume the resulting change stream event. assert.commandWorked(testCollection.insert({_id: docId++})); - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert(csCursor.objsLeftInBatch() == 1); // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal @@ -77,28 +78,16 @@ assert.neq(undefined, initialAggPBRT); assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0); - // Test that postBatchResumeToken is present on non-empty initial aggregate batch. - csCursor = - testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}}); - assert.eq(csCursor.objsLeftInBatch(), batchSize); - while (csCursor.objsLeftInBatch()) { - csCursor.next(); - } - // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the - // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it. - initialAggPBRT = csCursor.getResumeToken(); - assert.neq(undefined, initialAggPBRT); - assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); - // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the // observed postBatchResumeToken advanced. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. // The postBatchResumeToken is again equal to the final token in the batch, and greater than the // PBRT from the initial response. + let eventFromCursor = null; while (csCursor.objsLeftInBatch()) { - resumeTokenFromDoc = csCursor.next()._id; + eventFromCursor = csCursor.next(); + resumeTokenFromDoc = eventFromCursor._id; } getMorePBRT = csCursor.getResumeToken(); assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0); @@ -106,15 +95,21 @@ // Test that postBatchResumeToken advances with writes to an unrelated collection. First make // sure there is nothing left in our cursor, and obtain the latest PBRT... - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + while (eventFromCursor.fullDocument._id < (docId - 1)) { + assert.soon(() => csCursor.hasNext()); + eventFromCursor = csCursor.next(); + } + assert(!csCursor.hasNext()); let previousGetMorePBRT = csCursor.getResumeToken(); assert.neq(undefined, previousGetMorePBRT); // ... then test that it advances on an insert to an unrelated collection. assert.commandWorked(otherCollection.insert({})); - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. - getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + assert.soon(() => { + assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + getMorePBRT = csCursor.getResumeToken(); + return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0; + }); // Insert two documents into the collection which are of the maximum BSON object size. const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize; @@ -128,24 +123,25 @@ // Test that we return the correct postBatchResumeToken in the event that the batch hits the // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result, // because the second result cannot fit in the batch. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - // Verify that the postBatchResumeToken matches the last event actually added to the batch. + // Obtain the resume token and the PBRT from the first document. resumeTokenFromDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); - // Now retrieve the second event and confirm that the PBRT matches its resume token. + // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - resumeTokenFromDoc = csCursor.next()._id; + const resumeTokenFromSecondDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); + assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0); + assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0); + assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0); // Test that the PBRT is correctly updated when reading events from within a transaction. + csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}}); const session = db.getMongo().startSession(); const sessionDB = session.getDatabase(db.getName()); @@ -163,7 +159,7 @@ // Grab the next 2 events, which should be the first 2 events in the transaction. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 2); // The clusterTime should be the same on each, but the resume token keeps advancing. @@ -179,7 +175,7 @@ // Now get the next batch. This contains the third of the four transaction operations. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); // The clusterTime of this event is the same as the two events from the previous batch, but its @@ -189,9 +185,7 @@ assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); // Because we wrote to the unrelated collection, the final event in the transaction does not - // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the - // last event in the batch, because the unrelated event is within the same transaction and - // therefore has the same clusterTime. + // appear in the batch. Confirm that the postBatchResumeToken has been set correctly. getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); + assert.gte(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); })(); diff --git a/jstests/change_streams/shell_helper_resume_token.js b/jstests/change_streams/shell_helper_resume_token.js deleted file mode 100644 index 8cfd2328451..00000000000 --- a/jstests/change_streams/shell_helper_resume_token.js +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume - * token with each document and returning the postBatchResumeToken as soon as each batch is - * exhausted. - */ -(function() { - "use strict"; - - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - - // Drop and recreate collections to assure a clean run. - const collName = "change_stream_shell_helper_resume_token"; - const csCollection = assertDropAndRecreateCollection(db, collName); - const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); - - const batchSize = 5; - - // Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received. - const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}}); - assert(!csCursor.hasNext()); - let curResumeToken = csCursor.getResumeToken(); - assert.neq(undefined, curResumeToken); - - // Test that advancing the oplog time updates the postBatchResumeToken, even with no results. - assert.commandWorked(otherCollection.insert({})); - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. - let prevResumeToken = curResumeToken; - curResumeToken = csCursor.getResumeToken(); - assert.neq(undefined, curResumeToken); - assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); - - // Insert 9 documents into the collection, followed by a write to the unrelated collection. - for (let i = 0; i < 9; ++i) { - assert.commandWorked(csCollection.insert({_id: i})); - } - assert.commandWorked(otherCollection.insert({})); - - // Retrieve the first batch of 5 events. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); - - // We have not yet iterated any of the events. Verify that the resume token is unchanged. - assert.docEq(curResumeToken, csCursor.getResumeToken()); - - // For each event in the first batch, the resume token should match the document's _id. - while (csCursor.objsLeftInBatch()) { - const nextDoc = csCursor.next(); - prevResumeToken = curResumeToken; - curResumeToken = csCursor.getResumeToken(); - assert.docEq(curResumeToken, nextDoc._id); - assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); - } - - // Retrieve the second batch. This should be 4 documents. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 4); - - // We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged. - assert.docEq(curResumeToken, csCursor.getResumeToken()); - - // For each of the first 3 events, the resume token should match the document's _id. - while (csCursor.objsLeftInBatch() > 1) { - const nextDoc = csCursor.next(); - prevResumeToken = curResumeToken; - curResumeToken = csCursor.getResumeToken(); - assert.docEq(curResumeToken, nextDoc._id); - assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); - } - - // When we pull the final document out of the cursor, the resume token should become the - // postBatchResumeToken rather than the document's _id. Because we inserted an item into the - // unrelated collection to push the oplog past the final event returned by the change stream, - // this will be strictly greater than the final document's _id. - const finalDoc = csCursor.next(); - prevResumeToken = curResumeToken; - curResumeToken = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(finalDoc._id, prevResumeToken), 0); - assert.gt(bsonWoCompare(curResumeToken, finalDoc._id), 0); -}()); |