diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-27 03:50:42 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-01 04:44:07 +0000 |
commit | 4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba (patch) | |
tree | 64b1f47448f8c22677c1fef0371ba231040aba28 /jstests | |
parent | 689bd54bb22d4ff22f0e280431e6429c2c8bea5d (diff) | |
download | mongo-4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba.tar.gz |
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
(cherry picked from commit 560a0c3a3e20924b362fc2b159c30255d62e81d2)
(cherry picked from commit 027cf6e2ac4958119dc5c108518f220722cd6f97)
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/change_streams/report_post_batch_resume_token.js | 105 | ||||
-rw-r--r-- | jstests/noPassthrough/change_streams_shell_helper_resume_token.js (renamed from jstests/change_streams/shell_helper_resume_token.js) | 53 | ||||
-rw-r--r-- | jstests/noPassthrough/report_post_batch_resume_token_mongod.js | 114 |
3 files changed, 174 insertions, 98 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js index b84cc6189f7..4c161e85a03 100644 --- a/jstests/change_streams/report_post_batch_resume_token.js +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -1,5 +1,6 @@ /** - * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. + * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This + * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams. * @tags: [uses_transactions] */ (function() { @@ -55,7 +56,7 @@ // Test that postBatchResumeToken advances with returned events. Insert one document into the // collection and consume the resulting change stream event. assert.commandWorked(testCollection.insert({_id: docId++})); - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert(csCursor.objsLeftInBatch() == 1); // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal @@ -77,28 +78,16 @@ assert.neq(undefined, initialAggPBRT); assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0); - // Test that postBatchResumeToken is present on non-empty initial aggregate batch. - csCursor = - testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}}); - assert.eq(csCursor.objsLeftInBatch(), batchSize); - while (csCursor.objsLeftInBatch()) { - csCursor.next(); - } - // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the - // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it. - initialAggPBRT = csCursor.getResumeToken(); - assert.neq(undefined, initialAggPBRT); - assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); - // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the // observed postBatchResumeToken advanced. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. // The postBatchResumeToken is again equal to the final token in the batch, and greater than the // PBRT from the initial response. + let eventFromCursor = null; while (csCursor.objsLeftInBatch()) { - resumeTokenFromDoc = csCursor.next()._id; + eventFromCursor = csCursor.next(); + resumeTokenFromDoc = eventFromCursor._id; } getMorePBRT = csCursor.getResumeToken(); assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0); @@ -106,15 +95,21 @@ // Test that postBatchResumeToken advances with writes to an unrelated collection. First make // sure there is nothing left in our cursor, and obtain the latest PBRT... - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + while (eventFromCursor.fullDocument._id < (docId - 1)) { + assert.soon(() => csCursor.hasNext()); + eventFromCursor = csCursor.next(); + } + assert(!csCursor.hasNext()); let previousGetMorePBRT = csCursor.getResumeToken(); assert.neq(undefined, previousGetMorePBRT); // ... then test that it advances on an insert to an unrelated collection. assert.commandWorked(otherCollection.insert({})); - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. - getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + assert.soon(() => { + assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + getMorePBRT = csCursor.getResumeToken(); + return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0; + }); // Insert two documents into the collection which are of the maximum BSON object size. const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize; @@ -128,70 +123,20 @@ // Test that we return the correct postBatchResumeToken in the event that the batch hits the // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result, // because the second result cannot fit in the batch. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - // Verify that the postBatchResumeToken matches the last event actually added to the batch. + // Obtain the resume token and the PBRT from the first document. resumeTokenFromDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); - // Now retrieve the second event and confirm that the PBRT matches its resume token. + // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - resumeTokenFromDoc = csCursor.next()._id; - getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); - - // Test that the PBRT is correctly updated when reading events from within a transaction. - const session = db.getMongo().startSession(); - const sessionDB = session.getDatabase(db.getName()); - - const sessionColl = sessionDB[testCollection.getName()]; - const sessionOtherColl = sessionDB[otherCollection.getName()]; - session.startTransaction(); - - // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction. - for (let i = 0; i < 3; ++i) { - assert.commandWorked(sessionColl.insert({_id: docId++})); - } - assert.commandWorked(sessionOtherColl.insert({})); - assert.commandWorked(session.commitTransaction_forTesting()); - session.endSession(); - - // Grab the next 2 events, which should be the first 2 events in the transaction. - previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 2); - - // The clusterTime should be the same on each, but the resume token keeps advancing. - const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next(); - const txnClusterTime = txnEvent1.clusterTime; - assert.eq(txnEvent2.clusterTime, txnClusterTime); - assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0); - assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); - - // The PBRT of the first transaction batch is equal to the last document's resumeToken. - getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0); - - // Now get the next batch. This contains the third of the four transaction operations. - previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 1); - - // The clusterTime of this event is the same as the two events from the previous batch, but its - // resume token is greater than the previous PBRT. - const txnEvent3 = csCursor.next(); - assert.eq(txnEvent3.clusterTime, txnClusterTime); - assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); - - // Because we wrote to the unrelated collection, the final event in the transaction does not - // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the - // last event in the batch, because the unrelated event is within the same transaction and - // therefore has the same clusterTime. + const resumeTokenFromSecondDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); + assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0); + assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0); + assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0); })(); diff --git a/jstests/change_streams/shell_helper_resume_token.js b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js index 8cfd2328451..b10f85a7702 100644 --- a/jstests/change_streams/shell_helper_resume_token.js +++ b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js @@ -2,18 +2,30 @@ * Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume * token with each document and returning the postBatchResumeToken as soon as each batch is * exhausted. + * @tags: [requires_journaling] */ (function() { "use strict"; load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. - // Drop and recreate collections to assure a clean run. + // Create a new single-node replica set, and ensure that it can support $changeStream. + const rst = new ReplSetTest({nodes: 1}); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + rst.initiate(); + + const db = rst.getPrimary().getDB(jsTestName()); const collName = "change_stream_shell_helper_resume_token"; const csCollection = assertDropAndRecreateCollection(db, collName); const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); const batchSize = 5; + let docId = 0; // Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received. const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}}); @@ -31,49 +43,54 @@ // Insert 9 documents into the collection, followed by a write to the unrelated collection. for (let i = 0; i < 9; ++i) { - assert.commandWorked(csCollection.insert({_id: i})); + assert.commandWorked(csCollection.insert({_id: ++docId})); } assert.commandWorked(otherCollection.insert({})); - // Retrieve the first batch of 5 events. + // Retrieve the first batch of events from the cursor. assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); // We have not yet iterated any of the events. Verify that the resume token is unchanged. assert.docEq(curResumeToken, csCursor.getResumeToken()); // For each event in the first batch, the resume token should match the document's _id. + let currentDoc = null; while (csCursor.objsLeftInBatch()) { - const nextDoc = csCursor.next(); + currentDoc = csCursor.next(); prevResumeToken = curResumeToken; curResumeToken = csCursor.getResumeToken(); - assert.docEq(curResumeToken, nextDoc._id); + assert.docEq(curResumeToken, currentDoc._id); assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); } - // Retrieve the second batch. This should be 4 documents. + // Retrieve the second batch of events from the cursor. assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 4); // We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged. assert.docEq(curResumeToken, csCursor.getResumeToken()); - // For each of the first 3 events, the resume token should match the document's _id. - while (csCursor.objsLeftInBatch() > 1) { - const nextDoc = csCursor.next(); + // For all but the final event, the resume token should match the document's _id. + while ((currentDoc = csCursor.next()).fullDocument._id < docId) { + assert(csCursor.hasNext()); prevResumeToken = curResumeToken; curResumeToken = csCursor.getResumeToken(); - assert.docEq(curResumeToken, nextDoc._id); + assert.docEq(curResumeToken, currentDoc._id); assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); } + // When we reach here, 'currentDoc' is the final document in the batch, but we have not yet + // updated the resume token. Assert that this resume token sorts before currentDoc's. + prevResumeToken = curResumeToken; + assert.gt(bsonWoCompare(currentDoc._id, prevResumeToken), 0); - // When we pull the final document out of the cursor, the resume token should become the + // After we have pulled the final document out of the cursor, the resume token should be the // postBatchResumeToken rather than the document's _id. Because we inserted an item into the // unrelated collection to push the oplog past the final event returned by the change stream, // this will be strictly greater than the final document's _id. - const finalDoc = csCursor.next(); - prevResumeToken = curResumeToken; - curResumeToken = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(finalDoc._id, prevResumeToken), 0); - assert.gt(bsonWoCompare(curResumeToken, finalDoc._id), 0); + assert.soon(() => { + curResumeToken = csCursor.getResumeToken(); + assert(!csCursor.hasNext(), () => tojson(csCursor.next())); + return bsonWoCompare(curResumeToken, currentDoc._id) > 0; + }); + + rst.stopSet(); }()); diff --git a/jstests/noPassthrough/report_post_batch_resume_token_mongod.js b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js new file mode 100644 index 00000000000..cf7dd55b1d0 --- /dev/null +++ b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js @@ -0,0 +1,114 @@ +/** + * Tests mongoD-specific semantics of postBatchResumeToken for $changeStream aggregations. + * @tags: [uses_transactions] + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + // Create a new single-node replica set, and ensure that it can support $changeStream. + const rst = new ReplSetTest({nodes: 1}); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + rst.initiate(); + + const db = rst.getPrimary().getDB(jsTestName()); + const collName = "report_post_batch_resume_token"; + const testCollection = assertDropAndRecreateCollection(db, collName); + const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); + const adminDB = db.getSiblingDB("admin"); + + let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate. + const batchSize = 2; + + // Start watching the test collection in order to capture a resume token. + let csCursor = testCollection.watch(); + + // Write some documents to the test collection and get the resume token from the first doc. + for (let i = 0; i < 5; ++i) { + assert.commandWorked(testCollection.insert({_id: docId++})); + } + const resumeTokenFromDoc = csCursor.next()._id; + csCursor.close(); + + // Test that postBatchResumeToken is present on a non-empty initial aggregate batch. + assert.soon(() => { + csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc}); + csCursor.close(); // We don't need any results after the initial batch. + return csCursor.objsLeftInBatch(); + }); + while (csCursor.objsLeftInBatch()) { + csCursor.next(); + } + let initialAggPBRT = csCursor.getResumeToken(); + assert.neq(undefined, initialAggPBRT); + + // Test that the PBRT is correctly updated when reading events from within a transaction. + const session = db.getMongo().startSession(); + const sessionDB = session.getDatabase(db.getName()); + + const sessionColl = sessionDB[testCollection.getName()]; + const sessionOtherColl = sessionDB[otherCollection.getName()]; + session.startTransaction(); + + // Open a stream of batchSize:2 and grab the PBRT of the initial batch. + csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}}); + initialAggPBRT = csCursor.getResumeToken(); + assert.eq(csCursor.objsLeftInBatch(), 0); + + // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction. + for (let i = 0; i < 3; ++i) { + assert.commandWorked(sessionColl.insert({_id: docId++})); + } + assert.commandWorked(sessionOtherColl.insert({})); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); + + // Grab the next 2 events, which should be the first 2 events in the transaction. + assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.eq(csCursor.objsLeftInBatch(), 2); + + // The clusterTime should be the same on each, but the resume token keeps advancing. + const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next(); + const txnClusterTime = txnEvent1.clusterTime; + assert.eq(txnEvent2.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent1._id, initialAggPBRT), 0); + assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); + + // The PBRT of the first transaction batch is equal to the last document's resumeToken. + let getMorePBRT = csCursor.getResumeToken(); + assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0); + + // Save this PBRT so that we can test resuming from it later on. + const resumePBRT = getMorePBRT; + + // Now get the next batch. This contains the third of the four transaction operations. + let previousGetMorePBRT = getMorePBRT; + assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.eq(csCursor.objsLeftInBatch(), 1); + + // The clusterTime of this event is the same as the two events from the previous batch, but its + // resume token is greater than the previous PBRT. + const txnEvent3 = csCursor.next(); + assert.eq(txnEvent3.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); + + // Because we wrote to the unrelated collection, the final event in the transaction does not + // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the + // last event in the batch, because the unrelated event is within the same transaction and + // therefore has the same clusterTime. + getMorePBRT = csCursor.getResumeToken(); + assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); + + // Confirm that resuming from the PBRT of the first batch gives us the third transaction write. + csCursor = testCollection.watch([], {resumeAfter: resumePBRT}); + assert.docEq(csCursor.next(), txnEvent3); + assert(!csCursor.hasNext()); + + rst.stopSet(); +})(); |