diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-27 03:50:42 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-03-01 04:44:07 +0000 |
commit | 4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba (patch) | |
tree | 64b1f47448f8c22677c1fef0371ba231040aba28 | |
parent | 689bd54bb22d4ff22f0e280431e6429c2c8bea5d (diff) | |
download | mongo-4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba.tar.gz |
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
(cherry picked from commit 560a0c3a3e20924b362fc2b159c30255d62e81d2)
(cherry picked from commit 027cf6e2ac4958119dc5c108518f220722cd6f97)
32 files changed, 459 insertions, 213 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index 5624cd6120a..300f165d425 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -7,9 +7,6 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index c49fb06bb7f..c462f686ce5 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -7,9 +7,6 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected to # work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index 6a4a7bce0bd..6794c2bea6d 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -9,9 +9,6 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index 614e05994d2..e8f978f4907 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -13,9 +13,6 @@ selector: - jstests/change_streams/whole_db.js - jstests/change_streams/whole_db_metadata_notifications.js - jstests/change_streams/whole_db_resumability.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index 821bfd5e4dd..4dda4a931d0 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -7,9 +7,6 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index 810b2e1f6bb..1df3df2cdc3 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -9,9 +9,6 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml index 2f72f137e5a..41a04dc6f71 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml @@ -10,9 +10,6 @@ selector: - jstests/change_streams/whole_cluster.js - jstests/change_streams/whole_cluster_metadata_notifications.js - jstests/change_streams/whole_cluster_resumability.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index 32a10fd6051..21ec2902dab 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -13,9 +13,6 @@ selector: - jstests/change_streams/include_cluster_time.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index a13f0fb6350..e3040118ca2 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -11,10 +11,6 @@ selector: - jstests/change_streams/report_latest_observed_oplog_timestamp.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js - exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml index 4ef0d266ac0..d0017dd42dd 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml @@ -15,9 +15,6 @@ selector: - jstests/change_streams/whole_db.js - jstests/change_streams/whole_db_metadata_notifications.js - jstests/change_streams/whole_db_resumability.js - # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/shell_helper_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js index b84cc6189f7..4c161e85a03 100644 --- a/jstests/change_streams/report_post_batch_resume_token.js +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -1,5 +1,6 @@ /** - * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. + * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This + * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams. * @tags: [uses_transactions] */ (function() { @@ -55,7 +56,7 @@ // Test that postBatchResumeToken advances with returned events. Insert one document into the // collection and consume the resulting change stream event. assert.commandWorked(testCollection.insert({_id: docId++})); - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert(csCursor.objsLeftInBatch() == 1); // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal @@ -77,28 +78,16 @@ assert.neq(undefined, initialAggPBRT); assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0); - // Test that postBatchResumeToken is present on non-empty initial aggregate batch. - csCursor = - testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}}); - assert.eq(csCursor.objsLeftInBatch(), batchSize); - while (csCursor.objsLeftInBatch()) { - csCursor.next(); - } - // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the - // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it. - initialAggPBRT = csCursor.getResumeToken(); - assert.neq(undefined, initialAggPBRT); - assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); - // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the // observed postBatchResumeToken advanced. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. // The postBatchResumeToken is again equal to the final token in the batch, and greater than the // PBRT from the initial response. + let eventFromCursor = null; while (csCursor.objsLeftInBatch()) { - resumeTokenFromDoc = csCursor.next()._id; + eventFromCursor = csCursor.next(); + resumeTokenFromDoc = eventFromCursor._id; } getMorePBRT = csCursor.getResumeToken(); assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0); @@ -106,15 +95,21 @@ // Test that postBatchResumeToken advances with writes to an unrelated collection. First make // sure there is nothing left in our cursor, and obtain the latest PBRT... - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + while (eventFromCursor.fullDocument._id < (docId - 1)) { + assert.soon(() => csCursor.hasNext()); + eventFromCursor = csCursor.next(); + } + assert(!csCursor.hasNext()); let previousGetMorePBRT = csCursor.getResumeToken(); assert.neq(undefined, previousGetMorePBRT); // ... then test that it advances on an insert to an unrelated collection. assert.commandWorked(otherCollection.insert({})); - assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. - getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + assert.soon(() => { + assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + getMorePBRT = csCursor.getResumeToken(); + return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0; + }); // Insert two documents into the collection which are of the maximum BSON object size. const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize; @@ -128,70 +123,20 @@ // Test that we return the correct postBatchResumeToken in the event that the batch hits the // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result, // because the second result cannot fit in the batch. - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - // Verify that the postBatchResumeToken matches the last event actually added to the batch. + // Obtain the resume token and the PBRT from the first document. resumeTokenFromDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); - // Now retrieve the second event and confirm that the PBRT matches its resume token. + // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order. previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert.eq(csCursor.objsLeftInBatch(), 1); - resumeTokenFromDoc = csCursor.next()._id; - getMorePBRT = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); - assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); - - // Test that the PBRT is correctly updated when reading events from within a transaction. - const session = db.getMongo().startSession(); - const sessionDB = session.getDatabase(db.getName()); - - const sessionColl = sessionDB[testCollection.getName()]; - const sessionOtherColl = sessionDB[otherCollection.getName()]; - session.startTransaction(); - - // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction. - for (let i = 0; i < 3; ++i) { - assert.commandWorked(sessionColl.insert({_id: docId++})); - } - assert.commandWorked(sessionOtherColl.insert({})); - assert.commandWorked(session.commitTransaction_forTesting()); - session.endSession(); - - // Grab the next 2 events, which should be the first 2 events in the transaction. - previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 2); - - // The clusterTime should be the same on each, but the resume token keeps advancing. - const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next(); - const txnClusterTime = txnEvent1.clusterTime; - assert.eq(txnEvent2.clusterTime, txnClusterTime); - assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0); - assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); - - // The PBRT of the first transaction batch is equal to the last document's resumeToken. - getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0); - - // Now get the next batch. This contains the third of the four transaction operations. - previousGetMorePBRT = getMorePBRT; - assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 1); - - // The clusterTime of this event is the same as the two events from the previous batch, but its - // resume token is greater than the previous PBRT. - const txnEvent3 = csCursor.next(); - assert.eq(txnEvent3.clusterTime, txnClusterTime); - assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); - - // Because we wrote to the unrelated collection, the final event in the transaction does not - // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the - // last event in the batch, because the unrelated event is within the same transaction and - // therefore has the same clusterTime. + const resumeTokenFromSecondDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); - assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); + assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0); + assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0); + assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0); })(); diff --git a/jstests/change_streams/shell_helper_resume_token.js b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js index 8cfd2328451..b10f85a7702 100644 --- a/jstests/change_streams/shell_helper_resume_token.js +++ b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js @@ -2,18 +2,30 @@ * Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume * token with each document and returning the postBatchResumeToken as soon as each batch is * exhausted. + * @tags: [requires_journaling] */ (function() { "use strict"; load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. - // Drop and recreate collections to assure a clean run. + // Create a new single-node replica set, and ensure that it can support $changeStream. + const rst = new ReplSetTest({nodes: 1}); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + rst.initiate(); + + const db = rst.getPrimary().getDB(jsTestName()); const collName = "change_stream_shell_helper_resume_token"; const csCollection = assertDropAndRecreateCollection(db, collName); const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); const batchSize = 5; + let docId = 0; // Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received. const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}}); @@ -31,49 +43,54 @@ // Insert 9 documents into the collection, followed by a write to the unrelated collection. for (let i = 0; i < 9; ++i) { - assert.commandWorked(csCollection.insert({_id: i})); + assert.commandWorked(csCollection.insert({_id: ++docId})); } assert.commandWorked(otherCollection.insert({})); - // Retrieve the first batch of 5 events. + // Retrieve the first batch of events from the cursor. assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), batchSize); // We have not yet iterated any of the events. Verify that the resume token is unchanged. assert.docEq(curResumeToken, csCursor.getResumeToken()); // For each event in the first batch, the resume token should match the document's _id. + let currentDoc = null; while (csCursor.objsLeftInBatch()) { - const nextDoc = csCursor.next(); + currentDoc = csCursor.next(); prevResumeToken = curResumeToken; curResumeToken = csCursor.getResumeToken(); - assert.docEq(curResumeToken, nextDoc._id); + assert.docEq(curResumeToken, currentDoc._id); assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); } - // Retrieve the second batch. This should be 4 documents. + // Retrieve the second batch of events from the cursor. assert(csCursor.hasNext()); // Causes a getMore to be dispatched. - assert.eq(csCursor.objsLeftInBatch(), 4); // We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged. assert.docEq(curResumeToken, csCursor.getResumeToken()); - // For each of the first 3 events, the resume token should match the document's _id. - while (csCursor.objsLeftInBatch() > 1) { - const nextDoc = csCursor.next(); + // For all but the final event, the resume token should match the document's _id. + while ((currentDoc = csCursor.next()).fullDocument._id < docId) { + assert(csCursor.hasNext()); prevResumeToken = curResumeToken; curResumeToken = csCursor.getResumeToken(); - assert.docEq(curResumeToken, nextDoc._id); + assert.docEq(curResumeToken, currentDoc._id); assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0); } + // When we reach here, 'currentDoc' is the final document in the batch, but we have not yet + // updated the resume token. Assert that this resume token sorts before currentDoc's. + prevResumeToken = curResumeToken; + assert.gt(bsonWoCompare(currentDoc._id, prevResumeToken), 0); - // When we pull the final document out of the cursor, the resume token should become the + // After we have pulled the final document out of the cursor, the resume token should be the // postBatchResumeToken rather than the document's _id. Because we inserted an item into the // unrelated collection to push the oplog past the final event returned by the change stream, // this will be strictly greater than the final document's _id. - const finalDoc = csCursor.next(); - prevResumeToken = curResumeToken; - curResumeToken = csCursor.getResumeToken(); - assert.gt(bsonWoCompare(finalDoc._id, prevResumeToken), 0); - assert.gt(bsonWoCompare(curResumeToken, finalDoc._id), 0); + assert.soon(() => { + curResumeToken = csCursor.getResumeToken(); + assert(!csCursor.hasNext(), () => tojson(csCursor.next())); + return bsonWoCompare(curResumeToken, currentDoc._id) > 0; + }); + + rst.stopSet(); }()); diff --git a/jstests/noPassthrough/report_post_batch_resume_token_mongod.js b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js new file mode 100644 index 00000000000..cf7dd55b1d0 --- /dev/null +++ b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js @@ -0,0 +1,114 @@ +/** + * Tests mongoD-specific semantics of postBatchResumeToken for $changeStream aggregations. + * @tags: [uses_transactions] + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + // Create a new single-node replica set, and ensure that it can support $changeStream. + const rst = new ReplSetTest({nodes: 1}); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + rst.initiate(); + + const db = rst.getPrimary().getDB(jsTestName()); + const collName = "report_post_batch_resume_token"; + const testCollection = assertDropAndRecreateCollection(db, collName); + const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); + const adminDB = db.getSiblingDB("admin"); + + let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate. + const batchSize = 2; + + // Start watching the test collection in order to capture a resume token. + let csCursor = testCollection.watch(); + + // Write some documents to the test collection and get the resume token from the first doc. + for (let i = 0; i < 5; ++i) { + assert.commandWorked(testCollection.insert({_id: docId++})); + } + const resumeTokenFromDoc = csCursor.next()._id; + csCursor.close(); + + // Test that postBatchResumeToken is present on a non-empty initial aggregate batch. + assert.soon(() => { + csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc}); + csCursor.close(); // We don't need any results after the initial batch. + return csCursor.objsLeftInBatch(); + }); + while (csCursor.objsLeftInBatch()) { + csCursor.next(); + } + let initialAggPBRT = csCursor.getResumeToken(); + assert.neq(undefined, initialAggPBRT); + + // Test that the PBRT is correctly updated when reading events from within a transaction. + const session = db.getMongo().startSession(); + const sessionDB = session.getDatabase(db.getName()); + + const sessionColl = sessionDB[testCollection.getName()]; + const sessionOtherColl = sessionDB[otherCollection.getName()]; + session.startTransaction(); + + // Open a stream of batchSize:2 and grab the PBRT of the initial batch. + csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}}); + initialAggPBRT = csCursor.getResumeToken(); + assert.eq(csCursor.objsLeftInBatch(), 0); + + // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction. + for (let i = 0; i < 3; ++i) { + assert.commandWorked(sessionColl.insert({_id: docId++})); + } + assert.commandWorked(sessionOtherColl.insert({})); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); + + // Grab the next 2 events, which should be the first 2 events in the transaction. + assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.eq(csCursor.objsLeftInBatch(), 2); + + // The clusterTime should be the same on each, but the resume token keeps advancing. + const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next(); + const txnClusterTime = txnEvent1.clusterTime; + assert.eq(txnEvent2.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent1._id, initialAggPBRT), 0); + assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); + + // The PBRT of the first transaction batch is equal to the last document's resumeToken. + let getMorePBRT = csCursor.getResumeToken(); + assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0); + + // Save this PBRT so that we can test resuming from it later on. + const resumePBRT = getMorePBRT; + + // Now get the next batch. This contains the third of the four transaction operations. + let previousGetMorePBRT = getMorePBRT; + assert(csCursor.hasNext()); // Causes a getMore to be dispatched. + assert.eq(csCursor.objsLeftInBatch(), 1); + + // The clusterTime of this event is the same as the two events from the previous batch, but its + // resume token is greater than the previous PBRT. + const txnEvent3 = csCursor.next(); + assert.eq(txnEvent3.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); + + // Because we wrote to the unrelated collection, the final event in the transaction does not + // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the + // last event in the batch, because the unrelated event is within the same transaction and + // therefore has the same clusterTime. + getMorePBRT = csCursor.getResumeToken(); + assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0); + + // Confirm that resuming from the PBRT of the first batch gives us the third transaction write. + csCursor = testCollection.watch([], {resumeAfter: resumePBRT}); + assert.docEq(csCursor.next(), txnEvent3); + assert(!csCursor.hasNext()); + + rst.stopSet(); +})(); diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index e8bb575418a..e323c01ac29 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -67,6 +67,14 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { return Document::fromBsonWithMetaData(*next.getResult()); } +BSONObj DocumentSourceMergeCursors::getHighWaterMark() { + if (!_arm) { + _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams)); + _armParams = boost::none; + } + return _arm->getHighWaterMark(); +} + Value DocumentSourceMergeCursors::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { invariant(!_arm); diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 5ca80f98146..52772d66765 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -97,6 +97,13 @@ public: GetNextResult getNext() final; + /** + * Returns the high water mark sort key for the given cursor, if it exists; otherwise, returns + * an empty BSONObj. Calling this method causes the underlying ARM to be populated and assumes + * ownership of the remote cursors. + */ + BSONObj getHighWaterMark(); + protected: void doDispose() final; diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index d2307ecc354..465c588bc9d 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -233,8 +233,7 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, } batchBuilder.doneFast(); - if (_postBatchResumeToken) { - invariant(!_postBatchResumeToken->isEmpty()); + if (_postBatchResumeToken && !_postBatchResumeToken->isEmpty()) { cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken); } diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index d5a3044a8a2..f5c15b64105 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -560,6 +560,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, BSONObjBuilder cursorResponse; CursorResponseBuilder responseBuilder(true, &cursorResponse); + bool stashedResult = false; for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { ClusterQueryResult next; @@ -591,12 +592,21 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { ccc->queueResult(nextObj); + stashedResult = true; break; } + // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. + responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); responseBuilder.append(nextObj); } + // For empty batches, or in the case where the final result was added to the batch rather than + // being stashed, we update the PBRT here to ensure that it is the most recent available. + if (!stashedResult) { + responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); + } + ccc->detachFromOperationContext(); int nShards = ccc->getNumRemotes(); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 15957cc3aba..ac76205ba10 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -93,9 +93,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, _tailableMode(params.getTailableMode() ? *params.getTailableMode() : TailableModeEnum::kNormal), _params(std::move(params)), - _mergeQueue(MergingComparator(_remotes, - _params.getSort() ? *_params.getSort() : BSONObj(), - _params.getCompareWholeSortKey())) { + _mergeQueue(MergingComparator( + _remotes, _params.getSort().value_or(BSONObj()), _params.getCompareWholeSortKey())), + _promisedMinSortKeys(PromisedMinSortKeyComparator(_params.getSort().value_or(BSONObj()))) { if (params.getTxnNumber()) { invariant(params.getSessionId()); } @@ -111,6 +111,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); ++remoteIndex; } + // If this is a change stream, then we expect to have already received PBRTs from every shard. + invariant(_promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == _remotes.size()); + _highWaterMark = _promisedMinSortKeys.empty() ? BSONObj() : _promisedMinSortKeys.begin()->first; } AsyncResultsMerger::~AsyncResultsMerger() { @@ -176,13 +179,32 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { stdx::lock_guard<stdx::mutex> lk(_mutex); + // Create a new entry in the '_remotes' list for each new shard, and add the first cursor batch + // to its buffer. This ensures the shard's initial high water mark is respected, if it exists. for (auto&& remote : newCursors) { + const auto newIndex = _remotes.size(); _remotes.emplace_back(remote.getHostAndPort(), remote.getCursorResponse().getNSS(), remote.getCursorResponse().getCursorId()); + _addBatchToBuffer(lk, newIndex, remote.getCursorResponse()); } } +BSONObj AsyncResultsMerger::getHighWaterMark() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto minPromisedSortKey = _getMinPromisedSortKey(lk); + if (!minPromisedSortKey.isEmpty() && !_ready(lk)) { + _highWaterMark = minPromisedSortKey; + } + return _highWaterMark; +} + +BSONObj AsyncResultsMerger::_getMinPromisedSortKey(WithLock) { + // We cannot return the minimum promised sort key unless all shards have reported one. + return _promisedMinSortKeys.size() < _remotes.size() ? BSONObj() + : _promisedMinSortKeys.begin()->first; +} + bool AsyncResultsMerger::_ready(WithLock lk) { if (_lifecycleState != kAlive) { return true; @@ -221,7 +243,7 @@ bool AsyncResultsMerger::_readySorted(WithLock lk) { return true; } -bool AsyncResultsMerger::_readySortedTailable(WithLock) { +bool AsyncResultsMerger::_readySortedTailable(WithLock lk) { if (_mergeQueue.empty()) { return false; } @@ -230,19 +252,10 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) { auto smallestResult = _remotes[smallestRemote].docBuffer.front(); auto keyWeWantToReturn = extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey()); - for (const auto& remote : _remotes) { - if (!remote.promisedMinSortKey) { - // In order to merge sorted tailable cursors, we need this value to be populated. - return false; - } - if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) > - 0) { - // The key we want to return is not guaranteed to be smaller than future results from - // this remote, so we can't yet return it. - return false; - } - } - return true; + // We should always have a minPromisedSortKey from every shard in the sorted tailable case. + auto minPromisedSortKey = _getMinPromisedSortKey(lk); + invariant(!minPromisedSortKey.isEmpty()); + return compareSortKeys(keyWeWantToReturn, minPromisedSortKey, *_params.getSort()) <= 0; } bool AsyncResultsMerger::_readyUnsorted(WithLock) { @@ -302,6 +315,12 @@ ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { _mergeQueue.push(smallestRemote); } + // For sorted tailable awaitData cursors, update the high water mark to the document's sort key. + if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { + _highWaterMark = + extractSortKey(*front.getResult(), _params.getCompareWholeSortKey()).getOwned(); + } + return front; } @@ -481,10 +500,12 @@ StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse( return std::move(cursorResponse); } -void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, - const CursorResponse& response) { +void AsyncResultsMerger::_updateRemoteMetadata(WithLock, + size_t remoteIndex, + const CursorResponse& response) { // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard. - remote->cursorId = response.getCursorId(); + auto& remote = _remotes[remoteIndex]; + remote.cursorId = response.getCursorId(); if (response.getPostBatchResumeToken()) { // We only expect to see this for change streams. invariant(_params.getSort()); @@ -497,14 +518,17 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, // The most recent minimum sort key should never be smaller than the previous promised // minimum sort key for this remote, if one exists. auto newMinSortKey = *response.getPostBatchResumeToken(); - if (auto& oldMinSortKey = remote->promisedMinSortKey) { + if (auto& oldMinSortKey = remote.promisedMinSortKey) { invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0); + invariant(_promisedMinSortKeys.size() <= _remotes.size()); + _promisedMinSortKeys.erase({*oldMinSortKey, remoteIndex}); } - remote->promisedMinSortKey = newMinSortKey; + _promisedMinSortKeys.insert({newMinSortKey, remoteIndex}); + remote.promisedMinSortKey = newMinSortKey; } else { // If we don't have a postBatchResumeToken, then we should never have an oplog timestamp. uassert(ErrorCodes::InternalErrorNotSupported, - str::stream() << "Host " << remote->shardHostAndPort + str::stream() << "Host " << remote.shardHostAndPort << " returned a cursor which has an oplog timestamp but does not " "have a postBatchResumeToken, suggesting that one or more shards" " are running an older version of MongoDB. This configuration " @@ -609,7 +633,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, size_t remoteIndex, const CursorResponse& response) { auto& remote = _remotes[remoteIndex]; - updateRemoteMetadata(&remote, response); + _updateRemoteMetadata(lk, remoteIndex, response); for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. if (_params.getSort()) { @@ -791,4 +815,10 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() { return nextReady(); } +bool AsyncResultsMerger::PromisedMinSortKeyComparator::operator()( + const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const { + auto sortKeyComp = compareSortKeys(lhs.first, rhs.first, _sort); + return sortKeyComp < 0 || (sortKeyComp == 0 && lhs.second < rhs.second); +} + } // namespace mongo diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index f08d55385f1..05b88156918 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -222,6 +222,14 @@ public: } /** + * For sorted tailable cursors, returns the most recent available sort key. This guarantees that + * we will never return any future results which precede this key. If no results are ready to be + * returned, this method may cause the high water mark to advance to the lowest promised sortkey + * received from the shards. Returns an empty BSONObj if no such sort key is available. + */ + BSONObj getHighWaterMark(); + + /** * Starts shutting down this ARM by canceling all pending requests and scheduling killCursors * on all of the unexhausted remotes. Returns a handle to an event that is signaled when this * ARM is safe to destroy. @@ -325,6 +333,18 @@ private: const bool _compareWholeSortKey; }; + using MinSortKeyRemoteIdPair = std::pair<BSONObj, size_t>; + + class PromisedMinSortKeyComparator { + public: + PromisedMinSortKeyComparator(BSONObj sort) : _sort(std::move(sort)) {} + + bool operator()(const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const; + + private: + BSONObj _sort; + }; + enum LifecycleState { kAlive, kKillStarted, kKillComplete }; /** @@ -413,6 +433,11 @@ private: */ bool _haveOutstandingBatchRequests(WithLock); + /** + * If a promisedMinSortKey has been obtained from all remotes, returns the lowest such key. + * Otherwise, returns an empty BSONObj. + */ + BSONObj _getMinPromisedSortKey(WithLock); /** * Schedules a getMore on any remote hosts which we need another batch from. @@ -425,9 +450,9 @@ private: void _scheduleKillCursors(WithLock, OperationContext* opCtx); /** - * Updates 'remote's metadata (e.g. the cursor id) based on information in 'response'. + * Updates the given remote's metadata (e.g. the cursor id) based on information in 'response'. */ - void updateRemoteMetadata(RemoteCursorData* remote, const CursorResponse& response); + void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response); OperationContext* _opCtx; executor::TaskExecutor* _executor; @@ -458,6 +483,13 @@ private: boost::optional<Milliseconds> _awaitDataTimeout; + // An ordered set of (promisedMinSortKey, remoteIndex) pairs received from the shards. The first + // element in the set will be the lowest sort key across all shards. + std::set<MinSortKeyRemoteIdPair, PromisedMinSortKeyComparator> _promisedMinSortKeys; + + // For sorted tailable cursors, records the current high-water-mark sort key. Empty otherwise. + BSONObj _highWaterMark; + // // Killing // diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 1b5d4e97d63..1e4cce51fca 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -183,6 +183,15 @@ protected: } /** + * Schedules a single cursor response to be returned by the mock network. + */ + void scheduleNetworkResponse(CursorResponse&& response) { + std::vector<CursorResponse> responses; + responses.push_back(std::move(response)); + scheduleNetworkResponses(std::move(responses)); + } + + /** * Schedules a list of raw BSON command responses to be returned by the mock network. */ void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { @@ -1544,14 +1553,28 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { scheduleNetworkResponses(std::move(responses)); } -TEST_F(AsyncResultsMergerTest, - SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoPostBatchResumeToken) { +DEATH_TEST_F(AsyncResultsMergerTest, + SortedTailableInvariantsIfInitialBatchHasNoPostBatchResumeToken, + "Invariant failure _promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == " + "_remotes.size()") { AsyncResultsMergerParams params; params.setNss(kTestNss); UUID uuid = UUID::gen(); std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + // Create one cursor whose initial response has a postBatchResumeToken. + auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); + auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); + auto firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': '" + << firstDocSortKey.firstElement().String() + << "'}}"); + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse( + kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor))); + // Create a second cursor whose initial batch has no PBRT. cursors.push_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); params.setRemotes(std::move(cursors)); @@ -1561,57 +1584,10 @@ TEST_F(AsyncResultsMergerTest, stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); - - ASSERT_FALSE(arm->ready()); - - // Schedule one response with a postBatchResumeToken in it. - auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 6)); - auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); - std::vector<CursorResponse> responses; - auto firstCursorResult = fromjson( - str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() - << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() - << "'}}"); - std::vector<BSONObj> batch1{firstCursorResult}; - responses.emplace_back( - kTestNss, CursorId(123), std::move(batch1), boost::none, boost::none, pbrtFirstCursor); - scheduleNetworkResponses(std::move(responses)); - - // Still shouldn't be ready, we don't have a guarantee from each shard. - ASSERT_FALSE(arm->ready()); - - // Schedule another response from the other shard with a later postBatchResumeToken. - responses.clear(); - auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 7)); - auto secondDocSortKey = makeResumeToken(Timestamp(1, 5), uuid, BSON("_id" << 2)); - auto secondCursorResult = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '" - << uuid.toString() + "', documentKey: {_id: 2}}, $sortKey: {'': '" - << secondDocSortKey.firstElement().String() - << "'}}"); - std::vector<BSONObj> batch2{secondCursorResult}; - responses.emplace_back( - kTestNss, CursorId(456), std::move(batch2), boost::none, boost::none, pbrtSecondCursor); - scheduleNetworkResponses(std::move(responses)); - executor()->waitForEvent(readyEvent); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(firstCursorResult, *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(secondCursorResult, *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); - - // Clean up the cursors. - responses.clear(); - std::vector<BSONObj> batch3 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses)); - responses.clear(); - std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses)); + // We should be dead by now. + MONGO_UNREACHABLE; } DEATH_TEST_F(AsyncResultsMergerTest, @@ -1922,6 +1898,82 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting scheduleNetworkResponses(std::move(responses)); } +TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey) { + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + // Create three cursors with empty initial batches. Each batch has a PBRT. + auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor))); + auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1)); + cursors.push_back(makeRemoteCursor( + kTestShardIds[1], + kTestShardHosts[1], + CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor))); + auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4)); + cursors.push_back(makeRemoteCursor( + kTestShardIds[2], + kTestShardHosts[2], + CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(change_stream_constants::kSortSpec); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + + // We have no results to return, so the ARM is not ready. + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // The high water mark should be the second cursor's PBRT, since it is the lowest of the three. + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor); + + // Advance the PBRT of the second cursor. It should still be the lowest. The fixture expects + // each cursor to be updated in-order, so we keep the first and third PBRTs constant. + pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3)); + std::vector<BSONObj> emptyBatch = {}; + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor); + ASSERT_FALSE(arm->ready()); + + // Advance the second cursor again, so that it surpasses the other two. The third cursor becomes + // the new high water mark. + pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6)); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor); + ASSERT_FALSE(arm->ready()); + + // Advance the third cursor such that the first cursor becomes the high water mark. + pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7)); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor); + ASSERT_FALSE(arm->ready()); + + // Clean up the cursors. + std::vector<BSONObj> cleanupBatch = {}; + scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch}); + scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch}); + scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch}); +} + TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); std::vector<RemoteCursor> cursors; diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 064554438b9..0801ee969d6 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -118,6 +118,12 @@ public: virtual std::size_t getNumRemotes() const = 0; /** + * Returns the current most-recent resume token for this cursor, or an empty object if this is + * not a $changeStream cursor. + */ + virtual BSONObj getPostBatchResumeToken() const = 0; + + /** * Returns the number of result documents returned so far by this cursor via the next() method. */ virtual long long getNumReturnedSoFar() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1239e1ce263..45aa31b0072 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -153,6 +153,10 @@ std::size_t ClusterClientCursorImpl::getNumRemotes() const { return _root->getNumRemotes(); } +BSONObj ClusterClientCursorImpl::getPostBatchResumeToken() const { + return _root->getPostBatchResumeToken(); +} + long long ClusterClientCursorImpl::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 17a4721b8d0..be0e2390576 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -109,6 +109,8 @@ public: std::size_t getNumRemotes() const final; + BSONObj getPostBatchResumeToken() const final; + long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 72e4a5914b8..b16e10b8ef4 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -75,6 +75,10 @@ std::size_t ClusterClientCursorMock::getNumRemotes() const { MONGO_UNREACHABLE; } +BSONObj ClusterClientCursorMock::getPostBatchResumeToken() const { + MONGO_UNREACHABLE; +} + long long ClusterClientCursorMock::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 101d59e7bb5..c96209a7f88 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -74,6 +74,8 @@ public: std::size_t getNumRemotes() const final; + BSONObj getPostBatchResumeToken() const final; + long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 41589a9ebf0..b413eb9db13 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -154,6 +154,11 @@ std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const { return _cursor->getNumRemotes(); } +BSONObj ClusterCursorManager::PinnedCursor::getPostBatchResumeToken() const { + invariant(_cursor); + return _cursor->getPostBatchResumeToken(); +} + CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { return _cursorId; } diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 53ce2ddf345..77fb5cb6288 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -203,6 +203,11 @@ public: std::size_t getNumRemotes() const; /** + * If applicable, returns the current most-recent resume token for this cursor. + */ + BSONObj getPostBatchResumeToken() const; + + /** * Returns the cursor id for the underlying cursor, or zero if no cursor is owned. */ CursorId getCursorId() const; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 9fd84fd43c1..5ce9d4ee7a7 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -602,6 +602,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, long long batchSize = request.batchSize.value_or(0); long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar(); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + BSONObj postBatchResumeToken; + bool stashedResult = false; while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { auto context = batch.empty() @@ -639,6 +641,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, if (!FindCommon::haveSpaceForNext( *next.getValue().getResult(), batch.size(), bytesBuffered)) { pinnedCursor.getValue().queueResult(*next.getValue().getResult()); + stashedResult = true; break; } @@ -647,6 +650,15 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, bytesBuffered += (next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound); batch.push_back(std::move(*next.getValue().getResult())); + + // Update the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. + postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); + } + + // For empty batches, or in the case where the final result was added to the batch rather than + // being stashed, we update the PBRT here to ensure that it is the most recent available. + if (!stashedResult) { + postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); } pinnedCursor.getValue().setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); @@ -669,7 +681,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"); } - return CursorResponse(request.nss, idToReturn, std::move(batch), startingFrom); + return CursorResponse( + request.nss, idToReturn, std::move(batch), startingFrom, boost::none, postBatchResumeToken); } } // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index dbcc156d0d5..eaa0d23bc46 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -124,6 +124,14 @@ public: } /** + * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream; + * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child. + */ + virtual BSONObj getPostBatchResumeToken() { + return _child ? _child->getPostBatchResumeToken() : BSONObj(); + } + + /** * Sets the current operation context to be used by the router stage. */ void reattachToOperationContext(OperationContext* opCtx) { @@ -181,7 +189,7 @@ protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. */ - RouterExecStage* getChildStage() { + RouterExecStage* getChildStage() const { return _child.get(); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 0db33eac77f..2309cec5602 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -66,6 +66,10 @@ public: */ void addNewShardCursors(std::vector<RemoteCursor>&& newShards); + BSONObj getPostBatchResumeToken() final { + return _arm.getHighWaterMark(); + } + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index c9cd1166805..3f99e2c3547 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -47,6 +47,9 @@ RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child, _mergePipeline(std::move(mergePipeline)), _mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) { if (!_mongosOnlyPipeline) { + // Save a pointer to the child RouterExecStage before it is absorbed into the pipeline. This + // is either a merge stage, or an ancestor that can forward calls to the RouterStageMerge. + _mergeCursorsStage = child.get(); // Add an adapter to the front of the pipeline to draw results from 'child'. _routerAdapter = DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)), @@ -90,6 +93,10 @@ std::size_t RouterStagePipeline::getNumRemotes() const { return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes(); } +BSONObj RouterStagePipeline::getPostBatchResumeToken() { + return _mergeCursorsStage ? _mergeCursorsStage->getPostBatchResumeToken() : BSONObj(); +} + bool RouterStagePipeline::remotesExhausted() { return _mongosOnlyPipeline || _routerAdapter->remotesExhausted(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 635d28f1505..01f6328ca26 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -55,6 +55,8 @@ public: std::size_t getNumRemotes() const final; + BSONObj getPostBatchResumeToken() final; + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; @@ -65,6 +67,7 @@ protected: private: boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter; std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline; - bool _mongosOnlyPipeline; + RouterExecStage* _mergeCursorsStage = nullptr; + bool _mongosOnlyPipeline = false; }; } // namespace mongo |