summaryrefslogtreecommitdiff
path: root/jstests
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-11-27 03:50:42 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-03-01 04:44:07 +0000
commit4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba (patch)
tree64b1f47448f8c22677c1fef0371ba231040aba28 /jstests
parent689bd54bb22d4ff22f0e280431e6429c2c8bea5d (diff)
downloadmongo-4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba.tar.gz
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
(cherry picked from commit 560a0c3a3e20924b362fc2b159c30255d62e81d2) (cherry picked from commit 027cf6e2ac4958119dc5c108518f220722cd6f97)
Diffstat (limited to 'jstests')
-rw-r--r--jstests/change_streams/report_post_batch_resume_token.js105
-rw-r--r--jstests/noPassthrough/change_streams_shell_helper_resume_token.js (renamed from jstests/change_streams/shell_helper_resume_token.js)53
-rw-r--r--jstests/noPassthrough/report_post_batch_resume_token_mongod.js114
3 files changed, 174 insertions, 98 deletions
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
index b84cc6189f7..4c161e85a03 100644
--- a/jstests/change_streams/report_post_batch_resume_token.js
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -1,5 +1,6 @@
/**
- * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken.
+ * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This
+ * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams.
* @tags: [uses_transactions]
*/
(function() {
@@ -55,7 +56,7 @@
// Test that postBatchResumeToken advances with returned events. Insert one document into the
// collection and consume the resulting change stream event.
assert.commandWorked(testCollection.insert({_id: docId++}));
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert(csCursor.objsLeftInBatch() == 1);
// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
@@ -77,28 +78,16 @@
assert.neq(undefined, initialAggPBRT);
assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0);
- // Test that postBatchResumeToken is present on non-empty initial aggregate batch.
- csCursor =
- testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}});
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
- while (csCursor.objsLeftInBatch()) {
- csCursor.next();
- }
- // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the
- // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it.
- initialAggPBRT = csCursor.getResumeToken();
- assert.neq(undefined, initialAggPBRT);
- assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
-
// Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
// observed postBatchResumeToken advanced.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
// The postBatchResumeToken is again equal to the final token in the batch, and greater than the
// PBRT from the initial response.
+ let eventFromCursor = null;
while (csCursor.objsLeftInBatch()) {
- resumeTokenFromDoc = csCursor.next()._id;
+ eventFromCursor = csCursor.next();
+ resumeTokenFromDoc = eventFromCursor._id;
}
getMorePBRT = csCursor.getResumeToken();
assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
@@ -106,15 +95,21 @@
// Test that postBatchResumeToken advances with writes to an unrelated collection. First make
// sure there is nothing left in our cursor, and obtain the latest PBRT...
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ while (eventFromCursor.fullDocument._id < (docId - 1)) {
+ assert.soon(() => csCursor.hasNext());
+ eventFromCursor = csCursor.next();
+ }
+ assert(!csCursor.hasNext());
let previousGetMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, previousGetMorePBRT);
// ... then test that it advances on an insert to an unrelated collection.
assert.commandWorked(otherCollection.insert({}));
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
- getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+ assert.soon(() => {
+ assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ getMorePBRT = csCursor.getResumeToken();
+ return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0;
+ });
// Insert two documents into the collection which are of the maximum BSON object size.
const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
@@ -128,70 +123,20 @@
// Test that we return the correct postBatchResumeToken in the event that the batch hits the
// byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
// because the second result cannot fit in the batch.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- // Verify that the postBatchResumeToken matches the last event actually added to the batch.
+ // Obtain the resume token and the PBRT from the first document.
resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
- // Now retrieve the second event and confirm that the PBRT matches its resume token.
+ // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- resumeTokenFromDoc = csCursor.next()._id;
- getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
-
- // Test that the PBRT is correctly updated when reading events from within a transaction.
- const session = db.getMongo().startSession();
- const sessionDB = session.getDatabase(db.getName());
-
- const sessionColl = sessionDB[testCollection.getName()];
- const sessionOtherColl = sessionDB[otherCollection.getName()];
- session.startTransaction();
-
- // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction.
- for (let i = 0; i < 3; ++i) {
- assert.commandWorked(sessionColl.insert({_id: docId++}));
- }
- assert.commandWorked(sessionOtherColl.insert({}));
- assert.commandWorked(session.commitTransaction_forTesting());
- session.endSession();
-
- // Grab the next 2 events, which should be the first 2 events in the transaction.
- previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 2);
-
- // The clusterTime should be the same on each, but the resume token keeps advancing.
- const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
- const txnClusterTime = txnEvent1.clusterTime;
- assert.eq(txnEvent2.clusterTime, txnClusterTime);
- assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0);
- assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
-
- // The PBRT of the first transaction batch is equal to the last document's resumeToken.
- getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0);
-
- // Now get the next batch. This contains the third of the four transaction operations.
- previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 1);
-
- // The clusterTime of this event is the same as the two events from the previous batch, but its
- // resume token is greater than the previous PBRT.
- const txnEvent3 = csCursor.next();
- assert.eq(txnEvent3.clusterTime, txnClusterTime);
- assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
-
- // Because we wrote to the unrelated collection, the final event in the transaction does not
- // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
- // last event in the batch, because the unrelated event is within the same transaction and
- // therefore has the same clusterTime.
+ const resumeTokenFromSecondDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
+ assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0);
+ assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0);
+ assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0);
})();
diff --git a/jstests/change_streams/shell_helper_resume_token.js b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
index 8cfd2328451..b10f85a7702 100644
--- a/jstests/change_streams/shell_helper_resume_token.js
+++ b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
@@ -2,18 +2,30 @@
* Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume
* token with each document and returning the postBatchResumeToken as soon as each batch is
* exhausted.
+ * @tags: [requires_journaling]
*/
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
- // Drop and recreate collections to assure a clean run.
+ // Create a new single-node replica set, and ensure that it can support $changeStream.
+ const rst = new ReplSetTest({nodes: 1});
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+ rst.initiate();
+
+ const db = rst.getPrimary().getDB(jsTestName());
const collName = "change_stream_shell_helper_resume_token";
const csCollection = assertDropAndRecreateCollection(db, collName);
const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
const batchSize = 5;
+ let docId = 0;
// Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received.
const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}});
@@ -31,49 +43,54 @@
// Insert 9 documents into the collection, followed by a write to the unrelated collection.
for (let i = 0; i < 9; ++i) {
- assert.commandWorked(csCollection.insert({_id: i}));
+ assert.commandWorked(csCollection.insert({_id: ++docId}));
}
assert.commandWorked(otherCollection.insert({}));
- // Retrieve the first batch of 5 events.
+ // Retrieve the first batch of events from the cursor.
assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
// We have not yet iterated any of the events. Verify that the resume token is unchanged.
assert.docEq(curResumeToken, csCursor.getResumeToken());
// For each event in the first batch, the resume token should match the document's _id.
+ let currentDoc = null;
while (csCursor.objsLeftInBatch()) {
- const nextDoc = csCursor.next();
+ currentDoc = csCursor.next();
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
- assert.docEq(curResumeToken, nextDoc._id);
+ assert.docEq(curResumeToken, currentDoc._id);
assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
}
- // Retrieve the second batch. This should be 4 documents.
+ // Retrieve the second batch of events from the cursor.
assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 4);
// We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged.
assert.docEq(curResumeToken, csCursor.getResumeToken());
- // For each of the first 3 events, the resume token should match the document's _id.
- while (csCursor.objsLeftInBatch() > 1) {
- const nextDoc = csCursor.next();
+ // For all but the final event, the resume token should match the document's _id.
+ while ((currentDoc = csCursor.next()).fullDocument._id < docId) {
+ assert(csCursor.hasNext());
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
- assert.docEq(curResumeToken, nextDoc._id);
+ assert.docEq(curResumeToken, currentDoc._id);
assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
}
+ // When we reach here, 'currentDoc' is the final document in the batch, but we have not yet
+ // updated the resume token. Assert that this resume token sorts before currentDoc's.
+ prevResumeToken = curResumeToken;
+ assert.gt(bsonWoCompare(currentDoc._id, prevResumeToken), 0);
- // When we pull the final document out of the cursor, the resume token should become the
+ // After we have pulled the final document out of the cursor, the resume token should be the
// postBatchResumeToken rather than the document's _id. Because we inserted an item into the
// unrelated collection to push the oplog past the final event returned by the change stream,
// this will be strictly greater than the final document's _id.
- const finalDoc = csCursor.next();
- prevResumeToken = curResumeToken;
- curResumeToken = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(finalDoc._id, prevResumeToken), 0);
- assert.gt(bsonWoCompare(curResumeToken, finalDoc._id), 0);
+ assert.soon(() => {
+ curResumeToken = csCursor.getResumeToken();
+ assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
+ return bsonWoCompare(curResumeToken, currentDoc._id) > 0;
+ });
+
+ rst.stopSet();
}());
diff --git a/jstests/noPassthrough/report_post_batch_resume_token_mongod.js b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js
new file mode 100644
index 00000000000..cf7dd55b1d0
--- /dev/null
+++ b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js
@@ -0,0 +1,114 @@
+/**
+ * Tests mongoD-specific semantics of postBatchResumeToken for $changeStream aggregations.
+ * @tags: [uses_transactions]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ // Create a new single-node replica set, and ensure that it can support $changeStream.
+ const rst = new ReplSetTest({nodes: 1});
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+ rst.initiate();
+
+ const db = rst.getPrimary().getDB(jsTestName());
+ const collName = "report_post_batch_resume_token";
+ const testCollection = assertDropAndRecreateCollection(db, collName);
+ const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
+ const adminDB = db.getSiblingDB("admin");
+
+ let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
+ const batchSize = 2;
+
+ // Start watching the test collection in order to capture a resume token.
+ let csCursor = testCollection.watch();
+
+ // Write some documents to the test collection and get the resume token from the first doc.
+ for (let i = 0; i < 5; ++i) {
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ }
+ const resumeTokenFromDoc = csCursor.next()._id;
+ csCursor.close();
+
+ // Test that postBatchResumeToken is present on a non-empty initial aggregate batch.
+ assert.soon(() => {
+ csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc});
+ csCursor.close(); // We don't need any results after the initial batch.
+ return csCursor.objsLeftInBatch();
+ });
+ while (csCursor.objsLeftInBatch()) {
+ csCursor.next();
+ }
+ let initialAggPBRT = csCursor.getResumeToken();
+ assert.neq(undefined, initialAggPBRT);
+
+ // Test that the PBRT is correctly updated when reading events from within a transaction.
+ const session = db.getMongo().startSession();
+ const sessionDB = session.getDatabase(db.getName());
+
+ const sessionColl = sessionDB[testCollection.getName()];
+ const sessionOtherColl = sessionDB[otherCollection.getName()];
+ session.startTransaction();
+
+ // Open a stream of batchSize:2 and grab the PBRT of the initial batch.
+ csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}});
+ initialAggPBRT = csCursor.getResumeToken();
+ assert.eq(csCursor.objsLeftInBatch(), 0);
+
+ // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction.
+ for (let i = 0; i < 3; ++i) {
+ assert.commandWorked(sessionColl.insert({_id: docId++}));
+ }
+ assert.commandWorked(sessionOtherColl.insert({}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+
+ // Grab the next 2 events, which should be the first 2 events in the transaction.
+ assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.eq(csCursor.objsLeftInBatch(), 2);
+
+ // The clusterTime should be the same on each, but the resume token keeps advancing.
+ const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
+ const txnClusterTime = txnEvent1.clusterTime;
+ assert.eq(txnEvent2.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent1._id, initialAggPBRT), 0);
+ assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
+
+ // The PBRT of the first transaction batch is equal to the last document's resumeToken.
+ let getMorePBRT = csCursor.getResumeToken();
+ assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0);
+
+ // Save this PBRT so that we can test resuming from it later on.
+ const resumePBRT = getMorePBRT;
+
+ // Now get the next batch. This contains the third of the four transaction operations.
+ let previousGetMorePBRT = getMorePBRT;
+ assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.eq(csCursor.objsLeftInBatch(), 1);
+
+ // The clusterTime of this event is the same as the two events from the previous batch, but its
+ // resume token is greater than the previous PBRT.
+ const txnEvent3 = csCursor.next();
+ assert.eq(txnEvent3.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
+
+ // Because we wrote to the unrelated collection, the final event in the transaction does not
+ // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
+ // last event in the batch, because the unrelated event is within the same transaction and
+ // therefore has the same clusterTime.
+ getMorePBRT = csCursor.getResumeToken();
+ assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
+
+ // Confirm that resuming from the PBRT of the first batch gives us the third transaction write.
+ csCursor = testCollection.watch([], {resumeAfter: resumePBRT});
+ assert.docEq(csCursor.next(), txnEvent3);
+ assert(!csCursor.hasNext());
+
+ rst.stopSet();
+})();