diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-24 16:56:20 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-28 23:23:24 +0000 |
commit | 296dfba8bd13e5c420b8ccfb51878e017eac2f5a (patch) | |
tree | 42a1bad913d0a87165a16f541eab018a95335269 | |
parent | 7e86ff743d75a3cd9a76249740c9e659b5f93cf3 (diff) | |
download | mongo-296dfba8bd13e5c420b8ccfb51878e017eac2f5a.tar.gz |
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
(cherry picked from commit fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c)
(cherry picked from commit 7f70c1213b2cc79591ea1d11f9724d057886a0fd)
36 files changed, 580 insertions, 50 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index 300f165d425..b74478cafc4 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -7,6 +7,8 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index c462f686ce5..24fe451f074 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -7,6 +7,8 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected to # work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index 6794c2bea6d..3846b1f607e 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -9,6 +9,8 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index e8f978f4907..2598e40df77 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -13,6 +13,8 @@ selector: - jstests/change_streams/whole_db.js - jstests/change_streams/whole_db_metadata_notifications.js - jstests/change_streams/whole_db_resumability.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index 4dda4a931d0..aa5f9e7662a 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -7,6 +7,8 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index 1df3df2cdc3..6b86253f6a8 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -9,6 +9,8 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml index 41a04dc6f71..d619cf96c97 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml @@ -10,6 +10,8 @@ selector: - jstests/change_streams/whole_cluster.js - jstests/change_streams/whole_cluster_metadata_notifications.js - jstests/change_streams/whole_cluster_resumability.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index 21ec2902dab..bbe2186a6ef 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -13,6 +13,8 @@ selector: - jstests/change_streams/include_cluster_time.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index 5ac70f20dcf..981ba646b39 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -11,6 +11,8 @@ selector: - jstests/change_streams/report_latest_observed_oplog_timestamp.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml index d0017dd42dd..6478118e1df 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml @@ -15,6 +15,8 @@ selector: - jstests/change_streams/whole_db.js - jstests/change_streams/whole_db_metadata_notifications.js - jstests/change_streams/whole_db_resumability.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js new file mode 100644 index 00000000000..2094f12246c --- /dev/null +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -0,0 +1,183 @@ +/** + * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. + * @tags: [uses_transactions] + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + // Drop and recreate collections to assure a clean run. + const collName = "report_post_batch_resume_token"; + const testCollection = assertDropAndRecreateCollection(db, collName); + const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); + const adminDB = db.getSiblingDB("admin"); + + // Helper function to return the next batch given an initial aggregate command response. + function runNextGetMore(initialCursorResponse) { + const getMoreCollName = initialCursorResponse.cursor.ns.substr( + initialCursorResponse.cursor.ns.indexOf('.') + 1); + return assert.commandWorked(testCollection.runCommand({ + getMore: initialCursorResponse.cursor.id, + collection: getMoreCollName, + batchSize: batchSize + })); + } + + let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate. + const batchSize = 2; + + // Test that postBatchResumeToken is present on empty initial aggregate batch. + let initialAggResponse = assert.commandWorked(testCollection.runCommand( + {aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}})); + + // Examine the response from the initial agg. It should have a postBatchResumeToken (PBRT), + // despite the fact that the batch is empty. + let initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken; + assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse)); + assert.eq(0, initialAggResponse.cursor.firstBatch.length); + + // Test that postBatchResumeToken is present on empty getMore batch. + let getMoreResponse = runNextGetMore(initialAggResponse); + let getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.neq(undefined, getMorePBRT, tojson(getMoreResponse)); + assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); + assert.eq(0, getMoreResponse.cursor.nextBatch.length); + + // Test that postBatchResumeToken advances with returned events. Insert one document into the + // collection and consume the resulting change stream event. + assert.commandWorked(testCollection.insert({_id: docId++})); + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + + // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal + // to the resume token of the last item in the batch and greater than the initial PBRT. + let resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, resumeTokenFromDoc); + assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); + + // Now seed the collection with enough documents to fit in two batches. + for (let i = 0; i < batchSize * 2; i++) { + assert.commandWorked(testCollection.insert({_id: docId++})); + } + + // Test that postBatchResumeToken is present on non-empty initial aggregate batch. + initialAggResponse = assert.commandWorked(testCollection.runCommand({ + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromDoc}}], + cursor: {batchSize: batchSize} + })); + // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the + // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it. + initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken; + assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse)); + assert.eq(batchSize, initialAggResponse.cursor.firstBatch.length); + assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); + + // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the + // observed postBatchResumeToken advanced. + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(batchSize, getMoreResponse.cursor.nextBatch.length); + + // The postBatchResumeToken is again equal to the final token in the batch, and greater than the + // PBRT from the initial response. + resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[batchSize - 1]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(resumeTokenFromDoc, getMorePBRT, tojson(getMoreResponse)); + assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); + + // Test that postBatchResumeToken advances with writes to an unrelated collection. First make + // sure there is nothing left in our cursor, and obtain the latest PBRT... + getMoreResponse = runNextGetMore(initialAggResponse); + let previousGetMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.neq(undefined, previousGetMorePBRT, tojson(getMoreResponse)); + assert.eq(getMoreResponse.cursor.nextBatch, []); + + // ... then test that it advances on an insert to an unrelated collection. + assert.commandWorked(otherCollection.insert({})); + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(0, getMoreResponse.cursor.nextBatch.length); + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + + // Insert two documents into the collection which are of the maximum BSON object size. + const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize; + assert.gt(bsonUserSizeLimit, 0); + for (let i = 0; i < 2; ++i) { + const docToInsert = {_id: docId++, padding: ""}; + docToInsert.padding = "a".repeat(bsonUserSizeLimit - Object.bsonsize(docToInsert)); + assert.commandWorked(testCollection.insert(docToInsert)); + } + + // Test that we return the correct postBatchResumeToken in the event that the batch hits the + // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result, + // because the second result cannot fit in the batch. + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + + // Verify that the postBatchResumeToken matches the last event actually added to the batch. + resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, resumeTokenFromDoc); + + // Now retrieve the second event and confirm that the PBRT matches its resume token. + previousGetMorePBRT = getMorePBRT; + getMoreResponse = runNextGetMore(initialAggResponse); + resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + assert.docEq(getMorePBRT, resumeTokenFromDoc); + + // Test that the PBRT is correctly updated when reading events from within a transaction. + const session = db.getMongo().startSession(); + const sessionDB = session.getDatabase(db.getName()); + + const sessionColl = sessionDB[testCollection.getName()]; + const sessionOtherColl = sessionDB[otherCollection.getName()]; + session.startTransaction(); + + // Write 3 documents to the test collection and 1 to the unrelated collection. + for (let i = 0; i < 3; ++i) { + assert.commandWorked(sessionColl.insert({_id: docId++})); + } + assert.commandWorked(sessionOtherColl.insert({_id: docId++})); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); + + // Grab the next 2 events, which should be the first 2 events in the transaction. + previousGetMorePBRT = getMorePBRT; + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(2, getMoreResponse.cursor.nextBatch.length); + + // The clusterTime should be the same on each, but the resume token keeps advancing. + const txnEvent1 = getMoreResponse.cursor.nextBatch[0], + txnEvent2 = getMoreResponse.cursor.nextBatch[1]; + const txnClusterTime = txnEvent1.clusterTime; + assert.eq(txnEvent2.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0); + assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); + + // The PBRT of the first transaction batch is equal to the last document's resumeToken. + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, txnEvent2._id); + + // Now get the next batch. This contains the third and final transaction operation. + previousGetMorePBRT = getMorePBRT; + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + + // The clusterTime of this event is the same as the two events from the previous batch, but its + // resume token is greater than the previous PBRT. + const txnEvent3 = getMoreResponse.cursor.nextBatch[0]; + assert.eq(txnEvent3.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); + + // Because we wrote to the unrelated collection, the final event in the transaction does not + // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the + // last event in the batch, because the unrelated event is within the same transaction and + // therefore has the same clusterTime. + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, txnEvent3._id); +})(); diff --git a/src/mongo/base/uuid_test.cpp b/src/mongo/base/uuid_test.cpp index daf43f34d5a..02e1f9fe4ed 100644 --- a/src/mongo/base/uuid_test.cpp +++ b/src/mongo/base/uuid_test.cpp @@ -190,5 +190,11 @@ TEST(UUIDTest, toBSONUsingBSONMacro) { ASSERT_BSONOBJ_EQ(expectedBson, bson); } +TEST(UUIDTest, NilUUID) { + // Test that UUID::nil() returns an all-zero UUID. + auto nilUUID = UUID::parse("00000000-0000-0000-0000-000000000000"); + ASSERT_EQUALS(UUID::makeDefaultForChangeStream(), unittest::assertGet(nilUUID)); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index e2c297186d5..e98200213d0 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -947,6 +947,7 @@ env.Library( 'exec/and_hash.cpp', 'exec/and_sorted.cpp', 'exec/cached_plan.cpp', + 'exec/change_stream_proxy.cpp', 'exec/collection_scan.cpp', 'exec/count.cpp', 'exec/count_scan.cpp', diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 8366f354c06..1eb743af5b4 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -539,8 +539,9 @@ public: // As soon as we get a result, this operation no longer waits. awaitDataState(opCtx).shouldWaitForInserts = false; - // Add result to output buffer. + // Add result to output buffer, set latestOplogTimestamp and postBatchResumeToken. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken()); nextBatch->append(obj); (*numResults)++; } @@ -563,9 +564,10 @@ public: return status; } case PlanExecutor::IS_EOF: - // This causes the reported latest oplog timestamp to advance even when there are - // no results for this particular query. + // This causes the reported latest oplog timestamp and postBatchResumeToken to + // advance even when there are no results for this particular query. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken()); default: return Status::OK(); } diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index c9eb61bb7a5..7b1c6635b48 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,7 +41,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/exec/pipeline_proxy.h" +#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" @@ -115,8 +115,11 @@ bool handleCursorCommand(OperationContext* opCtx, } if (state == PlanExecutor::IS_EOF) { + // Set both the latestOplogTimestamp and the postBatchResumeToken on the response. responseBuilder.setLatestOplogTimestamp( cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.setPostBatchResumeToken( + cursor->getExecutor()->getPostBatchResumeToken()); if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; @@ -136,7 +139,9 @@ bool handleCursorCommand(OperationContext* opCtx, break; } + // Set both the latestOplogTimestamp and the postBatchResumeToken on the response. responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken()); responseBuilder.append(next); } @@ -508,7 +513,9 @@ Status runAggregate(OperationContext* opCtx, // Transfer ownership of the Pipeline to the PipelineProxyStage. unownedPipeline = pipeline.get(); auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get()); + auto proxy = liteParsedPipeline.hasChangeStream() + ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipeline), ws.get()) + : make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get()); // This PlanExecutor will simply forward requests to the Pipeline, so does not need to // yield or to be registered with any collection's CursorManager to receive invalidations. diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp new file mode 100644 index 00000000000..7780652286a --- /dev/null +++ b/src/mongo/db/exec/change_stream_proxy.cpp @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/change_stream_proxy.h" + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/pipeline/resume_token.h" + +namespace mongo { + +const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY"; + +ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws) + : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) { + invariant(std::any_of( + _pipeline->getSources().begin(), _pipeline->getSources().end(), [](const auto& stage) { + return stage->constraints().isChangeStreamStage(); + })); +} + +boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() { + if (auto next = _pipeline->getNext()) { + // While we have more results to return, we track both the timestamp and the resume token of + // the latest event observed in the oplog, the latter via its _id field. + auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson()); + _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get()); + if (next->getField("_id").getType() == BSONType::Object) { + _postBatchResumeToken = next->getField("_id").getDocument().toBson(); + } + return nextBSON; + } + + // We ran out of results to return. Check whether the oplog cursor has moved forward since the + // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return, + // if the new time is higher than the last then we are guaranteed not to have already returned + // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token + // at the current clusterTime. + auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); + if (highWaterMark > _latestOplogTimestamp) { + auto token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark); + _postBatchResumeToken = + token.toDocument(ResumeToken::SerializationFormat::kHexString).toBson(); + _latestOplogTimestamp = highWaterMark; + } + return boost::none; +} + +std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() { + std::unique_ptr<PlanStageStats> ret = + std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY); + ret->specific = std::make_unique<CollectionScanStats>(); + return ret; +} + +} // namespace mongo diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h new file mode 100644 index 00000000000..601fe43b582 --- /dev/null +++ b/src/mongo/db/exec/change_stream_proxy.h @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/pipeline_proxy.h" + +namespace mongo { + +/** + * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the + * serialization of change stream pipeline output from Document to BSON. In particular, it is + * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that + * are necessary for correct merging on mongoS and, in the latter case, must also be provided to + * mongoD clients. + */ +class ChangeStreamProxyStage final : public PipelineProxyStage { +public: + static const char* kStageType; + + /** + * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream into + * the constructor will cause an invariant() to fail. + */ + ChangeStreamProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws); + + /** + * Returns an empty PlanStageStats object. + */ + std::unique_ptr<PlanStageStats> getStats() final; + + /** + * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog + * timestamp in the event that we need to merge on mongoS. + */ + Timestamp getLatestOplogTimestamp() const { + return _includeMetaData ? _latestOplogTimestamp : Timestamp(); + } + + /** + * Passes through the most recent resume token from the proxied pipeline. + */ + BSONObj getPostBatchResumeToken() const { + return _postBatchResumeToken; + } + + StageType stageType() const final { + return STAGE_CHANGE_STREAM_PROXY; + } + +protected: + boost::optional<BSONObj> getNextBson() final; + +private: + Timestamp _latestOplogTimestamp; + BSONObj _postBatchResumeToken; +}; +} // namespace mongo diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index 4267337ab2b..9b0119d1429 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -51,7 +51,13 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY"; PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, WorkingSet* ws) - : PlanStage(kStageType, opCtx), + : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {} + +PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws, + const char* stageTypeName) + : PlanStage(stageTypeName, opCtx), _pipeline(std::move(pipeline)), _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger _ws(ws) { @@ -128,10 +134,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() { return boost::none; } -Timestamp PipelineProxyStage::getLatestOplogTimestamp() const { - return PipelineD::getLatestOplogTimestamp(_pipeline.get()); -} - std::string PipelineProxyStage::getPlanSummaryStr() const { return PipelineD::getPlanSummaryStr(_pipeline.get()); } diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index 370b2b80db6..54c4b7dc6df 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -46,12 +46,14 @@ namespace mongo { /** * Stage for pulling results out from an aggregation pipeline. */ -class PipelineProxyStage final : public PlanStage { +class PipelineProxyStage : public PlanStage { public: PipelineProxyStage(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, WorkingSet* ws); + virtual ~PipelineProxyStage() = default; + PlanStage::StageState doWork(WorkingSetID* out) final; bool isEOF() final; @@ -63,7 +65,7 @@ public: void doReattachToOperationContext() final; // Returns empty PlanStageStats object - std::unique_ptr<PlanStageStats> getStats() final; + std::unique_ptr<PlanStageStats> getStats() override; // Not used. SpecificStats* getSpecificStats() const final { @@ -76,15 +78,10 @@ public: MONGO_UNREACHABLE; } - /** - * Pass through the last oplog timestamp from the proxied pipeline. - */ - Timestamp getLatestOplogTimestamp() const; - std::string getPlanSummaryStr() const; void getPlanSummaryStats(PlanSummaryStats* statsOut) const; - StageType stageType() const final { + StageType stageType() const override { return STAGE_PIPELINE_PROXY; } @@ -97,17 +94,20 @@ public: static const char* kStageType; protected: - void doDispose() final; + PipelineProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws, + const char* stageTypeName); -private: - boost::optional<BSONObj> getNextBson(); + virtual boost::optional<BSONObj> getNextBson(); + void doDispose() final; - // Things in the _stash should be returned before pulling items from _pipeline. + // Items in the _stash should be returned before pulling items from _pipeline. std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; - std::vector<BSONObj> _stash; const bool _includeMetaData; - // Not owned by us. +private: + std::vector<BSONObj> _stash; WorkingSet* _ws; }; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 6c4f9a4f25e..f1431ffa8de 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1473,17 +1473,19 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) { TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false); - // Drop database entry doesn't have a UUID. + // Drop database entry has a nil UUID. Document expectedDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::makeDefaultForChangeStream())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}}, }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + UUID::makeDefaultForChangeStream(), + Value(), + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, }; @@ -1674,7 +1676,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabaseShouldReturn UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. - auto dropDBResumeToken = makeResumeToken(kDefaultTs); + auto dropDBResumeToken = makeResumeToken(kDefaultTs, UUID::makeDefaultForChangeStream()); OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false); Document expectedDropDatabase{ @@ -1684,8 +1686,11 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabaseShouldReturn {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}}, }; - auto fromInvalidateResumeToken = makeResumeToken( - kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate); + auto fromInvalidateResumeToken = + makeResumeToken(kDefaultTs, + UUID::makeDefaultForChangeStream(), + Value(), + ResumeTokenData::FromInvalidate::kFromInvalidate); Document expectedInvalidate{ {DSChangeStream::kIdField, fromInvalidateResumeToken}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 6643c186ebc..9b1455242d1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -340,13 +340,19 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // UUID should always be present except for invalidate and dropDatabase entries. It will not be // under FCV 3.4, so we should close the stream as invalid. if (operationType != DocumentSourceChangeStream::kInvalidateOpType && - operationType != DocumentSourceChangeStream::kDropDatabaseOpType && uuid.missing()) { - warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get " - "downgraded after opening the stream?"; - operationType = DocumentSourceChangeStream::kInvalidateOpType; - fullDocument = Value(); - updateDescription = Value(); - documentKey = Value(); + operationType != DocumentSourceChangeStream::kDropDatabaseOpType) { + if (uuid.missing()) { + warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get " + "downgraded after opening the stream?"; + operationType = DocumentSourceChangeStream::kInvalidateOpType; + fullDocument = Value(); + updateDescription = Value(); + documentKey = Value(); + } + } else { + // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after + // high-water-mark tokens. Their sorting relative to other events remains unchanged. + uuid = Value(UUID::makeDefaultForChangeStream()); } // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 49df0d6b557..8871bf07bb7 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -105,9 +105,9 @@ void DocumentSourceCursor::loadBatch() { // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. // Furthermore, if we need to return the latest oplog time (in the tailable and - // needs-merge case), batching will result in a wrong time. - if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || - (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || + // awaitData case), batching will result in a wrong time. + if (pExpCtx->isTailableAwaitData() || + awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f7f1dcf4a7f..beb9542fcca 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -386,6 +386,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } + auto* firstSource = + pipeline->getSources().empty() ? nullptr : pipeline->getSources().front().get(); + if (firstSource && firstSource->constraints().isChangeStreamStage()) { + invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData); + plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + } + if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; } diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index e3f7971b869..1c5ed4d037e 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -79,6 +79,14 @@ std::pair<Value, Value> encodeInBinDataFormat(const ResumeTokenData& data) { : Value(BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral)); return {Value(rawBinary), typeBitsValue}; } + +// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken. +ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) { + invariant(!clusterTime.isNull()); + ResumeTokenData tokenData; + tokenData.clusterTime = clusterTime; + return tokenData; +} } // namespace bool ResumeTokenData::operator==(const ResumeTokenData& other) const { @@ -293,4 +301,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) { return ResumeToken(resumeDoc); } +ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) { + return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime)); +} + +bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) { + return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 678a1bbea18..f1fc32a1644 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -120,6 +120,18 @@ public: static ResumeToken parse(const Document& document); /** + * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey. + */ + static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime); + + /** + * Returns true if the given token data represents a valid high-water-mark resume token; that + * is, it does not refer to a specific operation, but instead specifies a clusterTime after + * which the stream should resume. + */ + static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData); + + /** * The default no-argument constructor is required by the IDL for types used as non-optional * fields. */ diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index bb8eb142176..486dda916b9 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -179,6 +179,7 @@ env.CppUnitTest( ], LIBDEPS=[ "$BUILD_DIR/mongo/db/pipeline/aggregation_request", + "$BUILD_DIR/mongo/db/pipeline/document_sources_idl", 'command_request_response', ] ) diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 5d497aa5f6d..d2307ecc354 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -47,6 +47,7 @@ const char kNsField[] = "ns"; const char kBatchField[] = "nextBatch"; const char kBatchFieldInitial[] = "firstBatch"; const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp"; +const char kPostBatchResumeTokenField[] = "postBatchResumeToken"; } // namespace @@ -60,6 +61,9 @@ CursorResponseBuilder::CursorResponseBuilder(bool isInitialResponse, void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) { invariant(_active); _batch.doneFast(); + if (!_postBatchResumeToken.isEmpty()) { + _cursorObject.append(kPostBatchResumeTokenField, _postBatchResumeToken); + } _cursorObject.append(kIdField, cursorId); _cursorObject.append(kNsField, cursorNamespace); _cursorObject.doneFast(); @@ -105,12 +109,14 @@ CursorResponse::CursorResponse(NamespaceString nss, std::vector<BSONObj> batch, boost::optional<long long> numReturnedSoFar, boost::optional<Timestamp> latestOplogTimestamp, + boost::optional<BSONObj> postBatchResumeToken, boost::optional<BSONObj> writeConcernError) : _nss(std::move(nss)), _cursorId(cursorId), _batch(std::move(batch)), _numReturnedSoFar(numReturnedSoFar), _latestOplogTimestamp(latestOplogTimestamp), + _postBatchResumeToken(std::move(postBatchResumeToken)), _writeConcernError(std::move(writeConcernError)) {} StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) { @@ -176,6 +182,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo doc.shareOwnershipWith(cmdResponse); } + auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField]; + if (postBatchResumeTokenElem && postBatchResumeTokenElem.type() != BSONType::Object) { + return {ErrorCodes::BadValue, + str::stream() << kPostBatchResumeTokenField + << " format is invalid; expected Object, but found: " + << postBatchResumeTokenElem.type()}; + } + auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField]; if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) { return { @@ -199,6 +213,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo boost::none, latestOplogTimestampElem ? latestOplogTimestampElem.timestamp() : boost::optional<Timestamp>{}, + postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned() + : boost::optional<BSONObj>{}, writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}}; } @@ -217,6 +233,11 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, } batchBuilder.doneFast(); + if (_postBatchResumeToken) { + invariant(!_postBatchResumeToken->isEmpty()); + cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken); + } + cursorBuilder.doneFast(); if (_latestOplogTimestamp) { diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index ece6ce88a2b..e7aeed9201a 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -78,6 +78,10 @@ public: _latestOplogTimestamp = ts; } + void setPostBatchResumeToken(BSONObj token) { + _postBatchResumeToken = token.getOwned(); + } + long long numDocs() const { return _numDocs; } @@ -103,6 +107,7 @@ private: BSONArrayBuilder _batch; long long _numDocs = 0; Timestamp _latestOplogTimestamp; + BSONObj _postBatchResumeToken; }; /** @@ -173,6 +178,7 @@ public: std::vector<BSONObj> batch, boost::optional<long long> numReturnedSoFar = boost::none, boost::optional<Timestamp> latestOplogTimestamp = boost::none, + boost::optional<BSONObj> postBatchResumeToken = boost::none, boost::optional<BSONObj> writeConcernError = boost::none); CursorResponse(CursorResponse&& other) = default; @@ -215,6 +221,10 @@ public: return _latestOplogTimestamp; } + boost::optional<BSONObj> getPostBatchResumeToken() const { + return _postBatchResumeToken; + } + boost::optional<BSONObj> getWriteConcernError() const { return _writeConcernError; } @@ -234,6 +244,7 @@ private: std::vector<BSONObj> _batch; boost::optional<long long> _numReturnedSoFar; boost::optional<Timestamp> _latestOplogTimestamp; + boost::optional<BSONObj> _postBatchResumeToken; boost::optional<BSONObj> _writeConcernError; }; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index aa44ec9c66a..33cc2b40228 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -32,6 +32,7 @@ #include "mongo/db/query/cursor_response.h" +#include "mongo/db/pipeline/resume_token.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -332,6 +333,36 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) { ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2)); } +TEST(CursorResponseTest, serializePostBatchResumeToken) { + std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; + auto postBatchResumeToken = ResumeToken::makeHighWaterMarkResumeToken(Timestamp(1, 2)) + .toDocument(ResumeToken::SerializationFormat::kHexString) + .toBson(); + CursorResponse response(NamespaceString("db.coll"), + CursorId(123), + batch, + boost::none, + boost::none, + postBatchResumeToken); + auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse); + ASSERT_BSONOBJ_EQ(serialized, + BSON("cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll" + << "nextBatch" + << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)) + << "postBatchResumeToken" + << postBatchResumeToken) + << "ok" + << 1)); + auto reparsed = CursorResponse::parseFromBSON(serialized); + ASSERT_OK(reparsed.getStatus()); + CursorResponse reparsedResponse = std::move(reparsed.getValue()); + ASSERT_EQ(reparsedResponse.getCursorId(), CursorId(123)); + ASSERT_EQ(reparsedResponse.getNSS().ns(), "db.coll"); + ASSERT_EQ(reparsedResponse.getBatch().size(), 2U); + ASSERT_BSONOBJ_EQ(*reparsedResponse.getPostBatchResumeToken(), postBatchResumeToken); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp index a458799845b..1c9612833de 100644 --- a/src/mongo/db/query/explain.cpp +++ b/src/mongo/db/query/explain.cpp @@ -127,7 +127,8 @@ MultiPlanStage* getMultiPlanStage(PlanStage* root) { * there is no PPS that is root. */ PipelineProxyStage* getPipelineProxyStage(PlanStage* root) { - if (root->stageType() == STAGE_PIPELINE_PROXY) { + if (root->stageType() == STAGE_PIPELINE_PROXY || + root->stageType() == STAGE_CHANGE_STREAM_PROXY) { return static_cast<PipelineProxyStage*>(root); } @@ -884,7 +885,8 @@ std::string Explain::getPlanSummary(const PlanExecutor* exec) { // static std::string Explain::getPlanSummary(const PlanStage* root) { - if (root->stageType() == STAGE_PIPELINE_PROXY) { + if (root->stageType() == STAGE_PIPELINE_PROXY || + root->stageType() == STAGE_CHANGE_STREAM_PROXY) { auto pipelineProxy = static_cast<const PipelineProxyStage*>(root); return pipelineProxy->getPlanSummaryStr(); } @@ -918,7 +920,8 @@ void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsO PlanStage* root = exec.getRootStage(); - if (root->stageType() == STAGE_PIPELINE_PROXY) { + if (root->stageType() == STAGE_PIPELINE_PROXY || + root->stageType() == STAGE_CHANGE_STREAM_PROXY) { auto pipelineProxy = static_cast<PipelineProxyStage*>(root); pipelineProxy->getPlanSummaryStats(statsOut); return; diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 352d5b198b4..2a5067f7f4b 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -41,9 +41,9 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" +#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/multi_plan.h" -#include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/exec/subplan.h" @@ -715,14 +715,20 @@ void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } -Timestamp PlanExecutor::getLatestOplogTimestamp() { - if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) - return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); +Timestamp PlanExecutor::getLatestOplogTimestamp() const { + if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) + return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp(); if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp(); return Timestamp(); } +BSONObj PlanExecutor::getPostBatchResumeToken() const { + if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) + return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken(); + return {}; +} + // // PlanExecutor::Deleter // diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 76bb21290f1..37360382e60 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -467,7 +467,13 @@ public: * If the last oplog timestamp is being tracked for this PlanExecutor, return it. * Otherwise return a null timestamp. */ - Timestamp getLatestOplogTimestamp(); + Timestamp getLatestOplogTimestamp() const; + + /** + * If this PlanExecutor is tracking change stream resume tokens, return the most recent token + * for the batch that is currently being built. Otherwise, return an empty object. + */ + BSONObj getPostBatchResumeToken() const; private: /** diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index bbcc4d7d939..91ddbccb605 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -358,6 +358,7 @@ PlanStage* buildStages(OperationContext* opCtx, return new EnsureSortedStage(opCtx, esn->pattern, ws, childStage); } case STAGE_CACHED_PLAN: + case STAGE_CHANGE_STREAM_PROXY: case STAGE_COUNT: case STAGE_DELETE: case STAGE_NOTIFY_DELETE: diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index 74ed3bc5fc9..467e65fd610 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -91,7 +91,8 @@ enum StageType { STAGE_OR, STAGE_PROJECTION, - // Stage for running aggregation pipelines. + // Stages for running aggregation pipelines. + STAGE_CHANGE_STREAM_PROXY, STAGE_PIPELINE_PROXY, STAGE_QUEUED_DATA, diff --git a/src/mongo/util/uuid.cpp b/src/mongo/util/uuid.cpp index 4f243c99401..6df2b6853a9 100644 --- a/src/mongo/util/uuid.cpp +++ b/src/mongo/util/uuid.cpp @@ -89,6 +89,10 @@ UUID UUID::parse(const BSONObj& obj) { return res.getValue(); } +UUID UUID::makeDefaultForChangeStream() { + return UUID{}; +} + bool UUID::isUUIDString(const std::string& s) { return std::regex_match(s, uuidRegex); } diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h index 06535daf979..17398d21ab5 100644 --- a/src/mongo/util/uuid.h +++ b/src/mongo/util/uuid.h @@ -100,6 +100,12 @@ public: static StatusWith<UUID> parse(BSONElement from); /** + * Returns the nil UUID; that is, a UUID composed entirely of zeroes. + * Used in change streams only. + */ + static UUID makeDefaultForChangeStream(); + + /** * Parses a BSON document of the form { uuid: BinData(4, "...") }. * * For IDL. |