summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/change_streams_unsharded_becomes_sharded.js21
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp4
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp3
-rw-r--r--src/mongo/db/pipeline/SConscript6
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp16
-rw-r--r--src/mongo/db/pipeline/change_stream_topology_change_info.cpp50
-rw-r--r--src/mongo/db/pipeline/change_stream_topology_change_info.h61
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp101
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_topology_change.h87
-rw-r--r--src/mongo/db/pipeline/document_source_update_on_add_shard.cpp16
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp12
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.h6
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp3
-rw-r--r--src/mongo/s/query/cluster_find.cpp3
-rw-r--r--src/mongo/shell/SConscript2
16 files changed, 379 insertions, 14 deletions
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index 75e8f2a4388..b44fb6c1f88 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -6,6 +6,9 @@
// @tags: [
// requires_majority_read_concern,
// uses_change_streams,
+// # TODO SERVER-30784: Remove 'multiversion_incompatible' tag and
+// # 'throwChangeStreamTopologyChangeExceptionToClient'.
+// multiversion_incompatible
// ]
(function() {
"use strict";
@@ -172,6 +175,24 @@ function testUnshardedBecomesSharded(collToWatch) {
]
});
+ // Verify that the kNewShardDetected event is successfully delivered to mongoS even in cases
+ // where the event does not match the user's filter.
+ // TODO SERVER-30784: remove this test-case, or rework it without the failpoint, when the
+ // kNewShardDetected event is the only way we detect a new shard for the collection.
+ mongosDB.adminCommand(
+ {configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "alwaysOn"});
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: mongosDB,
+ collName: collToWatch,
+ pipeline: [
+ {$changeStream: {resumeAfter: preShardCollectionResumeToken}},
+ {$match: {operationType: "delete"}}
+ ],
+ expectedCode: ErrorCodes.ChangeStreamTopologyChange
+ });
+ mongosDB.adminCommand(
+ {configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "off"});
+
cst.cleanUp();
}
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 083c75dbd9f..061dc16d8ff 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -437,6 +437,8 @@ error_codes:
- {code: 346,name: ChangeStreamInvalidated, extra: ChangeStreamInvalidationInfo}
- {code: 347, name: APIMismatchError, categories: [VersionedAPIError,VoteAbortError]}
+
+ - {code: 348,name: ChangeStreamTopologyChange, extra: ChangeStreamTopologyChangeInfo}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 1772d5b3a25..b2b407c65d6 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -352,9 +352,7 @@ public:
// to the resume token of the invalidating event, and mark the cursor response as
// invalidated. We always expect to have ExtraInfo for this error code.
const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
- tassert(5493700,
- "Missing ChangeStreamInvalidationInfo on exception",
- extraInfo != nullptr);
+ tassert(5493700, "Missing ChangeStreamInvalidationInfo on exception", extraInfo);
nextBatch->setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
nextBatch->setInvalidated();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index e458871d695..74874ee9e93 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -195,8 +195,7 @@ bool handleCursorCommand(OperationContext* opCtx,
// to the resume token of the invalidating event, and mark the cursor response as
// invalidated. We expect ExtraInfo to always be present for this exception.
const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
- tassert(
- 5493701, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+ tassert(5493701, "Missing ChangeStreamInvalidationInfo on exception", extraInfo);
responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
responseBuilder.setInvalidated();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 57e731d200b..af8c34fcbf3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -213,8 +213,9 @@ env.Library(
)
env.Library(
- target="change_stream_invalidation_info",
+ target="change_stream_error_extra_info",
source=[
+ 'change_stream_topology_change_info.cpp',
'change_stream_invalidation_info.cpp',
],
LIBDEPS=[
@@ -298,7 +299,7 @@ pipelineEnv.Library(
'$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/matcher/expressions',
- '$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_error_extra_info',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
'$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
'$BUILD_DIR/mongo/db/query/collation/collator_interface',
@@ -343,6 +344,7 @@ env.Library(
'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_ensure_resume_token_present.cpp',
'document_source_change_stream_oplog_match.cpp',
+ 'document_source_change_stream_topology_change.cpp',
'document_source_change_stream_transform.cpp',
'document_source_change_stream_unwind_transactions.cpp',
'document_source_check_invalidate.cpp',
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 9e29df6c279..5d83629a751 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
+#include "mongo/db/pipeline/document_source_change_stream_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h"
#include "mongo/db/pipeline/document_source_check_invalidate.h"
@@ -119,6 +120,14 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
stages.push_back(transformStage);
+ const bool csOptFeatureFlag =
+ feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV();
+
+ // The 'DocumentSourceChangeStreamTopologyChange' only runs in a cluster, and will be dispatched
+ // by mongoS to the shards.
+ if (csOptFeatureFlag && expCtx->inMongos) {
+ stages.push_back(DocumentSourceChangeStreamTopologyChange::create(expCtx));
+ }
// The resume stage must come after the check invalidate stage so that the former can determine
// whether the event that matches the resume token should be followed by an "invalidate" event.
@@ -145,7 +154,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
}
if (!expCtx->needsMerge) {
- if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ if (!csOptFeatureFlag) {
// There should only be one close cursor stage. If we're on the shards and producing
// input to be merged, do not add a close cursor stage, since the mongos will already
// have one.
@@ -226,4 +235,9 @@ Value DocumentSourceLookupChangePostImage::serializeLegacy(
return (explain ? Value{Document{{kStageName, Document()}}} : Value());
}
+Value DocumentSourceChangeStreamTopologyChange::serializeLegacy(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ return (explain ? Value{Document{{kStageName, Document()}}} : Value());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_topology_change_info.cpp b/src/mongo/db/pipeline/change_stream_topology_change_info.cpp
new file mode 100644
index 00000000000..e64743b671e
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_topology_change_info.cpp
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/change_stream_topology_change_info.h"
+
+#include "mongo/base/init.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+namespace {
+
+MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(ChangeStreamTopologyChangeInfo);
+
+} // namespace
+
+std::shared_ptr<const ErrorExtraInfo> ChangeStreamTopologyChangeInfo::parse(const BSONObj& obj) {
+ return std::make_shared<ChangeStreamTopologyChangeInfo>(obj["topologyChangeEvent"].Obj());
+}
+
+void ChangeStreamTopologyChangeInfo::serialize(BSONObjBuilder* bob) const {
+ bob->append("topologyChangeEvent", _topologyChangeEvent);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_topology_change_info.h b/src/mongo/db/pipeline/change_stream_topology_change_info.h
new file mode 100644
index 00000000000..18fd9b24838
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_topology_change_info.h
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/error_extra_info.h"
+#include "mongo/bson/bsonobj.h"
+
+namespace mongo {
+
+class BSONObjBuilder;
+
+/**
+ * Contains a BSONObj the stores the topology change event in a sharded-cluster deployment.
+ */
+class ChangeStreamTopologyChangeInfo final : public ErrorExtraInfo {
+public:
+ static constexpr auto code = ErrorCodes::ChangeStreamTopologyChange;
+
+ static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj& obj);
+
+ explicit ChangeStreamTopologyChangeInfo(BSONObj topologyChangeEvent)
+ : _topologyChangeEvent{topologyChangeEvent.getOwned()} {}
+
+ BSONObj getTopologyChangeEvent() const {
+ return _topologyChangeEvent;
+ }
+
+ void serialize(BSONObjBuilder* bob) const final;
+
+private:
+ BSONObj _topologyChangeEvent;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp
new file mode 100644
index 00000000000..ce46192ac4a
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_change_stream_topology_change.h"
+
+#include "mongo/db/pipeline/change_stream_topology_change_info.h"
+
+namespace mongo {
+
+REGISTER_INTERNAL_DOCUMENT_SOURCE(
+ _internalChangeStreamTopologyChange,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamTopologyChange::createFromBson,
+ feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+
+StageConstraints DocumentSourceChangeStreamTopologyChange::constraints(
+ Pipeline::SplitState pipeState) const {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kNotAllowed,
+ UnionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+}
+
+
+boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange>
+DocumentSourceChangeStreamTopologyChange::createFromBson(
+ const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(5669601,
+ str::stream() << "the '" << kStageName << "' spec must be an object",
+ elem.type() == Object && elem.Obj().isEmpty());
+ return new DocumentSourceChangeStreamTopologyChange(expCtx);
+}
+
+DocumentSource::GetNextResult DocumentSourceChangeStreamTopologyChange::doGetNext() {
+ auto nextInput = pSource->getNext();
+
+ if (!nextInput.isAdvanced()) {
+ return nextInput;
+ }
+
+ auto eventDoc = nextInput.getDocument();
+
+ const StringData eventOpType =
+ eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData();
+
+ // Throw the 'ChangeStreamTopologyChangeInfo' exception, wrapping the topology change event
+ // along with its metadata. This will bypass the remainder of the pipeline and will be passed
+ // directly up to mongoS.
+ if (eventOpType == DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ uasserted(ChangeStreamTopologyChangeInfo(eventDoc.toBsonWithMetaData()),
+ "Collection migrated to new shard");
+ }
+
+ return nextInput;
+}
+
+Value DocumentSourceChangeStreamTopologyChange::serializeLatest(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ if (explain) {
+ return Value(DOC(DocumentSourceChangeStream::kStageName
+ << DOC("stage"
+ << "internalChangeStreamTopologyChange"_sd)));
+ }
+
+ return Value(Document{{kStageName, Document()}});
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_topology_change.h
new file mode 100644
index 00000000000..8e03f75ca15
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_change_stream_topology_change.h
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
+
+namespace mongo {
+
+/**
+ * This stage detects change stream topology changes in the form of 'kNewShardDetectedOpType' events
+ * and forwards them directly to the executor via an exception. Using an exception bypasses the rest
+ * of the pipeline, ensuring that the event cannot be filtered out or modified by user-specified
+ * stages and that it will ultimately be available to the mongoS.
+ *
+ * The mongoS must see all 'kNewShardDetectedOpType' events, so that it knows when it needs to open
+ * cursors on newly active shards. These events are generated when a chunk is migrated to a shard
+ * that previously may not have held any data for the collection being watched, and they contain the
+ * information necessary for the mongoS to include the new shard in the merged change stream.
+ */
+class DocumentSourceChangeStreamTopologyChange final
+ : public DocumentSource,
+ public ChangeStreamStageSerializationInterface {
+public:
+ static constexpr StringData kStageName = "$_internalChangeStreamTopologyChange"_sd;
+
+ static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> createFromBson(
+ const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceChangeStreamTopologyChange(expCtx);
+ }
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ return boost::none;
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+ return ChangeStreamStageSerializationInterface::serializeToValue(explain);
+ }
+
+private:
+ DocumentSourceChangeStreamTopologyChange(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(kStageName, expCtx) {}
+
+ GetNextResult doGetNext() final;
+
+ Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+ Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
index 7aec2272d25..35dff608271 100644
--- a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
+++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
@@ -31,6 +31,7 @@
#include <algorithm>
+#include "mongo/db/pipeline/change_stream_topology_change_info.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/query_feature_flags_gen.h"
@@ -40,10 +41,14 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
namespace {
+// Failpoint to throw an exception when the 'kNewShardDetected' event is observed.
+MONGO_FAIL_POINT_DEFINE(throwChangeStreamTopologyChangeExceptionToClient);
+
// Returns true if the change stream document is an event in 'config.shards'.
bool isShardConfigEvent(const Document& eventDoc) {
// TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
@@ -52,8 +57,19 @@ bool isShardConfigEvent(const Document& eventDoc) {
// where a change stream is targeted to a subset of shards. See SERVER-44039 for details.
if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ // If the failpoint is enabled, throw the 'ChangeStreamToplogyChange' exception to the
+ // client. This is used in testing to confirm that the swallowed 'kNewShardDetected' event
+ // has reached the mongoS.
+ // TODO SERVER-30784: remove this failpoint when the 'kNewShardDetected' event is the only
+ // way we detect a new shard.
+ if (MONGO_unlikely(throwChangeStreamTopologyChangeExceptionToClient.shouldFail())) {
+ uasserted(ChangeStreamTopologyChangeInfo(eventDoc.toBsonWithMetaData()),
+ "Collection migrated to new shard");
+ }
+
return true;
}
+
auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField];
return nsObj.getType() == BSONType::Object &&
nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() &&
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 01d31be0ff8..e491d99afdc 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/plan_executor_pipeline.h"
+#include "mongo/db/pipeline/change_stream_topology_change_info.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/plan_explainer_pipeline.h"
#include "mongo/db/pipeline/resume_token.h"
@@ -112,7 +113,7 @@ bool PlanExecutorPipeline::isEOF() {
}
boost::optional<Document> PlanExecutorPipeline::_getNext() {
- auto nextDoc = _pipeline->getNext();
+ auto nextDoc = _tryGetNext();
if (!nextDoc) {
_pipelineIsEof = true;
}
@@ -123,6 +124,15 @@ boost::optional<Document> PlanExecutorPipeline::_getNext() {
return nextDoc;
}
+boost::optional<Document> PlanExecutorPipeline::_tryGetNext() try {
+ return _pipeline->getNext();
+} catch (const ExceptionFor<ErrorCodes::ChangeStreamTopologyChange>& ex) {
+ // This exception contains the next document to be returned by the pipeline.
+ const auto extraInfo = ex.extraInfo<ChangeStreamTopologyChangeInfo>();
+ tassert(5669600, "Missing ChangeStreamTopologyChangeInfo on exception", extraInfo);
+ return Document::fromBsonWithMetaData(extraInfo->getTopologyChangeEvent());
+}
+
void PlanExecutorPipeline::_updateResumableScanState(const boost::optional<Document>& document) {
switch (_resumableScanType) {
case ResumableScanType::kChangeStream:
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
index 7fbb0d677b4..ed77e3cc8c1 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.h
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -157,6 +157,12 @@ private:
boost::optional<Document> _getNext();
/**
+ * Obtains the next result from the pipeline, gracefully handling any known exceptions which may
+ * be thrown.
+ */
+ boost::optional<Document> _tryGetNext();
+
+ /**
* For a change stream or resumable oplog scan, updates the scan state based on the latest
* document returned by the underlying pipeline.
*/
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 8714c6b1aad..a9686f15920 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -296,8 +296,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
// to the resume token of the invalidating event, and mark the cursor response as
// invalidated. We always expect to have ExtraInfo for this error code.
const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
- tassert(
- 5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+ tassert(5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo);
responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
responseBuilder.setInvalidated();
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 9f2665d95ac..ff937c5837a 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -804,8 +804,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
// to the resume token of the invalidating event, and mark the cursor response as
// invalidated. We always expect to have ExtraInfo for this error code.
const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
- tassert(
- 5493707, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+ tassert(5493707, "Missing ChangeStreamInvalidationInfo on exception", extraInfo);
postBatchResumeToken = extraInfo->getInvalidateResumeToken();
cursorState = ClusterCursorManager::CursorState::Exhausted;
diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript
index 367e59e1e8a..0ba234a9947 100644
--- a/src/mongo/shell/SConscript
+++ b/src/mongo/shell/SConscript
@@ -237,7 +237,7 @@ if not has_option('noshell') and usemozjs:
"$BUILD_DIR/mongo/db/catalog/index_key_validate",
"$BUILD_DIR/mongo/db/logical_session_id_helpers",
"$BUILD_DIR/mongo/db/mongohasher",
- "$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info",
+ "$BUILD_DIR/mongo/db/pipeline/change_stream_error_extra_info",
"$BUILD_DIR/mongo/db/query/command_request_response",
"$BUILD_DIR/mongo/db/query/query_request",
"$BUILD_DIR/mongo/db/server_options_core",