diff options
-rw-r--r-- | jstests/sharding/change_streams_unsharded_becomes_sharded.js | 21 | ||||
-rw-r--r-- | src/mongo/base/error_codes.yml | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_helpers_legacy.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_topology_change_info.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_topology_change_info.h | 61 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_topology_change.h | 87 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_update_on_add_shard.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/plan_executor_pipeline.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/plan_executor_pipeline.h | 6 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 3 | ||||
-rw-r--r-- | src/mongo/shell/SConscript | 2 |
16 files changed, 379 insertions, 14 deletions
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js index 75e8f2a4388..b44fb6c1f88 100644 --- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js +++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js @@ -6,6 +6,9 @@ // @tags: [ // requires_majority_read_concern, // uses_change_streams, +// # TODO SERVER-30784: Remove 'multiversion_incompatible' tag and +// # 'throwChangeStreamTopologyChangeExceptionToClient'. +// multiversion_incompatible // ] (function() { "use strict"; @@ -172,6 +175,24 @@ function testUnshardedBecomesSharded(collToWatch) { ] }); + // Verify that the kNewShardDetected event is successfully delivered to mongoS even in cases + // where the event does not match the user's filter. + // TODO SERVER-30784: remove this test-case, or rework it without the failpoint, when the + // kNewShardDetected event is the only way we detect a new shard for the collection. + mongosDB.adminCommand( + {configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "alwaysOn"}); + ChangeStreamTest.assertChangeStreamThrowsCode({ + db: mongosDB, + collName: collToWatch, + pipeline: [ + {$changeStream: {resumeAfter: preShardCollectionResumeToken}}, + {$match: {operationType: "delete"}} + ], + expectedCode: ErrorCodes.ChangeStreamTopologyChange + }); + mongosDB.adminCommand( + {configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "off"}); + cst.cleanUp(); } diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 083c75dbd9f..061dc16d8ff 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -437,6 +437,8 @@ error_codes: - {code: 346,name: ChangeStreamInvalidated, extra: ChangeStreamInvalidationInfo} - {code: 347, name: APIMismatchError, categories: [VersionedAPIError,VoteAbortError]} + + - {code: 348,name: ChangeStreamTopologyChange, extra: ChangeStreamTopologyChangeInfo} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 1772d5b3a25..b2b407c65d6 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -352,9 +352,7 @@ public: // to the resume token of the invalidating event, and mark the cursor response as // invalidated. We always expect to have ExtraInfo for this error code. const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); - tassert(5493700, - "Missing ChangeStreamInvalidationInfo on exception", - extraInfo != nullptr); + tassert(5493700, "Missing ChangeStreamInvalidationInfo on exception", extraInfo); nextBatch->setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); nextBatch->setInvalidated(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index e458871d695..74874ee9e93 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -195,8 +195,7 @@ bool handleCursorCommand(OperationContext* opCtx, // to the resume token of the invalidating event, and mark the cursor response as // invalidated. We expect ExtraInfo to always be present for this exception. const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); - tassert( - 5493701, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + tassert(5493701, "Missing ChangeStreamInvalidationInfo on exception", extraInfo); responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); responseBuilder.setInvalidated(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 57e731d200b..af8c34fcbf3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -213,8 +213,9 @@ env.Library( ) env.Library( - target="change_stream_invalidation_info", + target="change_stream_error_extra_info", source=[ + 'change_stream_topology_change_info.cpp', 'change_stream_invalidation_info.cpp', ], LIBDEPS=[ @@ -298,7 +299,7 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/matcher/expressions', - '$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info', + '$BUILD_DIR/mongo/db/pipeline/change_stream_error_extra_info', '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface', '$BUILD_DIR/mongo/db/query/collation/collator_interface', @@ -343,6 +344,7 @@ env.Library( 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_ensure_resume_token_present.cpp', 'document_source_change_stream_oplog_match.cpp', + 'document_source_change_stream_topology_change.cpp', 'document_source_change_stream_transform.cpp', 'document_source_change_stream_unwind_transactions.cpp', 'document_source_check_invalidate.cpp', diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 9e29df6c279..5d83629a751 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -34,6 +34,7 @@ #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" +#include "mongo/db/pipeline/document_source_change_stream_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h" #include "mongo/db/pipeline/document_source_check_invalidate.h" @@ -119,6 +120,14 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx)); stages.push_back(transformStage); + const bool csOptFeatureFlag = + feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV(); + + // The 'DocumentSourceChangeStreamTopologyChange' only runs in a cluster, and will be dispatched + // by mongoS to the shards. + if (csOptFeatureFlag && expCtx->inMongos) { + stages.push_back(DocumentSourceChangeStreamTopologyChange::create(expCtx)); + } // The resume stage must come after the check invalidate stage so that the former can determine // whether the event that matches the resume token should be followed by an "invalidate" event. @@ -145,7 +154,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( } if (!expCtx->needsMerge) { - if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + if (!csOptFeatureFlag) { // There should only be one close cursor stage. If we're on the shards and producing // input to be merged, do not add a close cursor stage, since the mongos will already // have one. @@ -226,4 +235,9 @@ Value DocumentSourceLookupChangePostImage::serializeLegacy( return (explain ? Value{Document{{kStageName, Document()}}} : Value()); } +Value DocumentSourceChangeStreamTopologyChange::serializeLegacy( + boost::optional<ExplainOptions::Verbosity> explain) const { + return (explain ? Value{Document{{kStageName, Document()}}} : Value()); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_topology_change_info.cpp b/src/mongo/db/pipeline/change_stream_topology_change_info.cpp new file mode 100644 index 00000000000..e64743b671e --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_topology_change_info.cpp @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/change_stream_topology_change_info.h" + +#include "mongo/base/init.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { +namespace { + +MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(ChangeStreamTopologyChangeInfo); + +} // namespace + +std::shared_ptr<const ErrorExtraInfo> ChangeStreamTopologyChangeInfo::parse(const BSONObj& obj) { + return std::make_shared<ChangeStreamTopologyChangeInfo>(obj["topologyChangeEvent"].Obj()); +} + +void ChangeStreamTopologyChangeInfo::serialize(BSONObjBuilder* bob) const { + bob->append("topologyChangeEvent", _topologyChangeEvent); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_topology_change_info.h b/src/mongo/db/pipeline/change_stream_topology_change_info.h new file mode 100644 index 00000000000..18fd9b24838 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_topology_change_info.h @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/error_extra_info.h" +#include "mongo/bson/bsonobj.h" + +namespace mongo { + +class BSONObjBuilder; + +/** + * Contains a BSONObj the stores the topology change event in a sharded-cluster deployment. + */ +class ChangeStreamTopologyChangeInfo final : public ErrorExtraInfo { +public: + static constexpr auto code = ErrorCodes::ChangeStreamTopologyChange; + + static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj& obj); + + explicit ChangeStreamTopologyChangeInfo(BSONObj topologyChangeEvent) + : _topologyChangeEvent{topologyChangeEvent.getOwned()} {} + + BSONObj getTopologyChangeEvent() const { + return _topologyChangeEvent; + } + + void serialize(BSONObjBuilder* bob) const final; + +private: + BSONObj _topologyChangeEvent; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp new file mode 100644 index 00000000000..ce46192ac4a --- /dev/null +++ b/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_change_stream_topology_change.h" + +#include "mongo/db/pipeline/change_stream_topology_change_info.h" + +namespace mongo { + +REGISTER_INTERNAL_DOCUMENT_SOURCE( + _internalChangeStreamTopologyChange, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamTopologyChange::createFromBson, + feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); + +StageConstraints DocumentSourceChangeStreamTopologyChange::constraints( + Pipeline::SplitState pipeState) const { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; +} + + +boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> +DocumentSourceChangeStreamTopologyChange::createFromBson( + const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(5669601, + str::stream() << "the '" << kStageName << "' spec must be an object", + elem.type() == Object && elem.Obj().isEmpty()); + return new DocumentSourceChangeStreamTopologyChange(expCtx); +} + +DocumentSource::GetNextResult DocumentSourceChangeStreamTopologyChange::doGetNext() { + auto nextInput = pSource->getNext(); + + if (!nextInput.isAdvanced()) { + return nextInput; + } + + auto eventDoc = nextInput.getDocument(); + + const StringData eventOpType = + eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData(); + + // Throw the 'ChangeStreamTopologyChangeInfo' exception, wrapping the topology change event + // along with its metadata. This will bypass the remainder of the pipeline and will be passed + // directly up to mongoS. + if (eventOpType == DocumentSourceChangeStream::kNewShardDetectedOpType) { + uasserted(ChangeStreamTopologyChangeInfo(eventDoc.toBsonWithMetaData()), + "Collection migrated to new shard"); + } + + return nextInput; +} + +Value DocumentSourceChangeStreamTopologyChange::serializeLatest( + boost::optional<ExplainOptions::Verbosity> explain) const { + if (explain) { + return Value(DOC(DocumentSourceChangeStream::kStageName + << DOC("stage" + << "internalChangeStreamTopologyChange"_sd))); + } + + return Value(Document{{kStageName, Document()}}); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_topology_change.h new file mode 100644 index 00000000000..8e03f75ca15 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_change_stream_topology_change.h @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" + +namespace mongo { + +/** + * This stage detects change stream topology changes in the form of 'kNewShardDetectedOpType' events + * and forwards them directly to the executor via an exception. Using an exception bypasses the rest + * of the pipeline, ensuring that the event cannot be filtered out or modified by user-specified + * stages and that it will ultimately be available to the mongoS. + * + * The mongoS must see all 'kNewShardDetectedOpType' events, so that it knows when it needs to open + * cursors on newly active shards. These events are generated when a chunk is migrated to a shard + * that previously may not have held any data for the collection being watched, and they contain the + * information necessary for the mongoS to include the new shard in the merged change stream. + */ +class DocumentSourceChangeStreamTopologyChange final + : public DocumentSource, + public ChangeStreamStageSerializationInterface { +public: + static constexpr StringData kStageName = "$_internalChangeStreamTopologyChange"_sd; + + static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> createFromBson( + const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceChangeStreamTopologyChange(expCtx); + } + + const char* getSourceName() const final { + return kStageName.rawData(); + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final; + + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + return boost::none; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { + return ChangeStreamStageSerializationInterface::serializeToValue(explain); + } + +private: + DocumentSourceChangeStreamTopologyChange(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(kStageName, expCtx) {} + + GetNextResult doGetNext() final; + + Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; + + Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp index 7aec2272d25..35dff608271 100644 --- a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp +++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp @@ -31,6 +31,7 @@ #include <algorithm> +#include "mongo/db/pipeline/change_stream_topology_change_info.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/query_feature_flags_gen.h" @@ -40,10 +41,14 @@ #include "mongo/s/grid.h" #include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/s/query/establish_cursors.h" +#include "mongo/util/fail_point.h" namespace mongo { namespace { +// Failpoint to throw an exception when the 'kNewShardDetected' event is observed. +MONGO_FAIL_POINT_DEFINE(throwChangeStreamTopologyChangeExceptionToClient); + // Returns true if the change stream document is an event in 'config.shards'. bool isShardConfigEvent(const Document& eventDoc) { // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility @@ -52,8 +57,19 @@ bool isShardConfigEvent(const Document& eventDoc) { // where a change stream is targeted to a subset of shards. See SERVER-44039 for details. if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) { + // If the failpoint is enabled, throw the 'ChangeStreamToplogyChange' exception to the + // client. This is used in testing to confirm that the swallowed 'kNewShardDetected' event + // has reached the mongoS. + // TODO SERVER-30784: remove this failpoint when the 'kNewShardDetected' event is the only + // way we detect a new shard. + if (MONGO_unlikely(throwChangeStreamTopologyChangeExceptionToClient.shouldFail())) { + uasserted(ChangeStreamTopologyChangeInfo(eventDoc.toBsonWithMetaData()), + "Collection migrated to new shard"); + } + return true; } + auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField]; return nsObj.getType() == BSONType::Object && nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() && diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index 01d31be0ff8..e491d99afdc 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/plan_executor_pipeline.h" +#include "mongo/db/pipeline/change_stream_topology_change_info.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/pipeline/plan_explainer_pipeline.h" #include "mongo/db/pipeline/resume_token.h" @@ -112,7 +113,7 @@ bool PlanExecutorPipeline::isEOF() { } boost::optional<Document> PlanExecutorPipeline::_getNext() { - auto nextDoc = _pipeline->getNext(); + auto nextDoc = _tryGetNext(); if (!nextDoc) { _pipelineIsEof = true; } @@ -123,6 +124,15 @@ boost::optional<Document> PlanExecutorPipeline::_getNext() { return nextDoc; } +boost::optional<Document> PlanExecutorPipeline::_tryGetNext() try { + return _pipeline->getNext(); +} catch (const ExceptionFor<ErrorCodes::ChangeStreamTopologyChange>& ex) { + // This exception contains the next document to be returned by the pipeline. + const auto extraInfo = ex.extraInfo<ChangeStreamTopologyChangeInfo>(); + tassert(5669600, "Missing ChangeStreamTopologyChangeInfo on exception", extraInfo); + return Document::fromBsonWithMetaData(extraInfo->getTopologyChangeEvent()); +} + void PlanExecutorPipeline::_updateResumableScanState(const boost::optional<Document>& document) { switch (_resumableScanType) { case ResumableScanType::kChangeStream: diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h index 7fbb0d677b4..ed77e3cc8c1 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.h +++ b/src/mongo/db/pipeline/plan_executor_pipeline.h @@ -157,6 +157,12 @@ private: boost::optional<Document> _getNext(); /** + * Obtains the next result from the pipeline, gracefully handling any known exceptions which may + * be thrown. + */ + boost::optional<Document> _tryGetNext(); + + /** * For a change stream or resumable oplog scan, updates the scan state based on the latest * document returned by the underlying pipeline. */ diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 8714c6b1aad..a9686f15920 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -296,8 +296,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, // to the resume token of the invalidating event, and mark the cursor response as // invalidated. We always expect to have ExtraInfo for this error code. const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); - tassert( - 5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + tassert(5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo); responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); responseBuilder.setInvalidated(); diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 9f2665d95ac..ff937c5837a 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -804,8 +804,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // to the resume token of the invalidating event, and mark the cursor response as // invalidated. We always expect to have ExtraInfo for this error code. const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); - tassert( - 5493707, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + tassert(5493707, "Missing ChangeStreamInvalidationInfo on exception", extraInfo); postBatchResumeToken = extraInfo->getInvalidateResumeToken(); cursorState = ClusterCursorManager::CursorState::Exhausted; diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript index 367e59e1e8a..0ba234a9947 100644 --- a/src/mongo/shell/SConscript +++ b/src/mongo/shell/SConscript @@ -237,7 +237,7 @@ if not has_option('noshell') and usemozjs: "$BUILD_DIR/mongo/db/catalog/index_key_validate", "$BUILD_DIR/mongo/db/logical_session_id_helpers", "$BUILD_DIR/mongo/db/mongohasher", - "$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info", + "$BUILD_DIR/mongo/db/pipeline/change_stream_error_extra_info", "$BUILD_DIR/mongo/db/query/command_request_response", "$BUILD_DIR/mongo/db/query/query_request", "$BUILD_DIR/mongo/db/server_options_core", |