commit 4d06a4769bcf1c58d6c5ee524c4bb610f9e658ba
tree 64b1f47448f8c22677c1fef0371ba231040aba28
parent 689bd54bb22d4ff22f0e280431e6429c2c8bea5d
author Bernard Gorman <bernard.gorman@gmail.com> 2018-11-27 03:50:42 +0000
committer Bernard Gorman <bernard.gorman@gmail.com> 2019-03-01 04:44:07 +0000
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
(cherry picked from commit 560a0c3a3e20924b362fc2b159c30255d62e81d2)
(cherry picked from commit 027cf6e2ac4958119dc5c108518f220722cd6f97)
 buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml | 3
 buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml | 4
 buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml | 3
 jstests/change_streams/report_post_batch_resume_token.js | 105
 jstests/noPassthrough/change_streams_shell_helper_resume_token.js (renamed from jstests/change_streams/shell_helper_resume_token.js) | 53
 jstests/noPassthrough/report_post_batch_resume_token_mongod.js | 114
 src/mongo/db/pipeline/document_source_merge_cursors.cpp | 8
 src/mongo/db/pipeline/document_source_merge_cursors.h | 7
 src/mongo/db/query/cursor_response.cpp | 3
 src/mongo/s/commands/cluster_aggregate.cpp | 10
 src/mongo/s/query/async_results_merger.cpp | 78
 src/mongo/s/query/async_results_merger.h | 36
 src/mongo/s/query/async_results_merger_test.cpp | 158
 src/mongo/s/query/cluster_client_cursor.h | 6
 src/mongo/s/query/cluster_client_cursor_impl.cpp | 4
 src/mongo/s/query/cluster_client_cursor_impl.h | 2
 src/mongo/s/query/cluster_client_cursor_mock.cpp | 4
 src/mongo/s/query/cluster_client_cursor_mock.h | 2
 src/mongo/s/query/cluster_cursor_manager.cpp | 5
 src/mongo/s/query/cluster_cursor_manager.h | 5
 src/mongo/s/query/cluster_find.cpp | 15
 src/mongo/s/query/router_exec_stage.h | 10
 src/mongo/s/query/router_stage_merge.h | 4
 src/mongo/s/query/router_stage_pipeline.cpp | 7
 src/mongo/s/query/router_stage_pipeline.h | 5
 32 files changed, 459 insertions(+), 213 deletions(-)
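For context on the mechanism this patch builds: each shard's $changeStream cursor returns a postBatchResumeToken (PBRT) promising that no future event from that shard will sort below it, so the mongoS merger may only advance the client-visible token to the minimum promised token across shards, and only once every shard has reported one. A minimal standalone C++ sketch of that rule, with integers standing in for resume tokens (all names here are illustrative, not MongoDB internals):

#include <iostream>
#include <optional>
#include <vector>

// Simplified stand-in for a shard's promised minimum resume token.
using Token = int;

// Returns the highest token that is safe to report to the client: the minimum
// promised token across shards, and only once every shard has promised one.
std::optional<Token> safeResumeToken(const std::vector<std::optional<Token>>& promised) {
    std::optional<Token> min;
    for (const auto& p : promised) {
        if (!p) {
            return std::nullopt;  // A shard has not reported yet; we cannot advance.
        }
        if (!min || *p < *min) {
            min = p;
        }
    }
    return min;
}

int main() {
    // Shard 0 promises token 5, shard 1 has not reported, shard 2 promises 4.
    std::vector<std::optional<Token>> promised{5, std::nullopt, 4};
    std::cout << (safeResumeToken(promised) ? "safe" : "cannot advance yet") << "\n";

    promised[1] = 7;  // Once all shards have promised, the minimum (4) is safe.
    std::cout << "safe token: " << *safeResumeToken(promised) << "\n";
}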
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index 5624cd6120a..300f165d425 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -7,9 +7,6 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index c49fb06bb7f..c462f686ce5 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -7,9 +7,6 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected to
# work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index 6a4a7bce0bd..6794c2bea6d 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -9,9 +9,6 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index 614e05994d2..e8f978f4907 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -13,9 +13,6 @@ selector:
- jstests/change_streams/whole_db.js
- jstests/change_streams/whole_db_metadata_notifications.js
- jstests/change_streams/whole_db_resumability.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index 821bfd5e4dd..4dda4a931d0 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -7,9 +7,6 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index 810b2e1f6bb..1df3df2cdc3 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -9,9 +9,6 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
index 2f72f137e5a..41a04dc6f71 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -10,9 +10,6 @@ selector:
- jstests/change_streams/whole_cluster.js
- jstests/change_streams/whole_cluster_metadata_notifications.js
- jstests/change_streams/whole_cluster_resumability.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index 32a10fd6051..21ec2902dab 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -13,9 +13,6 @@ selector:
- jstests/change_streams/include_cluster_time.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index a13f0fb6350..e3040118ca2 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -11,10 +11,6 @@ selector:
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
-
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
index 4ef0d266ac0..d0017dd42dd 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
@@ -15,9 +15,6 @@ selector:
- jstests/change_streams/whole_db.js
- jstests/change_streams/whole_db_metadata_notifications.js
- jstests/change_streams/whole_db_resumability.js
- # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/shell_helper_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
index b84cc6189f7..4c161e85a03 100644
--- a/jstests/change_streams/report_post_batch_resume_token.js
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -1,5 +1,6 @@
/**
- * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken.
+ * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. This
+ * test verifies postBatchResumeToken semantics that are common to sharded and unsharded streams.
* @tags: [uses_transactions]
*/
(function() {
@@ -55,7 +56,7 @@
// Test that postBatchResumeToken advances with returned events. Insert one document into the
// collection and consume the resulting change stream event.
assert.commandWorked(testCollection.insert({_id: docId++}));
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert(csCursor.objsLeftInBatch() == 1);
// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
@@ -77,28 +78,16 @@
assert.neq(undefined, initialAggPBRT);
assert.eq(bsonWoCompare(initialAggPBRT, resumeTokenFromDoc), 0);
- // Test that postBatchResumeToken is present on non-empty initial aggregate batch.
- csCursor =
- testCollection.watch([], {resumeAfter: resumeTokenFromDoc, cursor: {batchSize: batchSize}});
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
- while (csCursor.objsLeftInBatch()) {
- csCursor.next();
- }
- // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the
- // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it.
- initialAggPBRT = csCursor.getResumeToken();
- assert.neq(undefined, initialAggPBRT);
- assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
-
// Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
// observed postBatchResumeToken advanced.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
// The postBatchResumeToken is again equal to the final token in the batch, and greater than the
// PBRT from the initial response.
+ let eventFromCursor = null;
while (csCursor.objsLeftInBatch()) {
- resumeTokenFromDoc = csCursor.next()._id;
+ eventFromCursor = csCursor.next();
+ resumeTokenFromDoc = eventFromCursor._id;
}
getMorePBRT = csCursor.getResumeToken();
assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
@@ -106,15 +95,21 @@
// Test that postBatchResumeToken advances with writes to an unrelated collection. First make
// sure there is nothing left in our cursor, and obtain the latest PBRT...
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ while (eventFromCursor.fullDocument._id < (docId - 1)) {
+ assert.soon(() => csCursor.hasNext());
+ eventFromCursor = csCursor.next();
+ }
+ assert(!csCursor.hasNext());
let previousGetMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, previousGetMorePBRT);
// ... then test that it advances on an insert to an unrelated collection.
assert.commandWorked(otherCollection.insert({}));
- assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
- getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+ assert.soon(() => {
+ assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ getMorePBRT = csCursor.getResumeToken();
+ return bsonWoCompare(getMorePBRT, previousGetMorePBRT) > 0;
+ });
// Insert two documents into the collection which are of the maximum BSON object size.
const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
@@ -128,70 +123,20 @@
// Test that we return the correct postBatchResumeToken in the event that the batch hits the
// byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
// because the second result cannot fit in the batch.
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- // Verify that the postBatchResumeToken matches the last event actually added to the batch.
+ // Obtain the resume token and the PBRT from the first document.
resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
- // Now retrieve the second event and confirm that the PBRT matches its resume token.
+ // Now retrieve the second event and confirm that the PBRTs and resume tokens are in-order.
previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert.eq(csCursor.objsLeftInBatch(), 1);
- resumeTokenFromDoc = csCursor.next()._id;
- getMorePBRT = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
- assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
-
- // Test that the PBRT is correctly updated when reading events from within a transaction.
- const session = db.getMongo().startSession();
- const sessionDB = session.getDatabase(db.getName());
-
- const sessionColl = sessionDB[testCollection.getName()];
- const sessionOtherColl = sessionDB[otherCollection.getName()];
- session.startTransaction();
-
- // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction.
- for (let i = 0; i < 3; ++i) {
- assert.commandWorked(sessionColl.insert({_id: docId++}));
- }
- assert.commandWorked(sessionOtherColl.insert({}));
- assert.commandWorked(session.commitTransaction_forTesting());
- session.endSession();
-
- // Grab the next 2 events, which should be the first 2 events in the transaction.
- previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 2);
-
- // The clusterTime should be the same on each, but the resume token keeps advancing.
- const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
- const txnClusterTime = txnEvent1.clusterTime;
- assert.eq(txnEvent2.clusterTime, txnClusterTime);
- assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0);
- assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
-
- // The PBRT of the first transaction batch is equal to the last document's resumeToken.
- getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0);
-
- // Now get the next batch. This contains the third of the four transaction operations.
- previousGetMorePBRT = getMorePBRT;
- assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 1);
-
- // The clusterTime of this event is the same as the two events from the previous batch, but its
- // resume token is greater than the previous PBRT.
- const txnEvent3 = csCursor.next();
- assert.eq(txnEvent3.clusterTime, txnClusterTime);
- assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
-
- // Because we wrote to the unrelated collection, the final event in the transaction does not
- // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
- // last event in the batch, because the unrelated event is within the same transaction and
- // therefore has the same clusterTime.
+ const resumeTokenFromSecondDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
- assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
+ assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0);
+ assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0);
+ assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0);
})();
diff --git a/jstests/change_streams/shell_helper_resume_token.js b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
index 8cfd2328451..b10f85a7702 100644
--- a/jstests/change_streams/shell_helper_resume_token.js
+++ b/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
@@ -2,18 +2,30 @@
* Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume
* token with each document and returning the postBatchResumeToken as soon as each batch is
* exhausted.
+ * @tags: [requires_journaling]
*/
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
- // Drop and recreate collections to assure a clean run.
+ // Create a new single-node replica set, and ensure that it can support $changeStream.
+ const rst = new ReplSetTest({nodes: 1});
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+ rst.initiate();
+
+ const db = rst.getPrimary().getDB(jsTestName());
const collName = "change_stream_shell_helper_resume_token";
const csCollection = assertDropAndRecreateCollection(db, collName);
const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
const batchSize = 5;
+ let docId = 0;
// Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received.
const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}});
@@ -31,49 +43,54 @@
// Insert 9 documents into the collection, followed by a write to the unrelated collection.
for (let i = 0; i < 9; ++i) {
- assert.commandWorked(csCollection.insert({_id: i}));
+ assert.commandWorked(csCollection.insert({_id: ++docId}));
}
assert.commandWorked(otherCollection.insert({}));
- // Retrieve the first batch of 5 events.
+ // Retrieve the first batch of events from the cursor.
assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), batchSize);
// We have not yet iterated any of the events. Verify that the resume token is unchanged.
assert.docEq(curResumeToken, csCursor.getResumeToken());
// For each event in the first batch, the resume token should match the document's _id.
+ let currentDoc = null;
while (csCursor.objsLeftInBatch()) {
- const nextDoc = csCursor.next();
+ currentDoc = csCursor.next();
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
- assert.docEq(curResumeToken, nextDoc._id);
+ assert.docEq(curResumeToken, currentDoc._id);
assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
}
- // Retrieve the second batch. This should be 4 documents.
+ // Retrieve the second batch of events from the cursor.
assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
- assert.eq(csCursor.objsLeftInBatch(), 4);
// We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged.
assert.docEq(curResumeToken, csCursor.getResumeToken());
- // For each of the first 3 events, the resume token should match the document's _id.
- while (csCursor.objsLeftInBatch() > 1) {
- const nextDoc = csCursor.next();
+ // For all but the final event, the resume token should match the document's _id.
+ while ((currentDoc = csCursor.next()).fullDocument._id < docId) {
+ assert(csCursor.hasNext());
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
- assert.docEq(curResumeToken, nextDoc._id);
+ assert.docEq(curResumeToken, currentDoc._id);
assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
}
+ // When we reach here, 'currentDoc' is the final document in the batch, but we have not yet
+ // updated the resume token. Assert that this resume token sorts before currentDoc's.
+ prevResumeToken = curResumeToken;
+ assert.gt(bsonWoCompare(currentDoc._id, prevResumeToken), 0);
- // When we pull the final document out of the cursor, the resume token should become the
+ // After we have pulled the final document out of the cursor, the resume token should be the
// postBatchResumeToken rather than the document's _id. Because we inserted an item into the
// unrelated collection to push the oplog past the final event returned by the change stream,
// this will be strictly greater than the final document's _id.
- const finalDoc = csCursor.next();
- prevResumeToken = curResumeToken;
- curResumeToken = csCursor.getResumeToken();
- assert.gt(bsonWoCompare(finalDoc._id, prevResumeToken), 0);
- assert.gt(bsonWoCompare(curResumeToken, finalDoc._id), 0);
+ assert.soon(() => {
+ curResumeToken = csCursor.getResumeToken();
+ assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
+ return bsonWoCompare(curResumeToken, currentDoc._id) > 0;
+ });
+
+ rst.stopSet();
}());
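The renamed test pins down the shell helper's contract: while a batch is being iterated, getResumeToken() tracks the _id of the last document returned; once the batch is exhausted it snaps to the batch's postBatchResumeToken, which may sort strictly higher. A simplified standalone sketch of that client-side bookkeeping, assuming a toy integer token type (hypothetical names, not the mongo shell implementation):

#include <cassert>
#include <cstddef>
#include <vector>

using Token = int;

struct Batch {
    std::vector<Token> docTokens;  // Each event's resume token (its _id).
    Token postBatchResumeToken;    // Promised token covering the whole batch.
};

// Tracks the resume token the way cursor.getResumeToken() is described to:
// per-document while iterating, PBRT once the batch is drained.
class ResumeTokenTracker {
public:
    void onBatch(const Batch& b) {
        _batch = b;
        _pos = 0;
    }

    bool hasNextInBatch() const {
        return _pos < _batch.docTokens.size();
    }

    Token next() {
        _resumeToken = _batch.docTokens[_pos++];
        return _resumeToken;
    }

    Token getResumeToken() const {
        // Once the batch is exhausted, report the PBRT, which may exceed the
        // last document's token (e.g. unrelated oplog entries moved it ahead).
        return hasNextInBatch() ? _resumeToken : _batch.postBatchResumeToken;
    }

private:
    Batch _batch;
    std::size_t _pos = 0;
    Token _resumeToken = 0;
};

int main() {
    ResumeTokenTracker cursor;
    cursor.onBatch({{3, 5, 8}, /*postBatchResumeToken=*/9});
    while (cursor.hasNextInBatch()) {
        Token doc = cursor.next();
        assert(cursor.getResumeToken() == doc || !cursor.hasNextInBatch());
    }
    assert(cursor.getResumeToken() == 9);  // PBRT after the batch is drained.
}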
diff --git a/jstests/noPassthrough/report_post_batch_resume_token_mongod.js b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js
new file mode 100644
index 00000000000..cf7dd55b1d0
--- /dev/null
+++ b/jstests/noPassthrough/report_post_batch_resume_token_mongod.js
@@ -0,0 +1,114 @@
+/**
+ * Tests mongoD-specific semantics of postBatchResumeToken for $changeStream aggregations.
+ * @tags: [uses_transactions]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ // Create a new single-node replica set, and ensure that it can support $changeStream.
+ const rst = new ReplSetTest({nodes: 1});
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+ rst.initiate();
+
+ const db = rst.getPrimary().getDB(jsTestName());
+ const collName = "report_post_batch_resume_token";
+ const testCollection = assertDropAndRecreateCollection(db, collName);
+ const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
+ const adminDB = db.getSiblingDB("admin");
+
+ let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
+ const batchSize = 2;
+
+ // Start watching the test collection in order to capture a resume token.
+ let csCursor = testCollection.watch();
+
+ // Write some documents to the test collection and get the resume token from the first doc.
+ for (let i = 0; i < 5; ++i) {
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ }
+ const resumeTokenFromDoc = csCursor.next()._id;
+ csCursor.close();
+
+ // Test that postBatchResumeToken is present on a non-empty initial aggregate batch.
+ assert.soon(() => {
+ csCursor = testCollection.watch([], {resumeAfter: resumeTokenFromDoc});
+ csCursor.close(); // We don't need any results after the initial batch.
+ return csCursor.objsLeftInBatch();
+ });
+ while (csCursor.objsLeftInBatch()) {
+ csCursor.next();
+ }
+ let initialAggPBRT = csCursor.getResumeToken();
+ assert.neq(undefined, initialAggPBRT);
+
+ // Test that the PBRT is correctly updated when reading events from within a transaction.
+ const session = db.getMongo().startSession();
+ const sessionDB = session.getDatabase(db.getName());
+
+ const sessionColl = sessionDB[testCollection.getName()];
+ const sessionOtherColl = sessionDB[otherCollection.getName()];
+ session.startTransaction();
+
+ // Open a stream of batchSize:2 and grab the PBRT of the initial batch.
+ csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}});
+ initialAggPBRT = csCursor.getResumeToken();
+ assert.eq(csCursor.objsLeftInBatch(), 0);
+
+ // Write 3 documents to testCollection and 1 to the unrelated collection within the transaction.
+ for (let i = 0; i < 3; ++i) {
+ assert.commandWorked(sessionColl.insert({_id: docId++}));
+ }
+ assert.commandWorked(sessionOtherColl.insert({}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+
+ // Grab the next 2 events, which should be the first 2 events in the transaction.
+ assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.eq(csCursor.objsLeftInBatch(), 2);
+
+ // The clusterTime should be the same on each, but the resume token keeps advancing.
+ const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
+ const txnClusterTime = txnEvent1.clusterTime;
+ assert.eq(txnEvent2.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent1._id, initialAggPBRT), 0);
+ assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
+
+ // The PBRT of the first transaction batch is equal to the last document's resumeToken.
+ let getMorePBRT = csCursor.getResumeToken();
+ assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0);
+
+ // Save this PBRT so that we can test resuming from it later on.
+ const resumePBRT = getMorePBRT;
+
+ // Now get the next batch. This contains the third of the four transaction operations.
+ let previousGetMorePBRT = getMorePBRT;
+ assert(csCursor.hasNext()); // Causes a getMore to be dispatched.
+ assert.eq(csCursor.objsLeftInBatch(), 1);
+
+ // The clusterTime of this event is the same as the two events from the previous batch, but its
+ // resume token is greater than the previous PBRT.
+ const txnEvent3 = csCursor.next();
+ assert.eq(txnEvent3.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
+
+ // Because we wrote to the unrelated collection, the final event in the transaction does not
+ // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
+ // last event in the batch, because the unrelated event is within the same transaction and
+ // therefore has the same clusterTime.
+ getMorePBRT = csCursor.getResumeToken();
+ assert.eq(bsonWoCompare(getMorePBRT, txnEvent3._id), 0);
+
+ // Confirm that resuming from the PBRT of the first batch gives us the third transaction write.
+ csCursor = testCollection.watch([], {resumeAfter: resumePBRT});
+ assert.docEq(csCursor.next(), txnEvent3);
+ assert(!csCursor.hasNext());
+
+ rst.stopSet();
+})();
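The transaction assertions above depend on resume tokens ordering first by clusterTime and then by the event's position within that clusterTime, so events committed in one transaction share a clusterTime yet still carry strictly increasing tokens. A toy comparator illustrating that lexicographic ordering (simplified token shape, not the real _data encoding):

#include <cassert>
#include <tuple>

struct ResumeToken {
    int clusterTime;  // Timestamp of the oplog entry (shared within a txn).
    int offset;       // Position among events with the same clusterTime.
};

bool operator<(const ResumeToken& a, const ResumeToken& b) {
    return std::tie(a.clusterTime, a.offset) < std::tie(b.clusterTime, b.offset);
}

int main() {
    // Three writes committed in one transaction share a clusterTime...
    ResumeToken txn1{10, 0}, txn2{10, 1}, txn3{10, 2};
    assert(txn1 < txn2 && txn2 < txn3);  // ...but tokens keep advancing.

    // The PBRT cannot move past txn3 while a same-clusterTime event is still
    // pending, since a later token would skip that event on resume.
    ResumeToken pbrt = txn3;
    assert(!(pbrt < txn3) && !(txn3 < pbrt));
}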
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index e8bb575418a..e323c01ac29 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -67,6 +67,14 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
return Document::fromBsonWithMetaData(*next.getResult());
}
+BSONObj DocumentSourceMergeCursors::getHighWaterMark() {
+ if (!_arm) {
+ _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams));
+ _armParams = boost::none;
+ }
+ return _arm->getHighWaterMark();
+}
+
Value DocumentSourceMergeCursors::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
invariant(!_arm);
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 5ca80f98146..52772d66765 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -97,6 +97,13 @@ public:
GetNextResult getNext() final;
+ /**
+ * Returns the high water mark sort key for the given cursor, if it exists; otherwise, returns
+ * an empty BSONObj. Calling this method causes the underlying ARM to be populated and assumes
+ * ownership of the remote cursors.
+ */
+ BSONObj getHighWaterMark();
+
protected:
void doDispose() final;
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index d2307ecc354..465c588bc9d 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -233,8 +233,7 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
}
batchBuilder.doneFast();
- if (_postBatchResumeToken) {
- invariant(!_postBatchResumeToken->isEmpty());
+ if (_postBatchResumeToken && !_postBatchResumeToken->isEmpty()) {
cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index d5a3044a8a2..f5c15b64105 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -560,6 +560,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
BSONObjBuilder cursorResponse;
CursorResponseBuilder responseBuilder(true, &cursorResponse);
+ bool stashedResult = false;
for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
ClusterQueryResult next;
@@ -591,12 +592,21 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
ccc->queueResult(nextObj);
+ stashedResult = true;
break;
}
+ // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
+ responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
responseBuilder.append(nextObj);
}
+ // For empty batches, or in the case where the final result was added to the batch rather than
+ // being stashed, we update the PBRT here to ensure that it is the most recent available.
+ if (!stashedResult) {
+ responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
+ }
+
ccc->detachFromOperationContext();
int nShards = ccc->getNumRemotes();
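The ordering in establishMergingMongosCursor is deliberate: the PBRT is captured before each document is appended, so a document that fails the size check and is stashed is never covered by the reported token; the token is refreshed after the loop only when nothing was stashed (which also handles the empty-batch case). A standalone sketch of that control flow (illustrative types, not the mongos code):

#include <cstddef>
#include <deque>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Cursor {
    std::deque<std::string> results;
    int token = 0;  // Advances as results are produced, like the merger's PBRT.

    std::optional<std::string> next() {
        if (results.empty())
            return std::nullopt;
        auto doc = results.front();
        results.pop_front();
        ++token;
        return doc;
    }
    void stash(std::string doc) {
        results.push_front(std::move(doc));  // Note: the token does not roll back.
    }
    int postBatchResumeToken() const {
        return token;
    }
};

struct Response {
    std::vector<std::string> batch;
    int postBatchResumeToken = 0;
};

Response buildBatch(Cursor& cursor, std::size_t batchSize, std::size_t byteLimit) {
    Response resp;
    bool stashedResult = false;
    std::size_t bytesUsed = 0;
    for (std::size_t n = 0; n < batchSize; ++n) {
        auto doc = cursor.next();
        if (!doc)
            break;
        if (bytesUsed + doc->size() > byteLimit) {
            // Doesn't fit: stash it and do NOT refresh the token, or the
            // client could resume past the stashed document.
            cursor.stash(std::move(*doc));
            stashedResult = true;
            break;
        }
        // Capture the token before appending, so it covers this document.
        resp.postBatchResumeToken = cursor.postBatchResumeToken();
        bytesUsed += doc->size();
        resp.batch.push_back(std::move(*doc));
    }
    // Empty batch, or nothing stashed: safe to report the freshest token.
    if (!stashedResult)
        resp.postBatchResumeToken = cursor.postBatchResumeToken();
    return resp;
}

int main() {
    Cursor c;
    for (auto s : {"a", "bb", "ccc"})
        c.results.push_back(s);
    auto resp = buildBatch(c, 10, 3);
    std::cout << "docs=" << resp.batch.size() << " pbrt=" << resp.postBatchResumeToken
              << "\n";  // docs=2 pbrt=2: the stashed "ccc" is not covered.
}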
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 15957cc3aba..ac76205ba10 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -93,9 +93,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
_tailableMode(params.getTailableMode() ? *params.getTailableMode()
: TailableModeEnum::kNormal),
_params(std::move(params)),
- _mergeQueue(MergingComparator(_remotes,
- _params.getSort() ? *_params.getSort() : BSONObj(),
- _params.getCompareWholeSortKey())) {
+ _mergeQueue(MergingComparator(
+ _remotes, _params.getSort().value_or(BSONObj()), _params.getCompareWholeSortKey())),
+ _promisedMinSortKeys(PromisedMinSortKeyComparator(_params.getSort().value_or(BSONObj()))) {
if (params.getTxnNumber()) {
invariant(params.getSessionId());
}
@@ -111,6 +111,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
_addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
++remoteIndex;
}
+ // If this is a change stream, then we expect to have already received PBRTs from every shard.
+ invariant(_promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == _remotes.size());
+ _highWaterMark = _promisedMinSortKeys.empty() ? BSONObj() : _promisedMinSortKeys.begin()->first;
}
AsyncResultsMerger::~AsyncResultsMerger() {
@@ -176,13 +179,32 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Create a new entry in the '_remotes' list for each new shard, and add the first cursor batch
+ // to its buffer. This ensures the shard's initial high water mark is respected, if it exists.
for (auto&& remote : newCursors) {
+ const auto newIndex = _remotes.size();
_remotes.emplace_back(remote.getHostAndPort(),
remote.getCursorResponse().getNSS(),
remote.getCursorResponse().getCursorId());
+ _addBatchToBuffer(lk, newIndex, remote.getCursorResponse());
}
}
+BSONObj AsyncResultsMerger::getHighWaterMark() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto minPromisedSortKey = _getMinPromisedSortKey(lk);
+ if (!minPromisedSortKey.isEmpty() && !_ready(lk)) {
+ _highWaterMark = minPromisedSortKey;
+ }
+ return _highWaterMark;
+}
+
+BSONObj AsyncResultsMerger::_getMinPromisedSortKey(WithLock) {
+ // We cannot return the minimum promised sort key unless all shards have reported one.
+ return _promisedMinSortKeys.size() < _remotes.size() ? BSONObj()
+ : _promisedMinSortKeys.begin()->first;
+}
+
bool AsyncResultsMerger::_ready(WithLock lk) {
if (_lifecycleState != kAlive) {
return true;
@@ -221,7 +243,7 @@ bool AsyncResultsMerger::_readySorted(WithLock lk) {
return true;
}
-bool AsyncResultsMerger::_readySortedTailable(WithLock) {
+bool AsyncResultsMerger::_readySortedTailable(WithLock lk) {
if (_mergeQueue.empty()) {
return false;
}
@@ -230,19 +252,10 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
- for (const auto& remote : _remotes) {
- if (!remote.promisedMinSortKey) {
- // In order to merge sorted tailable cursors, we need this value to be populated.
- return false;
- }
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
- 0) {
- // The key we want to return is not guaranteed to be smaller than future results from
- // this remote, so we can't yet return it.
- return false;
- }
- }
- return true;
+ // We should always have a minPromisedSortKey from every shard in the sorted tailable case.
+ auto minPromisedSortKey = _getMinPromisedSortKey(lk);
+ invariant(!minPromisedSortKey.isEmpty());
+ return compareSortKeys(keyWeWantToReturn, minPromisedSortKey, *_params.getSort()) <= 0;
}
bool AsyncResultsMerger::_readyUnsorted(WithLock) {
@@ -302,6 +315,12 @@ ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
_mergeQueue.push(smallestRemote);
}
+ // For sorted tailable awaitData cursors, update the high water mark to the document's sort key.
+ if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ _highWaterMark =
+ extractSortKey(*front.getResult(), _params.getCompareWholeSortKey()).getOwned();
+ }
+
return front;
}
@@ -481,10 +500,12 @@ StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
return std::move(cursorResponse);
}
-void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
- const CursorResponse& response) {
+void AsyncResultsMerger::_updateRemoteMetadata(WithLock,
+ size_t remoteIndex,
+ const CursorResponse& response) {
// Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
- remote->cursorId = response.getCursorId();
+ auto& remote = _remotes[remoteIndex];
+ remote.cursorId = response.getCursorId();
if (response.getPostBatchResumeToken()) {
// We only expect to see this for change streams.
invariant(_params.getSort());
@@ -497,14 +518,17 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
// The most recent minimum sort key should never be smaller than the previous promised
// minimum sort key for this remote, if one exists.
auto newMinSortKey = *response.getPostBatchResumeToken();
- if (auto& oldMinSortKey = remote->promisedMinSortKey) {
+ if (auto& oldMinSortKey = remote.promisedMinSortKey) {
invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0);
+ invariant(_promisedMinSortKeys.size() <= _remotes.size());
+ _promisedMinSortKeys.erase({*oldMinSortKey, remoteIndex});
}
- remote->promisedMinSortKey = newMinSortKey;
+ _promisedMinSortKeys.insert({newMinSortKey, remoteIndex});
+ remote.promisedMinSortKey = newMinSortKey;
} else {
// If we don't have a postBatchResumeToken, then we should never have an oplog timestamp.
uassert(ErrorCodes::InternalErrorNotSupported,
- str::stream() << "Host " << remote->shardHostAndPort
+ str::stream() << "Host " << remote.shardHostAndPort
<< " returned a cursor which has an oplog timestamp but does not "
"have a postBatchResumeToken, suggesting that one or more shards"
" are running an older version of MongoDB. This configuration "
@@ -609,7 +633,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
size_t remoteIndex,
const CursorResponse& response) {
auto& remote = _remotes[remoteIndex];
- updateRemoteMetadata(&remote, response);
+ _updateRemoteMetadata(lk, remoteIndex, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
if (_params.getSort()) {
@@ -791,4 +815,10 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() {
return nextReady();
}
+bool AsyncResultsMerger::PromisedMinSortKeyComparator::operator()(
+ const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const {
+ auto sortKeyComp = compareSortKeys(lhs.first, rhs.first, _sort);
+ return sortKeyComp < 0 || (sortKeyComp == 0 && lhs.second < rhs.second);
+}
+
} // namespace mongo
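The new bookkeeping keeps an ordered std::set of (promisedMinSortKey, remoteIndex) pairs so the global minimum is always *begin(); each response erases the remote's previous entry and inserts the new one, and no minimum is reported until every remote has promised a key. A self-contained sketch with integer sort keys, where the default pair ordering plays the role of PromisedMinSortKeyComparator (illustrative, not the ARM itself):

#include <cassert>
#include <cstddef>
#include <set>
#include <utility>
#include <vector>

using MinSortKeyRemoteIdPair = std::pair<int, std::size_t>;

class PromisedKeyTracker {
public:
    explicit PromisedKeyTracker(std::size_t numRemotes) : _latest(numRemotes, -1) {}

    // Called when remote 'i' reports a new promised minimum sort key. A new
    // promise may never regress below the remote's previous one.
    void update(std::size_t i, int newKey) {
        if (_latest[i] >= 0) {
            assert(newKey >= _latest[i]);
            _keys.erase({_latest[i], i});  // Drop the remote's old entry.
        }
        _keys.insert({newKey, i});
        _latest[i] = newKey;
    }

    // The lowest promised key across all remotes, or -1 until every remote
    // has reported one (we cannot promise anything before then).
    int minPromisedKey() const {
        return _keys.size() < _latest.size() ? -1 : _keys.begin()->first;
    }

private:
    std::set<MinSortKeyRemoteIdPair> _keys;  // Ordered: begin() is the minimum.
    std::vector<int> _latest;                // Last promise per remote, -1 if none.
};

int main() {
    PromisedKeyTracker tracker(3);
    tracker.update(0, 5);
    tracker.update(2, 4);
    assert(tracker.minPromisedKey() == -1);  // Remote 1 has not reported yet.
    tracker.update(1, 1);
    assert(tracker.minPromisedKey() == 1);
    tracker.update(1, 6);                    // Remote 1 advances past the rest.
    assert(tracker.minPromisedKey() == 4);   // Remote 2 is now the low water mark.
}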
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index f08d55385f1..05b88156918 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -222,6 +222,14 @@ public:
}
/**
+ * For sorted tailable cursors, returns the most recent available sort key. This guarantees that
+ * we will never return any future results which precede this key. If no results are ready to be
+ * returned, this method may cause the high water mark to advance to the lowest promised sort key
+ * received from the shards. Returns an empty BSONObj if no such sort key is available.
+ */
+ BSONObj getHighWaterMark();
+
+ /**
* Starts shutting down this ARM by canceling all pending requests and scheduling killCursors
* on all of the unexhausted remotes. Returns a handle to an event that is signaled when this
* ARM is safe to destroy.
@@ -325,6 +333,18 @@ private:
const bool _compareWholeSortKey;
};
+ using MinSortKeyRemoteIdPair = std::pair<BSONObj, size_t>;
+
+ class PromisedMinSortKeyComparator {
+ public:
+ PromisedMinSortKeyComparator(BSONObj sort) : _sort(std::move(sort)) {}
+
+ bool operator()(const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const;
+
+ private:
+ BSONObj _sort;
+ };
+
enum LifecycleState { kAlive, kKillStarted, kKillComplete };
/**
@@ -413,6 +433,11 @@ private:
*/
bool _haveOutstandingBatchRequests(WithLock);
+ /**
+ * If a promisedMinSortKey has been obtained from all remotes, returns the lowest such key.
+ * Otherwise, returns an empty BSONObj.
+ */
+ BSONObj _getMinPromisedSortKey(WithLock);
/**
* Schedules a getMore on any remote hosts which we need another batch from.
@@ -425,9 +450,9 @@ private:
void _scheduleKillCursors(WithLock, OperationContext* opCtx);
/**
- * Updates 'remote's metadata (e.g. the cursor id) based on information in 'response'.
+ * Updates the given remote's metadata (e.g. the cursor id) based on information in 'response'.
*/
- void updateRemoteMetadata(RemoteCursorData* remote, const CursorResponse& response);
+ void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response);
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
@@ -458,6 +483,13 @@ private:
boost::optional<Milliseconds> _awaitDataTimeout;
+ // An ordered set of (promisedMinSortKey, remoteIndex) pairs received from the shards. The first
+ // element in the set will be the lowest sort key across all shards.
+ std::set<MinSortKeyRemoteIdPair, PromisedMinSortKeyComparator> _promisedMinSortKeys;
+
+ // For sorted tailable cursors, records the current high-water-mark sort key. Empty otherwise.
+ BSONObj _highWaterMark;
+
//
// Killing
//
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 1b5d4e97d63..1e4cce51fca 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -183,6 +183,15 @@ protected:
}
/**
+ * Schedules a single cursor response to be returned by the mock network.
+ */
+ void scheduleNetworkResponse(CursorResponse&& response) {
+ std::vector<CursorResponse> responses;
+ responses.push_back(std::move(response));
+ scheduleNetworkResponses(std::move(responses));
+ }
+
+ /**
* Schedules a list of raw BSON command responses to be returned by the mock network.
*/
void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
@@ -1544,14 +1553,28 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
scheduleNetworkResponses(std::move(responses));
}
-TEST_F(AsyncResultsMergerTest,
- SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoPostBatchResumeToken) {
+DEATH_TEST_F(AsyncResultsMergerTest,
+ SortedTailableInvariantsIfInitialBatchHasNoPostBatchResumeToken,
+ "Invariant failure _promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == "
+ "_remotes.size()") {
AsyncResultsMergerParams params;
params.setNss(kTestNss);
UUID uuid = UUID::gen();
std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ // Create one cursor whose initial response has a postBatchResumeToken.
+ auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
+ auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': '"
+ << firstDocSortKey.firstElement().String()
+ << "'}}");
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
+ // Create a second cursor whose initial batch has no PBRT.
cursors.push_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
params.setRemotes(std::move(cursors));
@@ -1561,57 +1584,10 @@ TEST_F(AsyncResultsMergerTest,
stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
-
- ASSERT_FALSE(arm->ready());
-
- // Schedule one response with a postBatchResumeToken in it.
- auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 6));
- auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
- std::vector<CursorResponse> responses;
- auto firstCursorResult = fromjson(
- str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
- << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String()
- << "'}}");
- std::vector<BSONObj> batch1{firstCursorResult};
- responses.emplace_back(
- kTestNss, CursorId(123), std::move(batch1), boost::none, boost::none, pbrtFirstCursor);
- scheduleNetworkResponses(std::move(responses));
-
- // Still shouldn't be ready, we don't have a guarantee from each shard.
- ASSERT_FALSE(arm->ready());
-
- // Schedule another response from the other shard with a later postBatchResumeToken.
- responses.clear();
- auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 7));
- auto secondDocSortKey = makeResumeToken(Timestamp(1, 5), uuid, BSON("_id" << 2));
- auto secondCursorResult =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '"
- << uuid.toString() + "', documentKey: {_id: 2}}, $sortKey: {'': '"
- << secondDocSortKey.firstElement().String()
- << "'}}");
- std::vector<BSONObj> batch2{secondCursorResult};
- responses.emplace_back(
- kTestNss, CursorId(456), std::move(batch2), boost::none, boost::none, pbrtSecondCursor);
- scheduleNetworkResponses(std::move(responses));
- executor()->waitForEvent(readyEvent);
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(firstCursorResult, *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(secondCursorResult, *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
-
- // Clean up the cursors.
- responses.clear();
- std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses));
- responses.clear();
- std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses));
+ // We should be dead by now.
+ MONGO_UNREACHABLE;
}
DEATH_TEST_F(AsyncResultsMergerTest,
@@ -1922,6 +1898,82 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
scheduleNetworkResponses(std::move(responses));
}
+TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey) {
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ // Create three cursors with empty initial batches. Each batch has a PBRT.
+ auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor)));
+ auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor)));
+ auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[2],
+ kTestShardHosts[2],
+ CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor)));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(change_stream_constants::kSortSpec);
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+
+ // We have no results to return, so the ARM is not ready.
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // The high water mark should be the second cursor's PBRT, since it is the lowest of the three.
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
+
+ // Advance the PBRT of the second cursor. It should still be the lowest. The fixture expects
+ // each cursor to be updated in-order, so we keep the first and third PBRTs constant.
+ pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3));
+ std::vector<BSONObj> emptyBatch = {};
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
+ ASSERT_FALSE(arm->ready());
+
+ // Advance the second cursor again, so that it surpasses the other two. The third cursor becomes
+ // the new high water mark.
+ pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6));
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor);
+ ASSERT_FALSE(arm->ready());
+
+ // Advance the third cursor such that the first cursor becomes the high water mark.
+ pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7));
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor);
+ ASSERT_FALSE(arm->ready());
+
+ // Clean up the cursors.
+ std::vector<BSONObj> cleanupBatch = {};
+ scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch});
+ scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch});
+ scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch});
+}
+
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
std::vector<RemoteCursor> cursors;
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 064554438b9..0801ee969d6 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -118,6 +118,12 @@ public:
virtual std::size_t getNumRemotes() const = 0;
/**
+ * Returns the current most-recent resume token for this cursor, or an empty object if this is
+ * not a $changeStream cursor.
+ */
+ virtual BSONObj getPostBatchResumeToken() const = 0;
+
+ /**
* Returns the number of result documents returned so far by this cursor via the next() method.
*/
virtual long long getNumReturnedSoFar() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1239e1ce263..45aa31b0072 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -153,6 +153,10 @@ std::size_t ClusterClientCursorImpl::getNumRemotes() const {
return _root->getNumRemotes();
}
+BSONObj ClusterClientCursorImpl::getPostBatchResumeToken() const {
+ return _root->getPostBatchResumeToken();
+}
+
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 17a4721b8d0..be0e2390576 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -109,6 +109,8 @@ public:
std::size_t getNumRemotes() const final;
+ BSONObj getPostBatchResumeToken() const final;
+
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 72e4a5914b8..b16e10b8ef4 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -75,6 +75,10 @@ std::size_t ClusterClientCursorMock::getNumRemotes() const {
MONGO_UNREACHABLE;
}
+BSONObj ClusterClientCursorMock::getPostBatchResumeToken() const {
+ MONGO_UNREACHABLE;
+}
+
long long ClusterClientCursorMock::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 101d59e7bb5..c96209a7f88 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -74,6 +74,8 @@ public:
std::size_t getNumRemotes() const final;
+ BSONObj getPostBatchResumeToken() const final;
+
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 41589a9ebf0..b413eb9db13 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -154,6 +154,11 @@ std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const {
return _cursor->getNumRemotes();
}
+BSONObj ClusterCursorManager::PinnedCursor::getPostBatchResumeToken() const {
+ invariant(_cursor);
+ return _cursor->getPostBatchResumeToken();
+}
+
CursorId ClusterCursorManager::PinnedCursor::getCursorId() const {
return _cursorId;
}
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 53ce2ddf345..77fb5cb6288 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -203,6 +203,11 @@ public:
std::size_t getNumRemotes() const;
/**
+ * If applicable, returns the current most-recent resume token for this cursor.
+ */
+ BSONObj getPostBatchResumeToken() const;
+
+ /**
* Returns the cursor id for the underlying cursor, or zero if no cursor is owned.
*/
CursorId getCursorId() const;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 9fd84fd43c1..5ce9d4ee7a7 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -602,6 +602,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
long long batchSize = request.batchSize.value_or(0);
long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+ BSONObj postBatchResumeToken;
+ bool stashedResult = false;
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
auto context = batch.empty()
@@ -639,6 +641,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
if (!FindCommon::haveSpaceForNext(
*next.getValue().getResult(), batch.size(), bytesBuffered)) {
pinnedCursor.getValue().queueResult(*next.getValue().getResult());
+ stashedResult = true;
break;
}
@@ -647,6 +650,15 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
bytesBuffered +=
(next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound);
batch.push_back(std::move(*next.getValue().getResult()));
+
+ // Update the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
+ postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken();
+ }
+
+ // For empty batches, or in the case where the final result was added to the batch rather than
+ // being stashed, we update the PBRT here to ensure that it is the most recent available.
+ if (!stashedResult) {
+ postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken();
}
pinnedCursor.getValue().setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
@@ -669,7 +681,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
}
- return CursorResponse(request.nss, idToReturn, std::move(batch), startingFrom);
+ return CursorResponse(
+ request.nss, idToReturn, std::move(batch), startingFrom, boost::none, postBatchResumeToken);
}
} // namespace mongo
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index dbcc156d0d5..eaa0d23bc46 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -124,6 +124,14 @@ public:
}
/**
+ * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream;
+ * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child.
+ */
+ virtual BSONObj getPostBatchResumeToken() {
+ return _child ? _child->getPostBatchResumeToken() : BSONObj();
+ }
+
+ /**
* Sets the current operation context to be used by the router stage.
*/
void reattachToOperationContext(OperationContext* opCtx) {
@@ -181,7 +189,7 @@ protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
*/
- RouterExecStage* getChildStage() {
+ RouterExecStage* getChildStage() const {
return _child.get();
}
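Because the RouterExecStage default implementation forwards getPostBatchResumeToken() to its child, stages layered above the merge stage expose the token without overriding anything; only the stage that owns the remote cursors terminates the chain. A minimal sketch of that forwarding pattern (hypothetical stage names):

#include <iostream>
#include <memory>
#include <string>
#include <utility>

class Stage {
public:
    explicit Stage(std::unique_ptr<Stage> child = nullptr) : _child(std::move(child)) {}
    virtual ~Stage() = default;

    // Default: forward to the child; stages with no child report "no token".
    virtual std::string getPostBatchResumeToken() {
        return _child ? _child->getPostBatchResumeToken() : std::string{};
    }

protected:
    std::unique_ptr<Stage> _child;
};

// The merge stage is the only stage that actually owns a token.
class MergeStage : public Stage {
public:
    std::string getPostBatchResumeToken() override {
        return "{_data: '8263...'}";
    }
};

// A pass-through stage (e.g. a limit) needs no override at all.
class LimitStage : public Stage {
public:
    using Stage::Stage;
};

int main() {
    // limit -> merge: the call transparently reaches the merge stage.
    LimitStage root{std::make_unique<MergeStage>()};
    std::cout << root.getPostBatchResumeToken() << "\n";
}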
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 0db33eac77f..2309cec5602 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -66,6 +66,10 @@ public:
*/
void addNewShardCursors(std::vector<RemoteCursor>&& newShards);
+ BSONObj getPostBatchResumeToken() final {
+ return _arm.getHighWaterMark();
+ }
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index c9cd1166805..3f99e2c3547 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -47,6 +47,9 @@ RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
_mergePipeline(std::move(mergePipeline)),
_mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) {
if (!_mongosOnlyPipeline) {
+ // Save a pointer to the child RouterExecStage before it is absorbed into the pipeline. This
+ // is either a merge stage, or an ancestor that can forward calls to the RouterStageMerge.
+ _mergeCursorsStage = child.get();
// Add an adapter to the front of the pipeline to draw results from 'child'.
_routerAdapter =
DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)),
@@ -90,6 +93,10 @@ std::size_t RouterStagePipeline::getNumRemotes() const {
return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
}
+BSONObj RouterStagePipeline::getPostBatchResumeToken() {
+ return _mergeCursorsStage ? _mergeCursorsStage->getPostBatchResumeToken() : BSONObj();
+}
+
bool RouterStagePipeline::remotesExhausted() {
return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
}
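RouterStagePipeline records a non-owning pointer to the merge stage before std::move surrenders the unique_ptr to the adapter; the pointer remains valid because the adapter, and hence the pipeline, owns the stage for the cursor's lifetime. A condensed sketch of the idiom (illustrative types):

#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct MergeStage {
    std::string highWaterMark() const {
        return "token-42";
    }
};

// Owns the stage once constructed, like the DocumentSourceRouterAdapter.
struct Adapter {
    explicit Adapter(std::unique_ptr<MergeStage> stage) : owned(std::move(stage)) {}
    std::unique_ptr<MergeStage> owned;
};

class Pipeline {
public:
    explicit Pipeline(std::unique_ptr<MergeStage> child) {
        // Save the raw pointer BEFORE the unique_ptr is moved away; the
        // adapter keeps the stage alive for as long as this Pipeline lives.
        _mergeStage = child.get();
        _adapter = std::make_unique<Adapter>(std::move(child));
    }

    std::string getPostBatchResumeToken() const {
        return _mergeStage ? _mergeStage->highWaterMark() : std::string{};
    }

private:
    std::unique_ptr<Adapter> _adapter;
    MergeStage* _mergeStage = nullptr;  // Non-owning; valid while _adapter lives.
};

int main() {
    Pipeline p{std::make_unique<MergeStage>()};
    std::cout << p.getPostBatchResumeToken() << "\n";
}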
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 635d28f1505..01f6328ca26 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -55,6 +55,8 @@ public:
std::size_t getNumRemotes() const final;
+ BSONObj getPostBatchResumeToken() final;
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
@@ -65,6 +67,7 @@ protected:
private:
boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter;
std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline;
- bool _mongosOnlyPipeline;
+ RouterExecStage* _mergeCursorsStage = nullptr;
+ bool _mongosOnlyPipeline = false;
};
} // namespace mongo