author     Bernard Gorman <bernard.gorman@gmail.com>    2018-11-24 16:56:20 +0000
committer  Bernard Gorman <bernard.gorman@gmail.com>    2019-02-28 23:23:24 +0000
commit     296dfba8bd13e5c420b8ccfb51878e017eac2f5a (patch)
tree       42a1bad913d0a87165a16f541eab018a95335269
parent     7e86ff743d75a3cd9a76249740c9e659b5f93cf3 (diff)
download   mongo-296dfba8bd13e5c420b8ccfb51878e017eac2f5a.tar.gz
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
(cherry picked from commit fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c)
(cherry picked from commit 7f70c1213b2cc79591ea1d11f9724d057886a0fd)
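With this change, every change stream batch returned by mongoD (both the firstBatch of the initial aggregate and the nextBatch of each subsequent getMore) reports a postBatchResumeToken field inside the cursor object, even when the batch itself is empty. An illustrative shell sketch of where the field appears follows; the collection name "test" and the batch size are arbitrary and not part of the patch.

    // Open a change stream and read the token reported with the first (possibly empty) batch.
    const aggRes = assert.commandWorked(db.runCommand(
        {aggregate: "test", pipeline: [{$changeStream: {}}], cursor: {batchSize: 2}}));
    const initialPBRT = aggRes.cursor.postBatchResumeToken;  // present even if firstBatch is empty

    // Each getMore reports the token as of the end of its batch; it never moves backwards.
    const getMoreRes = assert.commandWorked(db.runCommand(
        {getMore: aggRes.cursor.id, collection: "test", batchSize: 2}));
    assert.gte(bsonWoCompare(getMoreRes.cursor.postBatchResumeToken, initialPBRT), 0);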
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml | 2
-rw-r--r--  jstests/change_streams/report_post_batch_resume_token.js | 183
-rw-r--r--  src/mongo/base/uuid_test.cpp | 6
-rw-r--r--  src/mongo/db/SConscript | 1
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp | 8
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp | 11
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.cpp | 86
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.h | 86
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.cpp | 12
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.h | 28
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp | 19
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 20
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp | 6
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp | 7
-rw-r--r--  src/mongo/db/pipeline/resume_token.cpp | 16
-rw-r--r--  src/mongo/db/pipeline/resume_token.h | 12
-rw-r--r--  src/mongo/db/query/SConscript | 1
-rw-r--r--  src/mongo/db/query/cursor_response.cpp | 21
-rw-r--r--  src/mongo/db/query/cursor_response.h | 11
-rw-r--r--  src/mongo/db/query/cursor_response_test.cpp | 31
-rw-r--r--  src/mongo/db/query/explain.cpp | 9
-rw-r--r--  src/mongo/db/query/plan_executor.cpp | 14
-rw-r--r--  src/mongo/db/query/plan_executor.h | 8
-rw-r--r--  src/mongo/db/query/stage_builder.cpp | 1
-rw-r--r--  src/mongo/db/query/stage_types.h | 3
-rw-r--r--  src/mongo/util/uuid.cpp | 4
-rw-r--r--  src/mongo/util/uuid.h | 6
36 files changed, 580 insertions, 50 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index 300f165d425..b74478cafc4 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -7,6 +7,8 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index c462f686ce5..24fe451f074 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -7,6 +7,8 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected to
# work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index 6794c2bea6d..3846b1f607e 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -9,6 +9,8 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index e8f978f4907..2598e40df77 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -13,6 +13,8 @@ selector:
- jstests/change_streams/whole_db.js
- jstests/change_streams/whole_db_metadata_notifications.js
- jstests/change_streams/whole_db_resumability.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index 4dda4a931d0..aa5f9e7662a 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -7,6 +7,8 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index 1df3df2cdc3..6b86253f6a8 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -9,6 +9,8 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
index 41a04dc6f71..d619cf96c97 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -10,6 +10,8 @@ selector:
- jstests/change_streams/whole_cluster.js
- jstests/change_streams/whole_cluster_metadata_notifications.js
- jstests/change_streams/whole_cluster_resumability.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index 21ec2902dab..bbe2186a6ef 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -13,6 +13,8 @@ selector:
- jstests/change_streams/include_cluster_time.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index 5ac70f20dcf..981ba646b39 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -11,6 +11,8 @@ selector:
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
index d0017dd42dd..6478118e1df 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
@@ -15,6 +15,8 @@ selector:
- jstests/change_streams/whole_db.js
- jstests/change_streams/whole_db_metadata_notifications.js
- jstests/change_streams/whole_db_resumability.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
new file mode 100644
index 00000000000..2094f12246c
--- /dev/null
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -0,0 +1,183 @@
+/**
+ * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken.
+ * @tags: [uses_transactions]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ // Drop and recreate collections to ensure a clean run.
+ const collName = "report_post_batch_resume_token";
+ const testCollection = assertDropAndRecreateCollection(db, collName);
+ const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
+ const adminDB = db.getSiblingDB("admin");
+
+ // Helper function to return the next batch given an initial aggregate command response.
+ function runNextGetMore(initialCursorResponse) {
+ const getMoreCollName = initialCursorResponse.cursor.ns.substr(
+ initialCursorResponse.cursor.ns.indexOf('.') + 1);
+ return assert.commandWorked(testCollection.runCommand({
+ getMore: initialCursorResponse.cursor.id,
+ collection: getMoreCollName,
+ batchSize: batchSize
+ }));
+ }
+
+ let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
+ const batchSize = 2;
+
+ // Test that postBatchResumeToken is present on empty initial aggregate batch.
+ let initialAggResponse = assert.commandWorked(testCollection.runCommand(
+ {aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}}));
+
+ // Examine the response from the initial agg. It should have a postBatchResumeToken (PBRT),
+ // despite the fact that the batch is empty.
+ let initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse));
+ assert.eq(0, initialAggResponse.cursor.firstBatch.length);
+
+ // Test that postBatchResumeToken is present on empty getMore batch.
+ let getMoreResponse = runNextGetMore(initialAggResponse);
+ let getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, getMorePBRT, tojson(getMoreResponse));
+ assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
+ assert.eq(0, getMoreResponse.cursor.nextBatch.length);
+
+ // Test that postBatchResumeToken advances with returned events. Insert one document into the
+ // collection and consume the resulting change stream event.
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+
+ // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
+ // to the resume token of the last item in the batch and greater than the initial PBRT.
+ let resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, resumeTokenFromDoc);
+ assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
+
+ // Now seed the collection with enough documents to fit in two batches.
+ for (let i = 0; i < batchSize * 2; i++) {
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ }
+
+ // Test that postBatchResumeToken is present on non-empty initial aggregate batch.
+ initialAggResponse = assert.commandWorked(testCollection.runCommand({
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromDoc}}],
+ cursor: {batchSize: batchSize}
+ }));
+ // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the
+ // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it.
+ initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse));
+ assert.eq(batchSize, initialAggResponse.cursor.firstBatch.length);
+ assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
+
+ // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
+ // observed postBatchResumeToken advanced.
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(batchSize, getMoreResponse.cursor.nextBatch.length);
+
+ // The postBatchResumeToken is again equal to the final token in the batch, and greater than the
+ // PBRT from the initial response.
+ resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[batchSize - 1]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(resumeTokenFromDoc, getMorePBRT, tojson(getMoreResponse));
+ assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
+
+ // Test that postBatchResumeToken advances with writes to an unrelated collection. First make
+ // sure there is nothing left in our cursor, and obtain the latest PBRT...
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ let previousGetMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, previousGetMorePBRT, tojson(getMoreResponse));
+ assert.eq(getMoreResponse.cursor.nextBatch, []);
+
+ // ... then test that it advances on an insert to an unrelated collection.
+ assert.commandWorked(otherCollection.insert({}));
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(0, getMoreResponse.cursor.nextBatch.length);
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+
+ // Insert two documents into the collection which are of the maximum BSON object size.
+ const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
+ assert.gt(bsonUserSizeLimit, 0);
+ for (let i = 0; i < 2; ++i) {
+ const docToInsert = {_id: docId++, padding: ""};
+ docToInsert.padding = "a".repeat(bsonUserSizeLimit - Object.bsonsize(docToInsert));
+ assert.commandWorked(testCollection.insert(docToInsert));
+ }
+
+ // Test that we return the correct postBatchResumeToken in the event that the batch hits the
+ // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
+ // because the second result cannot fit in the batch.
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+
+ // Verify that the postBatchResumeToken matches the last event actually added to the batch.
+ resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, resumeTokenFromDoc);
+
+ // Now retrieve the second event and confirm that the PBRT matches its resume token.
+ previousGetMorePBRT = getMorePBRT;
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+ assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+ assert.docEq(getMorePBRT, resumeTokenFromDoc);
+
+ // Test that the PBRT is correctly updated when reading events from within a transaction.
+ const session = db.getMongo().startSession();
+ const sessionDB = session.getDatabase(db.getName());
+
+ const sessionColl = sessionDB[testCollection.getName()];
+ const sessionOtherColl = sessionDB[otherCollection.getName()];
+ session.startTransaction();
+
+ // Write 3 documents to the test collection and 1 to the unrelated collection.
+ for (let i = 0; i < 3; ++i) {
+ assert.commandWorked(sessionColl.insert({_id: docId++}));
+ }
+ assert.commandWorked(sessionOtherColl.insert({_id: docId++}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+
+ // Grab the next 2 events, which should be the first 2 events in the transaction.
+ previousGetMorePBRT = getMorePBRT;
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(2, getMoreResponse.cursor.nextBatch.length);
+
+ // The clusterTime should be the same on each, but the resume token keeps advancing.
+ const txnEvent1 = getMoreResponse.cursor.nextBatch[0],
+ txnEvent2 = getMoreResponse.cursor.nextBatch[1];
+ const txnClusterTime = txnEvent1.clusterTime;
+ assert.eq(txnEvent2.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0);
+ assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
+
+ // The PBRT of the first transaction batch is equal to the last document's resumeToken.
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, txnEvent2._id);
+
+ // Now get the next batch. This contains the third and final transaction operation.
+ previousGetMorePBRT = getMorePBRT;
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+
+ // The clusterTime of this event is the same as the two events from the previous batch, but its
+ // resume token is greater than the previous PBRT.
+ const txnEvent3 = getMoreResponse.cursor.nextBatch[0];
+ assert.eq(txnEvent3.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
+
+ // Because we wrote to the unrelated collection, the final event in the transaction does not
+ // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
+ // last event in the batch, because the unrelated event is within the same transaction and
+ // therefore has the same clusterTime.
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, txnEvent3._id);
+})();
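Beyond the assertions in the test above, the intended client-side pattern is to record the postBatchResumeToken from every response and treat it as the stream's resume point. A hedged sketch follows; the collection name and loop bound are illustrative, and whether a pure high-water-mark token (one returned with a batch that contained no events) is accepted by resumeAfter depends on related server work not included in this patch.

    // Track the most recent PBRT across batches and use it to re-establish the stream.
    let res = assert.commandWorked(db.runCommand(
        {aggregate: "test", pipeline: [{$changeStream: {}}], cursor: {}}));
    let resumePoint = res.cursor.postBatchResumeToken;
    for (let i = 0; i < 3; ++i) {
        res = assert.commandWorked(
            db.runCommand({getMore: res.cursor.id, collection: "test"}));
        resumePoint = res.cursor.postBatchResumeToken;  // advances even for empty batches
    }
    // Later, reopen the stream from the recorded point rather than from an individual event.
    res = assert.commandWorked(db.runCommand({
        aggregate: "test",
        pipeline: [{$changeStream: {resumeAfter: resumePoint}}],
        cursor: {}
    }));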
diff --git a/src/mongo/base/uuid_test.cpp b/src/mongo/base/uuid_test.cpp
index daf43f34d5a..02e1f9fe4ed 100644
--- a/src/mongo/base/uuid_test.cpp
+++ b/src/mongo/base/uuid_test.cpp
@@ -190,5 +190,11 @@ TEST(UUIDTest, toBSONUsingBSONMacro) {
ASSERT_BSONOBJ_EQ(expectedBson, bson);
}
+TEST(UUIDTest, NilUUID) {
+ // Test that UUID::nil() returns an all-zero UUID.
+ auto nilUUID = UUID::parse("00000000-0000-0000-0000-000000000000");
+ ASSERT_EQUALS(UUID::makeDefaultForChangeStream(), unittest::assertGet(nilUUID));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index e2c297186d5..e98200213d0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -947,6 +947,7 @@ env.Library(
'exec/and_hash.cpp',
'exec/and_sorted.cpp',
'exec/cached_plan.cpp',
+ 'exec/change_stream_proxy.cpp',
'exec/collection_scan.cpp',
'exec/count.cpp',
'exec/count_scan.cpp',
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 8366f354c06..1eb743af5b4 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -539,8 +539,9 @@ public:
// As soon as we get a result, this operation no longer waits.
awaitDataState(opCtx).shouldWaitForInserts = false;
- // Add result to output buffer.
+ // Add result to output buffer, set latestOplogTimestamp and postBatchResumeToken.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
nextBatch->append(obj);
(*numResults)++;
}
@@ -563,9 +564,10 @@ public:
return status;
}
case PlanExecutor::IS_EOF:
- // This causes the reported latest oplog timestamp to advance even when there are
- // no results for this particular query.
+ // This causes the reported latest oplog timestamp and postBatchResumeToken to
+ // advance even when there are no results for this particular query.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
default:
return Status::OK();
}
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index c9eb61bb7a5..7b1c6635b48 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/pipeline_proxy.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -115,8 +115,11 @@ bool handleCursorCommand(OperationContext* opCtx,
}
if (state == PlanExecutor::IS_EOF) {
+ // Set both the latestOplogTimestamp and the postBatchResumeToken on the response.
responseBuilder.setLatestOplogTimestamp(
cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(
+ cursor->getExecutor()->getPostBatchResumeToken());
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;
@@ -136,7 +139,9 @@ bool handleCursorCommand(OperationContext* opCtx,
break;
}
+ // Set both the latestOplogTimestamp and the postBatchResumeToken on the response.
responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken());
responseBuilder.append(next);
}
@@ -508,7 +513,9 @@ Status runAggregate(OperationContext* opCtx,
// Transfer ownership of the Pipeline to the PipelineProxyStage.
unownedPipeline = pipeline.get();
auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get());
+ auto proxy = liteParsedPipeline.hasChangeStream()
+ ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipeline), ws.get())
+ : make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get());
// This PlanExecutor will simply forward requests to the Pipeline, so does not need to
// yield or to be registered with any collection's CursorManager to receive invalidations.
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
new file mode 100644
index 00000000000..7780652286a
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/change_stream_proxy.h"
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";
+
+ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws)
+ : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {
+ invariant(std::any_of(
+ _pipeline->getSources().begin(), _pipeline->getSources().end(), [](const auto& stage) {
+ return stage->constraints().isChangeStreamStage();
+ }));
+}
+
+boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
+ if (auto next = _pipeline->getNext()) {
+ // While we have more results to return, we track both the timestamp and the resume token of
+ // the latest event observed in the oplog, the latter via its _id field.
+ auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson());
+ _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (next->getField("_id").getType() == BSONType::Object) {
+ _postBatchResumeToken = next->getField("_id").getDocument().toBson();
+ }
+ return nextBSON;
+ }
+
+ // We ran out of results to return. Check whether the oplog cursor has moved forward since the
+ // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return,
+ // if the new time is higher than the last then we are guaranteed not to have already returned
+ // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token
+ // at the current clusterTime.
+ auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (highWaterMark > _latestOplogTimestamp) {
+ auto token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark);
+ _postBatchResumeToken =
+ token.toDocument(ResumeToken::SerializationFormat::kHexString).toBson();
+ _latestOplogTimestamp = highWaterMark;
+ }
+ return boost::none;
+}
+
+std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
+ std::unique_ptr<PlanStageStats> ret =
+ std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
+ ret->specific = std::make_unique<CollectionScanStats>();
+ return ret;
+}
+
+} // namespace mongo
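The effect of the high-water-mark branch in getNextBson() is observable from the shell: while a batch contains events, the reported token tracks the _id of the last event returned, and once the pipeline is drained the token can continue to advance past unrelated oplog activity. A minimal illustration, with database and collection names that are assumptions rather than part of the patch:

    // Empty batches can still carry an increasing postBatchResumeToken, because the stage
    // emits a high-water-mark token at the latest oplog timestamp it has scanned past.
    const streamRes = assert.commandWorked(db.runCommand(
        {aggregate: "watched", pipeline: [{$changeStream: {}}], cursor: {}}));
    const firstGetMore = assert.commandWorked(
        db.runCommand({getMore: streamRes.cursor.id, collection: "watched"}));
    assert.commandWorked(db.getSiblingDB("other").unrelated.insert({}));  // unrelated write
    const secondGetMore = assert.commandWorked(
        db.runCommand({getMore: streamRes.cursor.id, collection: "watched"}));
    assert.gte(bsonWoCompare(secondGetMore.cursor.postBatchResumeToken,
                             firstGetMore.cursor.postBatchResumeToken), 0);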
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
new file mode 100644
index 00000000000..601fe43b582
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/pipeline_proxy.h"
+
+namespace mongo {
+
+/**
+ * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the
+ * serialization of change stream pipeline output from Document to BSON. In particular, it is
+ * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that
+ * are necessary for correct merging on mongoS and, in the latter case, must also be provided to
+ * mongoD clients.
+ */
+class ChangeStreamProxyStage final : public PipelineProxyStage {
+public:
+ static const char* kStageType;
+
+ /**
+ * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream
+ * pipeline into the constructor will cause an invariant() to fail.
+ */
+ ChangeStreamProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws);
+
+ /**
+ * Returns an empty PlanStageStats object.
+ */
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ /**
+ * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog
+ * timestamp in the event that we need to merge on mongoS.
+ */
+ Timestamp getLatestOplogTimestamp() const {
+ return _includeMetaData ? _latestOplogTimestamp : Timestamp();
+ }
+
+ /**
+ * Passes through the most recent resume token from the proxied pipeline.
+ */
+ BSONObj getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
+ StageType stageType() const final {
+ return STAGE_CHANGE_STREAM_PROXY;
+ }
+
+protected:
+ boost::optional<BSONObj> getNextBson() final;
+
+private:
+ Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
+};
+} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 4267337ab2b..9b0119d1429 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -51,7 +51,13 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
WorkingSet* ws)
- : PlanStage(kStageType, opCtx),
+ : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {}
+
+PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws,
+ const char* stageTypeName)
+ : PlanStage(stageTypeName, opCtx),
_pipeline(std::move(pipeline)),
_includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
_ws(ws) {
@@ -128,10 +134,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
return boost::none;
}
-Timestamp PipelineProxyStage::getLatestOplogTimestamp() const {
- return PipelineD::getLatestOplogTimestamp(_pipeline.get());
-}
-
std::string PipelineProxyStage::getPlanSummaryStr() const {
return PipelineD::getPlanSummaryStr(_pipeline.get());
}
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 370b2b80db6..54c4b7dc6df 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -46,12 +46,14 @@ namespace mongo {
/**
* Stage for pulling results out from an aggregation pipeline.
*/
-class PipelineProxyStage final : public PlanStage {
+class PipelineProxyStage : public PlanStage {
public:
PipelineProxyStage(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
WorkingSet* ws);
+ virtual ~PipelineProxyStage() = default;
+
PlanStage::StageState doWork(WorkingSetID* out) final;
bool isEOF() final;
@@ -63,7 +65,7 @@ public:
void doReattachToOperationContext() final;
// Returns empty PlanStageStats object
- std::unique_ptr<PlanStageStats> getStats() final;
+ std::unique_ptr<PlanStageStats> getStats() override;
// Not used.
SpecificStats* getSpecificStats() const final {
@@ -76,15 +78,10 @@ public:
MONGO_UNREACHABLE;
}
- /**
- * Pass through the last oplog timestamp from the proxied pipeline.
- */
- Timestamp getLatestOplogTimestamp() const;
-
std::string getPlanSummaryStr() const;
void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
- StageType stageType() const final {
+ StageType stageType() const override {
return STAGE_PIPELINE_PROXY;
}
@@ -97,17 +94,20 @@ public:
static const char* kStageType;
protected:
- void doDispose() final;
+ PipelineProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws,
+ const char* stageTypeName);
-private:
- boost::optional<BSONObj> getNextBson();
+ virtual boost::optional<BSONObj> getNextBson();
+ void doDispose() final;
- // Things in the _stash should be returned before pulling items from _pipeline.
+ // Items in the _stash should be returned before pulling items from _pipeline.
std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
- std::vector<BSONObj> _stash;
const bool _includeMetaData;
- // Not owned by us.
+private:
+ std::vector<BSONObj> _stash;
WorkingSet* _ws;
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 6c4f9a4f25e..f1431ffa8de 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1473,17 +1473,19 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
- // Drop database entry doesn't have a UUID.
+ // Drop database entry has a nil UUID.
Document expectedDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::makeDefaultForChangeStream())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ UUID::makeDefaultForChangeStream(),
+ Value(),
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
@@ -1674,7 +1676,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabaseShouldReturn
UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
// Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
- auto dropDBResumeToken = makeResumeToken(kDefaultTs);
+ auto dropDBResumeToken = makeResumeToken(kDefaultTs, UUID::makeDefaultForChangeStream());
OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
Document expectedDropDatabase{
@@ -1684,8 +1686,11 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabaseShouldReturn
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
};
- auto fromInvalidateResumeToken = makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate);
+ auto fromInvalidateResumeToken =
+ makeResumeToken(kDefaultTs,
+ UUID::makeDefaultForChangeStream(),
+ Value(),
+ ResumeTokenData::FromInvalidate::kFromInvalidate);
Document expectedInvalidate{
{DSChangeStream::kIdField, fromInvalidateResumeToken},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 6643c186ebc..9b1455242d1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -340,13 +340,19 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// UUID should always be present except for invalidate and dropDatabase entries. It will not be
// under FCV 3.4, so we should close the stream as invalid.
if (operationType != DocumentSourceChangeStream::kInvalidateOpType &&
- operationType != DocumentSourceChangeStream::kDropDatabaseOpType && uuid.missing()) {
- warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get "
- "downgraded after opening the stream?";
- operationType = DocumentSourceChangeStream::kInvalidateOpType;
- fullDocument = Value();
- updateDescription = Value();
- documentKey = Value();
+ operationType != DocumentSourceChangeStream::kDropDatabaseOpType) {
+ if (uuid.missing()) {
+ warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get "
+ "downgraded after opening the stream?";
+ operationType = DocumentSourceChangeStream::kInvalidateOpType;
+ fullDocument = Value();
+ updateDescription = Value();
+ documentKey = Value();
+ }
+ } else {
+ // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after
+ // high-water-mark tokens. Their sorting relative to other events remains unchanged.
+ uuid = Value(UUID::makeDefaultForChangeStream());
}
// Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 49df0d6b557..8871bf07bb7 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -105,9 +105,9 @@ void DocumentSourceCursor::loadBatch() {
// As long as we're waiting for inserts, we shouldn't do any batching at this level
// we need the whole pipeline to see each document to see if we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
- // needs-merge case), batching will result in a wrong time.
- if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
- (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
+ // awaitData case), batching will result in a wrong time.
+ if (pExpCtx->isTailableAwaitData() ||
+ awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f7f1dcf4a7f..beb9542fcca 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -386,6 +386,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
+ auto* firstSource =
+ pipeline->getSources().empty() ? nullptr : pipeline->getSources().front().get();
+ if (firstSource && firstSource->constraints().isChangeStreamStage()) {
+ invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ }
+
if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
}
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index e3f7971b869..1c5ed4d037e 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -79,6 +79,14 @@ std::pair<Value, Value> encodeInBinDataFormat(const ResumeTokenData& data) {
: Value(BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral));
return {Value(rawBinary), typeBitsValue};
}
+
+// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken.
+ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) {
+ invariant(!clusterTime.isNull());
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = clusterTime;
+ return tokenData;
+}
} // namespace
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
@@ -293,4 +301,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
+ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime));
+}
+
+bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) {
+ return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 678a1bbea18..f1fc32a1644 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -120,6 +120,18 @@ public:
static ResumeToken parse(const Document& document);
/**
+ * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey.
+ */
+ static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime);
+
+ /**
+ * Returns true if the given token data represents a valid high-water-mark resume token; that
+ * is, it does not refer to a specific operation, but instead specifies a clusterTime after
+ * which the stream should resume.
+ */
+ static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData);
+
+ /**
* The default no-argument constructor is required by the IDL for types used as non-optional
* fields.
*/
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index bb8eb142176..486dda916b9 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -179,6 +179,7 @@ env.CppUnitTest(
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/pipeline/aggregation_request",
+ "$BUILD_DIR/mongo/db/pipeline/document_sources_idl",
'command_request_response',
]
)
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 5d497aa5f6d..d2307ecc354 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -47,6 +47,7 @@ const char kNsField[] = "ns";
const char kBatchField[] = "nextBatch";
const char kBatchFieldInitial[] = "firstBatch";
const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
+const char kPostBatchResumeTokenField[] = "postBatchResumeToken";
} // namespace
@@ -60,6 +61,9 @@ CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse,
void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) {
invariant(_active);
_batch.doneFast();
+ if (!_postBatchResumeToken.isEmpty()) {
+ _cursorObject.append(kPostBatchResumeTokenField, _postBatchResumeToken);
+ }
_cursorObject.append(kIdField, cursorId);
_cursorObject.append(kNsField, cursorNamespace);
_cursorObject.doneFast();
@@ -105,12 +109,14 @@ CursorResponse::CursorResponse(NamespaceString nss,
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar,
boost::optional<Timestamp> latestOplogTimestamp,
+ boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError)
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
_numReturnedSoFar(numReturnedSoFar),
_latestOplogTimestamp(latestOplogTimestamp),
+ _postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)) {}
StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) {
@@ -176,6 +182,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
doc.shareOwnershipWith(cmdResponse);
}
+ auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField];
+ if (postBatchResumeTokenElem && postBatchResumeTokenElem.type() != BSONType::Object) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kPostBatchResumeTokenField
+ << " format is invalid; expected Object, but found: "
+ << postBatchResumeTokenElem.type()};
+ }
+
auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField];
if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) {
return {
@@ -199,6 +213,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
boost::none,
latestOplogTimestampElem ? latestOplogTimestampElem.timestamp()
: boost::optional<Timestamp>{},
+ postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
+ : boost::optional<BSONObj>{},
writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}};
}
@@ -217,6 +233,11 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
}
batchBuilder.doneFast();
+ if (_postBatchResumeToken) {
+ invariant(!_postBatchResumeToken->isEmpty());
+ cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
+ }
+
cursorBuilder.doneFast();
if (_latestOplogTimestamp) {
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index ece6ce88a2b..e7aeed9201a 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -78,6 +78,10 @@ public:
_latestOplogTimestamp = ts;
}
+ void setPostBatchResumeToken(BSONObj token) {
+ _postBatchResumeToken = token.getOwned();
+ }
+
long long numDocs() const {
return _numDocs;
}
@@ -103,6 +107,7 @@ private:
BSONArrayBuilder _batch;
long long _numDocs = 0;
Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
};
/**
@@ -173,6 +178,7 @@ public:
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<Timestamp> latestOplogTimestamp = boost::none,
+ boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none);
CursorResponse(CursorResponse&& other) = default;
@@ -215,6 +221,10 @@ public:
return _latestOplogTimestamp;
}
+ boost::optional<BSONObj> getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
boost::optional<BSONObj> getWriteConcernError() const {
return _writeConcernError;
}
@@ -234,6 +244,7 @@ private:
std::vector<BSONObj> _batch;
boost::optional<long long> _numReturnedSoFar;
boost::optional<Timestamp> _latestOplogTimestamp;
+ boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
};
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index aa44ec9c66a..33cc2b40228 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/pipeline/resume_token.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -332,6 +333,36 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2));
}
+TEST(CursorResponseTest, serializePostBatchResumeToken) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+ auto postBatchResumeToken = ResumeToken::makeHighWaterMarkResumeToken(Timestamp(1, 2))
+ .toDocument(ResumeToken::SerializationFormat::kHexString)
+ .toBson();
+ CursorResponse response(NamespaceString("db.coll"),
+ CursorId(123),
+ batch,
+ boost::none,
+ boost::none,
+ postBatchResumeToken);
+ auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ ASSERT_BSONOBJ_EQ(serialized,
+ BSON("cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"
+ << "nextBatch"
+ << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+ << "postBatchResumeToken"
+ << postBatchResumeToken)
+ << "ok"
+ << 1));
+ auto reparsed = CursorResponse::parseFromBSON(serialized);
+ ASSERT_OK(reparsed.getStatus());
+ CursorResponse reparsedResponse = std::move(reparsed.getValue());
+ ASSERT_EQ(reparsedResponse.getCursorId(), CursorId(123));
+ ASSERT_EQ(reparsedResponse.getNSS().ns(), "db.coll");
+ ASSERT_EQ(reparsedResponse.getBatch().size(), 2U);
+ ASSERT_BSONOBJ_EQ(*reparsedResponse.getPostBatchResumeToken(), postBatchResumeToken);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index a458799845b..1c9612833de 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -127,7 +127,8 @@ MultiPlanStage* getMultiPlanStage(PlanStage* root) {
* there is no PPS that is root.
*/
PipelineProxyStage* getPipelineProxyStage(PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
return static_cast<PipelineProxyStage*>(root);
}
@@ -884,7 +885,8 @@ std::string Explain::getPlanSummary(const PlanExecutor* exec) {
// static
std::string Explain::getPlanSummary(const PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
auto pipelineProxy = static_cast<const PipelineProxyStage*>(root);
return pipelineProxy->getPlanSummaryStr();
}
@@ -918,7 +920,8 @@ void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsO
PlanStage* root = exec.getRootStage();
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
auto pipelineProxy = static_cast<PipelineProxyStage*>(root);
pipelineProxy->getPlanSummaryStats(statsOut);
return;
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 352d5b198b4..2a5067f7f4b 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -41,9 +41,9 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/multi_plan.h"
-#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/subplan.h"
@@ -715,14 +715,20 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
-Timestamp PlanExecutor::getLatestOplogTimestamp() {
- if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
- return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
+Timestamp PlanExecutor::getLatestOplogTimestamp() const {
+ if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+ return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp();
if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN))
return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp();
return Timestamp();
}
+BSONObj PlanExecutor::getPostBatchResumeToken() const {
+ if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+ return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken();
+ return {};
+}
+
//
// PlanExecutor::Deleter
//
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 76bb21290f1..37360382e60 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -467,7 +467,13 @@ public:
* If the last oplog timestamp is being tracked for this PlanExecutor, return it.
* Otherwise return a null timestamp.
*/
- Timestamp getLatestOplogTimestamp();
+ Timestamp getLatestOplogTimestamp() const;
+
+ /**
+ * If this PlanExecutor is tracking change stream resume tokens, return the most recent token
+ * for the batch that is currently being built. Otherwise, return an empty object.
+ */
+ BSONObj getPostBatchResumeToken() const;
private:
/**
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index bbcc4d7d939..91ddbccb605 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -358,6 +358,7 @@ PlanStage* buildStages(OperationContext* opCtx,
return new EnsureSortedStage(opCtx, esn->pattern, ws, childStage);
}
case STAGE_CACHED_PLAN:
+ case STAGE_CHANGE_STREAM_PROXY:
case STAGE_COUNT:
case STAGE_DELETE:
case STAGE_NOTIFY_DELETE:
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index 74ed3bc5fc9..467e65fd610 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -91,7 +91,8 @@ enum StageType {
STAGE_OR,
STAGE_PROJECTION,
- // Stage for running aggregation pipelines.
+ // Stages for running aggregation pipelines.
+ STAGE_CHANGE_STREAM_PROXY,
STAGE_PIPELINE_PROXY,
STAGE_QUEUED_DATA,
diff --git a/src/mongo/util/uuid.cpp b/src/mongo/util/uuid.cpp
index 4f243c99401..6df2b6853a9 100644
--- a/src/mongo/util/uuid.cpp
+++ b/src/mongo/util/uuid.cpp
@@ -89,6 +89,10 @@ UUID UUID::parse(const BSONObj& obj) {
return res.getValue();
}
+UUID UUID::makeDefaultForChangeStream() {
+ return UUID{};
+}
+
bool UUID::isUUIDString(const std::string& s) {
return std::regex_match(s, uuidRegex);
}
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index 06535daf979..17398d21ab5 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -100,6 +100,12 @@ public:
static StatusWith<UUID> parse(BSONElement from);
/**
+ * Returns the nil UUID; that is, a UUID composed entirely of zeroes.
+ * Used in change streams only.
+ */
+ static UUID makeDefaultForChangeStream();
+
+ /**
* Parses a BSON document of the form { uuid: BinData(4, "...") }.
*
* For IDL.