diff options
26 files changed, 485 insertions, 36 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index a6d33b0335d..f51dacc5b51 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -5,7 +5,7 @@ (function() { "use strict"; -load("jstests/libs/change_stream_util.js"); +load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled. load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/fixture_helpers.js"); // For isSharded. @@ -126,6 +126,18 @@ assert.commandFailedWithCode(db.runCommand({ }), ErrorCodes.InvalidResumeToken); +// Test that if change stream optimization is enabled, then even after the 'invalidate' event has +// been filtered out, the cursor should hold the resume token of the 'invalidate' event. +if (isChangeStreamOptimizationEnabled(db)) { + const resumeStream = + coll.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: resumeToken}); + assert.soon(() => { + assert(!resumeStream.hasNext()); + return resumeStream.isExhausted(); + }); + assert.eq(resumeStream.getResumeToken(), resumeTokenInvalidate); +} + // Test resuming the change stream from the collection drop using 'startAfter'. assertResumeExpected({ coll: coll.getName(), diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js index 45f751c9aa0..ff557cd0469 100644 --- a/jstests/change_streams/whole_db_metadata_notifications.js +++ b/jstests/change_streams/whole_db_metadata_notifications.js @@ -5,7 +5,8 @@ (function() { "use strict"; -load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest, + // isChangeStreamOptimizationEnabled. load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. @@ -198,7 +199,20 @@ assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}); // 'invalidate'. assert.commandWorked(testDB.dropDatabase()); cst.assertDatabaseDrop({cursor: aggCursor, db: testDB}); -cst.assertNextChangesEqual({cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]}); +const invalidateEvent = cst.assertNextChangesEqual( + {cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]}); + +// Test that if change stream optimization is enabled, then even after the 'invalidate' event has +// been filtered out, the cursor should hold the resume token of the 'invalidate' event. +if (isChangeStreamOptimizationEnabled(testDB)) { + const resumeStream = + testDB.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: change._id}); + assert.soon(() => { + assert(!resumeStream.hasNext()); + return resumeStream.isExhausted(); + }); + assert.eq(resumeStream.getResumeToken(), invalidateEvent[0]._id); +} cst.cleanUp(); }()); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 349165e9dc0..c7176c7f112 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -15,6 +15,15 @@ const ChangeStreamWatchMode = Object.freeze({ }); /** + * Returns true if feature flag 'featureFlagChangeStreamsOptimization' is enabled, false otherwise. + */ +function isChangeStreamOptimizationEnabled(db) { + const getParam = db.adminCommand({getParameter: 1, featureFlagChangeStreamsOptimization: 1}); + return getParam.hasOwnProperty("featureFlagChangeStreamsOptimization") && + getParam.featureFlagChangeStreamsOptimization.value; +} + +/** * Helper function used internally by ChangeStreamTest. If no passthrough is active, it is exactly * the same as calling db.runCommand. If a passthrough is active and has defined a function * 'changeStreamPassthroughAwareRunCommand', then this method will be overridden to allow individual @@ -52,6 +61,7 @@ function assertInvalidateOp({cursor, opType}) { assert.soon(() => cursor.hasNext()); const invalidate = cursor.next(); assert.eq(invalidate.operationType, "invalidate"); + assert(!cursor.hasNext()); assert(cursor.isExhausted()); assert(cursor.isClosed()); return invalidate; diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js index d5fa409b97e..2af508b217c 100644 --- a/jstests/sharding/change_stream_metadata_notifications.js +++ b/jstests/sharding/change_stream_metadata_notifications.js @@ -6,7 +6,7 @@ // ] (function() { "use strict"; - +load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled. load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. @@ -20,6 +20,7 @@ const st = new ShardingTest({ const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; +const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB); assert.commandWorked(mongosDB.dropDatabase()); @@ -79,8 +80,25 @@ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "invalidate"); + +// Store the collection drop invalidate token for for subsequent tests. +const collectionDropinvalidateToken = next._id; + +assert(!changeStream.hasNext()); assert(changeStream.isExhausted()); +// If change stream optimization feature flag is enabled, verify that even after filtering out all +// events, the cursor still returns the invalidate resume token of the dropped collection. +if (isChangeStreamOptimized) { + const resumeStream = mongosColl.watch([{$match: {operationType: "DummyOperationType"}}], + {resumeAfter: resumeTokenFromFirstUpdate}); + assert.soon(() => { + assert(!resumeStream.hasNext()); + return resumeStream.isExhausted(); + }); + assert.eq(resumeStream.getResumeToken(), collectionDropinvalidateToken); +} + // With an explicit collation, test that we can resume from before the collection drop. changeStream = mongosColl.watch([], {resumeAfter: resumeTokenFromFirstUpdate, collation: {locale: "simple"}}); @@ -103,6 +121,7 @@ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "invalidate"); +assert(!changeStream.hasNext()); assert(changeStream.isExhausted()); // Test that we can resume the change stream without specifying an explicit collation. @@ -138,13 +157,33 @@ assert.commandWorked(mongosDB.dropDatabase()); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); + +// Store the token to be used as 'resumeAfter' token by other change streams. +const resumeTokenAfterDbDrop = next._id; + assert.eq(next.operationType, "drop"); assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "invalidate"); +assert(!changeStream.hasNext()); assert(changeStream.isExhausted()); +// Store the database drop invalidate token for other change streams. +const dbDropInvalidateToken = next._id; + +// If change stream optimization feature flag is enabled, verify that even after filtering out all +// events, the cursor still returns the invalidate resume token of the dropped database. +if (isChangeStreamOptimized) { + const resumeStream = mongosColl.watch([{$match: {operationType: "DummyOperationType"}}], + {resumeAfter: resumeTokenAfterDbDrop}); + assert.soon(() => { + assert(!resumeStream.hasNext()); + return resumeStream.isExhausted(); + }); + assert.eq(resumeStream.getResumeToken(), dbDropInvalidateToken); +} + st.stop(); })(); diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index 9ea3357aecb..ad99d6caf95 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -190,6 +190,7 @@ function runTest(collName, shardKey) { assert.eq(changeStream.next().operationType, "drop"); assert.soon(() => changeStream.hasNext()); assert.eq(changeStream.next().operationType, "invalidate"); + assert(!changeStream.hasNext()); assert(changeStream.isExhausted()); jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key' + diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 948745052fd..9352da0f7a1 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -430,9 +430,11 @@ error_codes: # case where it would be possible for MongoS to retry where MongoD couldn't. - {code: 343, name: ShardCannotRefreshDueToLocksHeld, extra: ShardCannotRefreshDueToLocksHeldInfo} - + - {code: 344, name: AuditingNotEnabled} - {code: 345, name: RuntimeAuditConfigurationNotEnabled} + + - {code: 346,name: ChangeStreamInvalidated, extra: ChangeStreamInvalidationInfo} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index f3fc638cb63..5edf6fcc34a 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -46,6 +46,7 @@ #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find.h" #include "mongo/db/query/find_common.h" @@ -339,6 +340,18 @@ public: // This exception indicates that we should close the cursor without reporting an // error. return false; + } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) { + // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT + // to the resume token of the invalidating event, and mark the cursor response as + // invalidated. We always expect to have ExtraInfo for this error code. + const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); + tassert(5493700, + "Missing ChangeStreamInvalidationInfo on exception", + extraInfo != nullptr); + + nextBatch->setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); + nextBatch->setInvalidated(); + return false; } catch (DBException& exception) { nextBatch->abandon(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 7ba72ab079d..25a87ca9d06 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -49,6 +49,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/aggregation_request_helper.h" +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_exchange.h" #include "mongo/db/pipeline/document_source_geo_near.h" @@ -189,6 +190,20 @@ bool handleCursorCommand(OperationContext* opCtx, cursor = nullptr; exec = nullptr; break; + } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) { + // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT + // to the resume token of the invalidating event, and mark the cursor response as + // invalidated. We expect ExtraInfo to always be present for this exception. + const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); + tassert( + 5493701, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + + responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); + responseBuilder.setInvalidated(); + + cursor = nullptr; + exec = nullptr; + break; } catch (DBException& exception) { auto&& explainer = exec->getPlanExplainer(); auto&& [stats, _] = diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 5fde0b777e8..4a1bc56dc38 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -206,6 +206,16 @@ env.Library( ], ) +env.Library( + target="change_stream_invalidation_info", + source=[ + 'change_stream_invalidation_info.cpp', + ], + LIBDEPS=[ + "$BUILD_DIR/mongo/base", + ], +) + pipelineEnv = env.Clone() pipelineEnv.InjectThirdParty(libraries=['snappy']) pipelineEnv.Library( @@ -290,6 +300,7 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/matcher/expressions', + '$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info', '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface', '$BUILD_DIR/mongo/db/query/collation/collator_interface', diff --git a/src/mongo/db/pipeline/change_stream_invalidation_info.cpp b/src/mongo/db/pipeline/change_stream_invalidation_info.cpp new file mode 100644 index 00000000000..7be3bf8e614 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_invalidation_info.cpp @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/db/pipeline/change_stream_invalidation_info.h" +#include "mongo/base/init.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { +namespace { + +MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(ChangeStreamInvalidationInfo); + +} // namespace + +std::shared_ptr<const ErrorExtraInfo> ChangeStreamInvalidationInfo::parse(const BSONObj& obj) { + return std::make_shared<ChangeStreamInvalidationInfo>(obj["invalidateToken"].Obj()); +} + +void ChangeStreamInvalidationInfo::serialize(BSONObjBuilder* bob) const { + bob->append("invalidateToken", _invalidateToken); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_invalidation_info.h b/src/mongo/db/pipeline/change_stream_invalidation_info.h new file mode 100644 index 00000000000..5f962009308 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_invalidation_info.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/error_extra_info.h" +#include "mongo/bson/bsonobj.h" + +namespace mongo { + +class BSONObjBuilder; + +/** + * Contains information to augment the 'ChangeStreamInvalidated' error code. In particular, this + * class holds the resume token of the "invalidate" event which gave rise to the exception. + */ +class ChangeStreamInvalidationInfo final : public ErrorExtraInfo { +public: + static constexpr auto code = ErrorCodes::ChangeStreamInvalidated; + + static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj& obj); + + explicit ChangeStreamInvalidationInfo(BSONObj invalidateToken) + : _invalidateToken{invalidateToken.getOwned()} {} + + BSONObj getInvalidateResumeToken() const { + return _invalidateToken; + } + + void serialize(BSONObjBuilder* bob) const final; + +private: + BSONObj _invalidateToken; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index f3db4b1de3d..64a0c7c6cbd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" @@ -490,10 +491,21 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( auto stages = buildPipeline(expCtx, spec, elem); + const bool csOptFeatureFlag = + feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV(); + + if (expCtx->inMongos && csOptFeatureFlag) { + // TODO SERVER-55491: replace with DocumentSourceUpdateOnAddShard. + stages.push_back(DocumentSourceChangeStreamPipelineSplitter::create(expCtx)); + } + if (!expCtx->needsMerge) { - // There should only be one close cursor stage. If we're on the shards and producing input - // to be merged, do not add a close cursor stage, since the mongos will already have one. - stages.push_back(DocumentSourceCloseCursor::create(expCtx)); + if (!csOptFeatureFlag) { + // There should only be one close cursor stage. If we're on the shards and producing + // input to be merged, do not add a close cursor stage, since the mongos will already + // have one. + stages.push_back(DocumentSourceCloseCursor::create(expCtx)); + } // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here // so that any $match stages which follow the $changeStream pipeline prefix may be able to diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index daa7cbb597a..eb40d2908ab 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -31,6 +31,7 @@ #include <type_traits> +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/pipeline/document_source_match.h" @@ -268,4 +269,57 @@ private: using DocumentSourceMatch::DocumentSourceMatch; }; +/** + * A DocumentSource that if part of the pipeline, directly passes on the received documents to the + * next stages without interpreting it and marks where a sharded change streams pipeline should be + * split. This stage should only ever be created by a mongoS. + * + * TODO SERVER-55491: replace this class with DocumentSourceUpdateOnAddShard. + */ +class DocumentSourceChangeStreamPipelineSplitter final : public DocumentSource { +public: + static constexpr StringData kStageName = "$_internalChangeStreamPipelineSplitter"_sd; + + static boost::intrusive_ptr<DocumentSourceChangeStreamPipelineSplitter> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceChangeStreamPipelineSplitter(expCtx); + } + + const char* getSourceName() const final { + return DocumentSourceChangeStreamPipelineSplitter::kStageName.rawData(); + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kMongoS, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { + return (explain ? Value(Document{{kStageName, Document{}}}) : Value()); + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + return DistributedPlanLogic{nullptr, nullptr, change_stream_constants::kSortSpec}; + } + +private: + DocumentSourceChangeStreamPipelineSplitter( + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(kStageName, expCtx) { + invariant(expCtx->inMongos); + } + + GetNextResult doGetNext() final { + // Pass on the document to the next stage without interpreting. + return pSource->getNext(); + } +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 3d645027f0e..df87736286c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -51,6 +51,7 @@ #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/transaction_history_iterator.h" @@ -206,12 +207,12 @@ public: const std::vector<repl::OplogEntry> transactionEntries = {}, std::vector<Document> documentsForLookup = {}) { vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.getEntry().toBSON(), spec); - auto closeCursor = stages.back(); + auto lastStage = stages.back(); getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( docKeyFields, transactionEntries, std::move(documentsForLookup)); - auto next = closeCursor->getNext(); + auto next = lastStage->getNext(); // Match stage should pass the doc down if expectedDoc is given. ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc)); if (expectedDoc) { @@ -219,11 +220,17 @@ public: } if (expectedInvalidate) { - next = closeCursor->getNext(); + next = lastStage->getNext(); ASSERT_TRUE(next.isAdvanced()); ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedInvalidate); + // Then throw an exception on the next call of getNext(). - ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + } else { + ASSERT_THROWS(lastStage->getNext(), + ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); + } } } @@ -259,16 +266,20 @@ public: auto mock = DocumentSourceMock::createForTest(D(entry), getExpCtx()); stages.insert(stages.begin(), mock); + // Remove the DSEnsureResumeTokenPresent stage since it will swallow the result. + auto newEnd = std::remove_if(stages.begin(), stages.end(), [](auto& stage) { + return dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage.get()); + }); + stages.erase(newEnd, stages.end()); + // Wire up the stages by setting the source stage. - auto prevStage = stages[0].get(); + auto prevIt = stages.begin(); for (auto stageIt = stages.begin() + 1; stageIt != stages.end(); stageIt++) { auto stage = (*stageIt).get(); - // Do not include the check resume token stage since it will swallow the result. - if (dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage)) - continue; - stage->setSource(prevStage); - prevStage = stage; + stage->setSource((*prevIt).get()); + prevIt = stageIt; } + return stages; } @@ -1964,7 +1975,11 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj()); auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx); vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); - ASSERT_EQ(allStages.size(), 5UL); + + const size_t changeStreamStageSize = + (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 4 : 5); + ASSERT_EQ(allStages.size(), changeStreamStageSize); + auto stage = allStages[2]; ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get())); @@ -1995,7 +2010,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); auto stages = makeStages(dropColl); - auto closeCursor = stages.back(); + auto lastStage = stages.back(); Document expectedDrop{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, @@ -2011,26 +2026,35 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - auto next = closeCursor->getNext(); + auto next = lastStage->getNext(); // Transform into drop entry. ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedDrop); - next = closeCursor->getNext(); + next = lastStage->getNext(); // Transform into invalidate entry. ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate); + // Then throw an exception on the next call of getNext(). - ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + } else { + ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); + } } TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); auto stages = makeStages(dropColl); - auto closeCursor = stages.back(); + auto lastStage = stages.back(); // Add a match stage after change stream to filter out the invalidate entries. auto match = DocumentSourceMatch::create(fromjson("{operationType: 'insert'}"), getExpCtx()); - match->setSource(closeCursor.get()); + match->setSource(lastStage.get()); // Throw an exception on the call of getNext(). - ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + } else { + ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); + } } TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp index 02dd0cd6a61..ed98653b7f2 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp @@ -33,6 +33,8 @@ #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_check_invalidate.h" +#include "mongo/db/query/query_feature_flags_gen.h" +#include "mongo/util/assert_util.h" namespace mongo { @@ -58,12 +60,21 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt } // namespace DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() { + // To declare a change stream as invalidated, this stage first emits an invalidate event and + // then throws a 'ChangeStreamInvalidated' exception on the next call to this method. + if (_queuedInvalidate) { const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); _queuedInvalidate.reset(); return res; } + if (_queuedException && + feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + uasserted(static_cast<ChangeStreamInvalidationInfo>(*_queuedException), + "Change stream invalidated"); + } + auto nextInput = pSource->getNext(); if (!nextInput.isAdvanced()) return nextInput; @@ -108,6 +119,15 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() { result.metadata().setSortKey(Value{resumeTokenDoc}, isSingleElementKey); _queuedInvalidate = result.freeze(); + + // By this point, either the '_startAfterInvalidate' is absent or it is present and the + // current event matches the resume token. In the latter case, we do not want to close the + // change stream and should not throw an exception. Therefore, we only queue up an exception + // if '_startAfterInvalidate' is absent. + if (!_startAfterInvalidate) { + _queuedException = ChangeStreamInvalidationInfo( + _queuedInvalidate->metadata().getSortKey().getDocument().toBson()); + } } // Regardless of whether the first document we see is an invalidating command, we only skip the diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index a60fb991ead..7326461b1fa 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source.h" namespace mongo { @@ -90,6 +91,7 @@ private: boost::optional<ResumeTokenData> _startAfterInvalidate; boost::optional<Document> _queuedInvalidate; + boost::optional<ChangeStreamInvalidationInfo> _queuedException; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 4f5553022ee..2f6e05a0fc8 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -59,6 +59,7 @@ #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/collation/collator_interface_mock.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_test_service_context.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/dbtests/dbtests.h" @@ -74,6 +75,10 @@ using std::vector; const NamespaceString kTestNss = NamespaceString("a.collection"); +size_t getChangeStreamStageSize() { + return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 5 : 6); +} + void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) { repl::ReplicationCoordinator::set( opCtx->getServiceContext(), @@ -2054,7 +2059,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { auto spec = BSON("$changeStream" << BSON("fullDocument" << "updateLookup")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 6UL); + ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the change lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); @@ -2080,7 +2085,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage auto spec = BSON("$changeStream" << BSON("fullDocument" << "updateLookup")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 6UL); + ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the change lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); @@ -2105,7 +2110,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 6UL); + ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the pre-image lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get())); @@ -2131,7 +2136,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 6UL); + ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the pre-image lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get())); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 678d5b24caf..9fe9104da25 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -273,8 +273,11 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi continue; } - // A source may not simultaneously be present on both sides of the split. - invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); + // TODO SERVER-55491: remove this 'if' to make the invariant unconditional. + if (distributedPlanLogic->shardsStage && distributedPlanLogic->mergingStage) { + // A source may not simultaneously be present on both sides of the split. + invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); + } if (distributedPlanLogic->shardsStage) shardPipe->push_back(std::move(distributedPlanLogic->shardsStage)); diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index fe5b3bc0175..06c62c3091f 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -51,6 +51,7 @@ const char kBatchDocSequenceField[] = "cursor.nextBatch"; const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch"; const char kPostBatchResumeTokenField[] = "postBatchResumeToken"; const char kPartialResultsReturnedField[] = "partialResultsReturned"; +const char kInvalidatedField[] = "invalidated"; } // namespace @@ -73,6 +74,11 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) if (_partialResultsReturned) { _cursorObject->append(kPartialResultsReturnedField, true); } + + if (_invalidated) { + _cursorObject->append(kInvalidatedField, _invalidated); + } + _cursorObject->append(kIdField, cursorId); _cursorObject->append(kNsField, cursorNamespace); if (_options.atClusterTime) { @@ -123,7 +129,8 @@ CursorResponse::CursorResponse(NamespaceString nss, boost::optional<long long> numReturnedSoFar, boost::optional<BSONObj> postBatchResumeToken, boost::optional<BSONObj> writeConcernError, - bool partialResultsReturned) + bool partialResultsReturned, + bool invalidated) : _nss(std::move(nss)), _cursorId(cursorId), _batch(std::move(batch)), @@ -131,7 +138,8 @@ CursorResponse::CursorResponse(NamespaceString nss, _numReturnedSoFar(numReturnedSoFar), _postBatchResumeToken(std::move(postBatchResumeToken)), _writeConcernError(std::move(writeConcernError)), - _partialResultsReturned(partialResultsReturned) {} + _partialResultsReturned(partialResultsReturned), + _invalidated(invalidated) {} std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany( const BSONObj& cmdResponse) { @@ -246,6 +254,16 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo } } + auto invalidatedElem = cursorObj[kInvalidatedField]; + if (invalidatedElem) { + if (invalidatedElem.type() != BSONType::Bool) { + return {ErrorCodes::BadValue, + str::stream() << kInvalidatedField + << " format is invalid; expected Bool, but found: " + << invalidatedElem.type()}; + } + } + auto writeConcernError = cmdResponse["writeConcernError"]; if (writeConcernError && writeConcernError.type() != BSONType::Object) { @@ -262,7 +280,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned() : boost::optional<BSONObj>{}, writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}, - partialResultsReturned.trueValue()}}; + partialResultsReturned.trueValue(), + invalidatedElem.trueValue()}}; } void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, @@ -292,6 +311,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, cursorBuilder.append(kPartialResultsReturnedField, true); } + if (_invalidated) { + cursorBuilder.append(kInvalidatedField, _invalidated); + } + cursorBuilder.doneFast(); builder->append("ok", 1.0); diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 6c7081fa1ec..0bd028825ed 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -95,6 +95,10 @@ public: _partialResultsReturned = partialResults; } + void setInvalidated() { + _invalidated = true; + } + long long numDocs() const { return _numDocs; } @@ -124,6 +128,7 @@ private: long long _numDocs = 0; BSONObj _postBatchResumeToken; bool _partialResultsReturned = false; + bool _invalidated = false; }; /** @@ -194,7 +199,8 @@ public: boost::optional<long long> numReturnedSoFar = boost::none, boost::optional<BSONObj> postBatchResumeToken = boost::none, boost::optional<BSONObj> writeConcernError = boost::none, - bool partialResultsReturned = false); + bool partialResultsReturned = false, + bool invalidated = false); CursorResponse(CursorResponse&& other) = default; CursorResponse& operator=(CursorResponse&& other) = default; @@ -239,6 +245,10 @@ public: return _partialResultsReturned; } + bool getInvalidated() const { + return _invalidated; + } + /** * Converts this response to its raw BSON representation. */ @@ -257,6 +267,7 @@ private: boost::optional<BSONObj> _postBatchResumeToken; boost::optional<BSONObj> _writeConcernError; bool _partialResultsReturned = false; + bool _invalidated = false; }; } // namespace mongo diff --git a/src/mongo/db/query/cursor_response.idl b/src/mongo/db/query/cursor_response.idl index 0679cb6ce5b..66c118ff5f6 100644 --- a/src/mongo/db/query/cursor_response.idl +++ b/src/mongo/db/query/cursor_response.idl @@ -59,6 +59,9 @@ structs: partialResultsReturned: description: "Boolean represents whether partial results are being returned." type: optionalBool + invalidated: + description: "Boolean represents if the cursor has been invalidated." + type: optionalBool InitialResponseCursor: description: "A struct representing an initial response cursor." diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 71fd81ef848..0199878764a 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -36,9 +36,11 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/pipeline/change_stream_constants.h" +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/kill_cursors_gen.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/s/catalog/type_shard.h" @@ -122,6 +124,10 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, // A remote cannot be flagged as 'partialResultsReturned' if 'allowPartialResults' is false. invariant(!(_remotes.back().partialResultsReturned && !_params.getAllowPartialResults())); + // For the first batch, cursor should never be invalidated. + tassert( + 5493704, "Found invalidated cursor on the first batch", !_remotes.back().invalidated); + // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); @@ -532,6 +538,11 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() "nextEvent() called before an outstanding event was signaled"); } + // Check if the cursor should be invalidated. + if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + _assertNotInvalidated(lk); + } + auto getMoresStatus = _scheduleGetMores(lk); if (!getMoresStatus.isOK()) { return getMoresStatus; @@ -552,6 +563,15 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() return eventToReturn; } +void AsyncResultsMerger::_assertNotInvalidated(WithLock lk) { + if (auto minPromisedSortKey = _getMinPromisedSortKey(lk)) { + const auto& minRemote = _remotes[minPromisedSortKey->second]; + uassert(ChangeStreamInvalidationInfo{minPromisedSortKey->first.firstElement().Obj()}, + "Change stream invalidated", + !(minRemote.invalidated && !_ready(lk))); + } +} + StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse( const BSONObj& responseObj, const RemoteCursorData& remote) { @@ -579,6 +599,14 @@ void AsyncResultsMerger::_updateRemoteMetadata(WithLock lk, // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard. auto& remote = _remotes[remoteIndex]; remote.cursorId = response.getCursorId(); + + // If the response indicates that the cursor has been invalidated, mark the corresponding + // remote as invalidated. This also signifies that the shard cursor has been closed. + remote.invalidated = response.getInvalidated(); + tassert(5493705, + "Unexpectedly encountered invalidated cursor with non-zero ID", + !(remote.invalidated && remote.cursorId > 0)); + if (response.getPostBatchResumeToken()) { // We only expect to see this for change streams. invariant(_params.getSort()); diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 11fc6eb2569..3fde29e141e 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -316,6 +316,9 @@ private: // Count of fetched docs during ARM processing of the current batch. Used to reduce the // batchSize in getMore when mongod returned less docs than the requested batchSize. long long fetchedCount = 0; + + // If set to 'true', the cursor on this shard has been invalidated. + bool invalidated = false; }; class MergingComparator { @@ -439,6 +442,12 @@ private: bool _haveOutstandingBatchRequests(WithLock); /** + * Called internally when attempting to get a new event for the caller to wait on. Throws if + * the shard cursor from which the next result is due has already been invalidated. + */ + void _assertNotInvalidated(WithLock); + + /** * If a promisedMinSortKey has been received from all remotes, returns the lowest such key. * Otherwise, returns boost::none. */ diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 90e2b822178..8827907d5a2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" @@ -290,6 +291,18 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, // error. cursorState = ClusterCursorManager::CursorState::Exhausted; break; + } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) { + // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT + // to the resume token of the invalidating event, and mark the cursor response as + // invalidated. We always expect to have ExtraInfo for this error code. + const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); + tassert( + 5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + + responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); + responseBuilder.setInvalidated(); + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; } // Check whether we have exhausted the pipeline's results. diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index f8e9c420f37..9fd94d4ec96 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -45,6 +45,7 @@ #include "mongo/db/commands.h" #include "mongo/db/curop.h" #include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" @@ -790,6 +791,17 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // error. cursorState = ClusterCursorManager::CursorState::Exhausted; break; + } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) { + // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT + // to the resume token of the invalidating event, and mark the cursor response as + // invalidated. We always expect to have ExtraInfo for this error code. + const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); + tassert( + 5493707, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + + postBatchResumeToken = extraInfo->getInvalidateResumeToken(); + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; } if (!next.isOK()) { diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript index 8a201b1a59e..367e59e1e8a 100644 --- a/src/mongo/shell/SConscript +++ b/src/mongo/shell/SConscript @@ -237,6 +237,7 @@ if not has_option('noshell') and usemozjs: "$BUILD_DIR/mongo/db/catalog/index_key_validate", "$BUILD_DIR/mongo/db/logical_session_id_helpers", "$BUILD_DIR/mongo/db/mongohasher", + "$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info", "$BUILD_DIR/mongo/db/query/command_request_response", "$BUILD_DIR/mongo/db/query/query_request", "$BUILD_DIR/mongo/db/server_options_core", |