summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/metadata_notifications.js14
-rw-r--r--jstests/change_streams/whole_db_metadata_notifications.js18
-rw-r--r--jstests/libs/change_stream_util.js10
-rw-r--r--jstests/sharding/change_stream_metadata_notifications.js41
-rw-r--r--jstests/sharding/change_streams.js1
-rw-r--r--src/mongo/base/error_codes.yml4
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp13
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp15
-rw-r--r--src/mongo/db/pipeline/SConscript11
-rw-r--r--src/mongo/db/pipeline/change_stream_invalidation_info.cpp50
-rw-r--r--src/mongo/db/pipeline/change_stream_invalidation_info.h62
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h54
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp60
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.h2
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp13
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp7
-rw-r--r--src/mongo/db/query/cursor_response.cpp29
-rw-r--r--src/mongo/db/query/cursor_response.h13
-rw-r--r--src/mongo/db/query/cursor_response.idl3
-rw-r--r--src/mongo/s/query/async_results_merger.cpp28
-rw-r--r--src/mongo/s/query/async_results_merger.h9
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp13
-rw-r--r--src/mongo/s/query/cluster_find.cpp12
-rw-r--r--src/mongo/shell/SConscript1
26 files changed, 485 insertions, 36 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index a6d33b0335d..f51dacc5b51 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -5,7 +5,7 @@
(function() {
"use strict";
-load("jstests/libs/change_stream_util.js");
+load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled.
load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js"); // For isSharded.
@@ -126,6 +126,18 @@ assert.commandFailedWithCode(db.runCommand({
}),
ErrorCodes.InvalidResumeToken);
+// Test that if change stream optimization is enabled, then even after the 'invalidate' event has
+// been filtered out, the cursor should hold the resume token of the 'invalidate' event.
+if (isChangeStreamOptimizationEnabled(db)) {
+ const resumeStream =
+ coll.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: resumeToken});
+ assert.soon(() => {
+ assert(!resumeStream.hasNext());
+ return resumeStream.isExhausted();
+ });
+ assert.eq(resumeStream.getResumeToken(), resumeTokenInvalidate);
+}
+
// Test resuming the change stream from the collection drop using 'startAfter'.
assertResumeExpected({
coll: coll.getName(),
diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js
index 45f751c9aa0..ff557cd0469 100644
--- a/jstests/change_streams/whole_db_metadata_notifications.js
+++ b/jstests/change_streams/whole_db_metadata_notifications.js
@@ -5,7 +5,8 @@
(function() {
"use strict";
-load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest,
+ // isChangeStreamOptimizationEnabled.
load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
@@ -198,7 +199,20 @@ assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
// 'invalidate'.
assert.commandWorked(testDB.dropDatabase());
cst.assertDatabaseDrop({cursor: aggCursor, db: testDB});
-cst.assertNextChangesEqual({cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]});
+const invalidateEvent = cst.assertNextChangesEqual(
+ {cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]});
+
+// Test that if change stream optimization is enabled, then even after the 'invalidate' event has
+// been filtered out, the cursor should hold the resume token of the 'invalidate' event.
+if (isChangeStreamOptimizationEnabled(testDB)) {
+ const resumeStream =
+ testDB.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: change._id});
+ assert.soon(() => {
+ assert(!resumeStream.hasNext());
+ return resumeStream.isExhausted();
+ });
+ assert.eq(resumeStream.getResumeToken(), invalidateEvent[0]._id);
+}
cst.cleanUp();
}());
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 349165e9dc0..c7176c7f112 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -15,6 +15,15 @@ const ChangeStreamWatchMode = Object.freeze({
});
/**
+ * Returns true if feature flag 'featureFlagChangeStreamsOptimization' is enabled, false otherwise.
+ */
+function isChangeStreamOptimizationEnabled(db) {
+ const getParam = db.adminCommand({getParameter: 1, featureFlagChangeStreamsOptimization: 1});
+ return getParam.hasOwnProperty("featureFlagChangeStreamsOptimization") &&
+ getParam.featureFlagChangeStreamsOptimization.value;
+}
+
+/**
* Helper function used internally by ChangeStreamTest. If no passthrough is active, it is exactly
* the same as calling db.runCommand. If a passthrough is active and has defined a function
* 'changeStreamPassthroughAwareRunCommand', then this method will be overridden to allow individual
@@ -52,6 +61,7 @@ function assertInvalidateOp({cursor, opType}) {
assert.soon(() => cursor.hasNext());
const invalidate = cursor.next();
assert.eq(invalidate.operationType, "invalidate");
+ assert(!cursor.hasNext());
assert(cursor.isExhausted());
assert(cursor.isClosed());
return invalidate;
diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js
index d5fa409b97e..2af508b217c 100644
--- a/jstests/sharding/change_stream_metadata_notifications.js
+++ b/jstests/sharding/change_stream_metadata_notifications.js
@@ -6,7 +6,7 @@
// ]
(function() {
"use strict";
-
+load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled.
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
@@ -20,6 +20,7 @@ const st = new ShardingTest({
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];
+const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB);
assert.commandWorked(mongosDB.dropDatabase());
@@ -79,8 +80,25 @@ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "invalidate");
+
+// Store the collection drop invalidate token for subsequent tests.
+const collectionDropinvalidateToken = next._id;
+
+assert(!changeStream.hasNext());
assert(changeStream.isExhausted());
+// If change stream optimization feature flag is enabled, verify that even after filtering out all
+// events, the cursor still returns the invalidate resume token of the dropped collection.
+if (isChangeStreamOptimized) {
+ const resumeStream = mongosColl.watch([{$match: {operationType: "DummyOperationType"}}],
+ {resumeAfter: resumeTokenFromFirstUpdate});
+ assert.soon(() => {
+ assert(!resumeStream.hasNext());
+ return resumeStream.isExhausted();
+ });
+ assert.eq(resumeStream.getResumeToken(), collectionDropinvalidateToken);
+}
+
// With an explicit collation, test that we can resume from before the collection drop.
changeStream =
mongosColl.watch([], {resumeAfter: resumeTokenFromFirstUpdate, collation: {locale: "simple"}});
@@ -103,6 +121,7 @@ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "invalidate");
+assert(!changeStream.hasNext());
assert(changeStream.isExhausted());
// Test that we can resume the change stream without specifying an explicit collation.
@@ -138,13 +157,33 @@ assert.commandWorked(mongosDB.dropDatabase());
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
+
+// Store the token to be used as 'resumeAfter' token by other change streams.
+const resumeTokenAfterDbDrop = next._id;
+
assert.eq(next.operationType, "drop");
assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "invalidate");
+assert(!changeStream.hasNext());
assert(changeStream.isExhausted());
+// Store the database drop invalidate token for other change streams.
+const dbDropInvalidateToken = next._id;
+
+// If change stream optimization feature flag is enabled, verify that even after filtering out all
+// events, the cursor still returns the invalidate resume token of the dropped database.
+if (isChangeStreamOptimized) {
+ const resumeStream = mongosColl.watch([{$match: {operationType: "DummyOperationType"}}],
+ {resumeAfter: resumeTokenAfterDbDrop});
+ assert.soon(() => {
+ assert(!resumeStream.hasNext());
+ return resumeStream.isExhausted();
+ });
+ assert.eq(resumeStream.getResumeToken(), dbDropInvalidateToken);
+}
+
st.stop();
})();
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index 9ea3357aecb..ad99d6caf95 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -190,6 +190,7 @@ function runTest(collName, shardKey) {
assert.eq(changeStream.next().operationType, "drop");
assert.soon(() => changeStream.hasNext());
assert.eq(changeStream.next().operationType, "invalidate");
+ assert(!changeStream.hasNext());
assert(changeStream.isExhausted());
jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key' +
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 948745052fd..9352da0f7a1 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -430,9 +430,11 @@ error_codes:
# case where it would be possible for MongoS to retry where MongoD couldn't.
- {code: 343, name: ShardCannotRefreshDueToLocksHeld,
extra: ShardCannotRefreshDueToLocksHeldInfo}
-
+
- {code: 344, name: AuditingNotEnabled}
- {code: 345, name: RuntimeAuditConfigurationNotEnabled}
+
+ - {code: 346, name: ChangeStreamInvalidated, extra: ChangeStreamInvalidationInfo}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index f3fc638cb63..5edf6fcc34a 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
@@ -339,6 +340,18 @@ public:
// This exception indicates that we should close the cursor without reporting an
// error.
return false;
+ } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) {
+ // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT
+ // to the resume token of the invalidating event, and mark the cursor response as
+ // invalidated. We always expect to have ExtraInfo for this error code.
+ const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
+ tassert(5493700,
+ "Missing ChangeStreamInvalidationInfo on exception",
+ extraInfo != nullptr);
+
+ nextBatch->setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
+ nextBatch->setInvalidated();
+ return false;
} catch (DBException& exception) {
nextBatch->abandon();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 7ba72ab079d..25a87ca9d06 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_exchange.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
@@ -189,6 +190,20 @@ bool handleCursorCommand(OperationContext* opCtx,
cursor = nullptr;
exec = nullptr;
break;
+ } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) {
+ // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT
+ // to the resume token of the invalidating event, and mark the cursor response as
+ // invalidated. We expect ExtraInfo to always be present for this exception.
+ const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
+ tassert(
+ 5493701, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+
+ responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
+ responseBuilder.setInvalidated();
+
+ cursor = nullptr;
+ exec = nullptr;
+ break;
} catch (DBException& exception) {
auto&& explainer = exec->getPlanExplainer();
auto&& [stats, _] =
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 5fde0b777e8..4a1bc56dc38 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -206,6 +206,16 @@ env.Library(
],
)
+env.Library(
+ target="change_stream_invalidation_info",
+ source=[
+ 'change_stream_invalidation_info.cpp',
+ ],
+ LIBDEPS=[
+ "$BUILD_DIR/mongo/base",
+ ],
+)
+
pipelineEnv = env.Clone()
pipelineEnv.InjectThirdParty(libraries=['snappy'])
pipelineEnv.Library(
@@ -290,6 +300,7 @@ pipelineEnv.Library(
'$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/matcher/expressions',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
'$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
'$BUILD_DIR/mongo/db/query/collation/collator_interface',
diff --git a/src/mongo/db/pipeline/change_stream_invalidation_info.cpp b/src/mongo/db/pipeline/change_stream_invalidation_info.cpp
new file mode 100644
index 00000000000..7be3bf8e614
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_invalidation_info.cpp
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
+#include "mongo/base/init.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+namespace {
+
+MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(ChangeStreamInvalidationInfo);
+
+} // namespace
+
+std::shared_ptr<const ErrorExtraInfo> ChangeStreamInvalidationInfo::parse(const BSONObj& obj) {
+ return std::make_shared<ChangeStreamInvalidationInfo>(obj["invalidateToken"].Obj());
+}
+
+void ChangeStreamInvalidationInfo::serialize(BSONObjBuilder* bob) const {
+ bob->append("invalidateToken", _invalidateToken);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_invalidation_info.h b/src/mongo/db/pipeline/change_stream_invalidation_info.h
new file mode 100644
index 00000000000..5f962009308
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_invalidation_info.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/error_extra_info.h"
+#include "mongo/bson/bsonobj.h"
+
+namespace mongo {
+
+class BSONObjBuilder;
+
+/**
+ * Contains information to augment the 'ChangeStreamInvalidated' error code. In particular, this
+ * class holds the resume token of the "invalidate" event which gave rise to the exception.
+ */
+class ChangeStreamInvalidationInfo final : public ErrorExtraInfo {
+public:
+ static constexpr auto code = ErrorCodes::ChangeStreamInvalidated;
+
+ static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj& obj);
+
+ explicit ChangeStreamInvalidationInfo(BSONObj invalidateToken)
+ : _invalidateToken{invalidateToken.getOwned()} {}
+
+ BSONObj getInvalidateResumeToken() const {
+ return _invalidateToken;
+ }
+
+ void serialize(BSONObjBuilder* bob) const final;
+
+private:
+ BSONObj _invalidateToken;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index f3db4b1de3d..64a0c7c6cbd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
@@ -490,10 +491,21 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
auto stages = buildPipeline(expCtx, spec, elem);
+ const bool csOptFeatureFlag =
+ feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV();
+
+ if (expCtx->inMongos && csOptFeatureFlag) {
+ // TODO SERVER-55491: replace with DocumentSourceUpdateOnAddShard.
+ stages.push_back(DocumentSourceChangeStreamPipelineSplitter::create(expCtx));
+ }
+
if (!expCtx->needsMerge) {
- // There should only be one close cursor stage. If we're on the shards and producing input
- // to be merged, do not add a close cursor stage, since the mongos will already have one.
- stages.push_back(DocumentSourceCloseCursor::create(expCtx));
+ if (!csOptFeatureFlag) {
+ // There should only be one close cursor stage. If we're on the shards and producing
+ // input to be merged, do not add a close cursor stage, since the mongos will already
+ // have one.
+ stages.push_back(DocumentSourceCloseCursor::create(expCtx));
+ }
// We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
// so that any $match stages which follow the $changeStream pipeline prefix may be able to
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index daa7cbb597a..eb40d2908ab 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -31,6 +31,7 @@
#include <type_traits>
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/document_source_match.h"
@@ -268,4 +269,57 @@ private:
using DocumentSourceMatch::DocumentSourceMatch;
};
+/**
+ * A DocumentSource that, if part of the pipeline, directly passes the received documents on to the
+ * next stages without interpreting them, and marks where a sharded change-stream pipeline should be
+ * split. This stage should only ever be created by a mongoS.
+ *
+ * TODO SERVER-55491: replace this class with DocumentSourceUpdateOnAddShard.
+ */
+class DocumentSourceChangeStreamPipelineSplitter final : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalChangeStreamPipelineSplitter"_sd;
+
+ static boost::intrusive_ptr<DocumentSourceChangeStreamPipelineSplitter> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceChangeStreamPipelineSplitter(expCtx);
+ }
+
+ const char* getSourceName() const final {
+ return DocumentSourceChangeStreamPipelineSplitter::kStageName.rawData();
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kMongoS,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kNotAllowed,
+ UnionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+ return (explain ? Value(Document{{kStageName, Document{}}}) : Value());
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ return DistributedPlanLogic{nullptr, nullptr, change_stream_constants::kSortSpec};
+ }
+
+private:
+ DocumentSourceChangeStreamPipelineSplitter(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(kStageName, expCtx) {
+ invariant(expCtx->inMongos);
+ }
+
+ GetNextResult doGetNext() final {
+ // Pass the document on to the next stage without interpreting it.
+ return pSource->getNext();
+ }
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 3d645027f0e..df87736286c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/transaction_history_iterator.h"
@@ -206,12 +207,12 @@ public:
const std::vector<repl::OplogEntry> transactionEntries = {},
std::vector<Document> documentsForLookup = {}) {
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.getEntry().toBSON(), spec);
- auto closeCursor = stages.back();
+ auto lastStage = stages.back();
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
docKeyFields, transactionEntries, std::move(documentsForLookup));
- auto next = closeCursor->getNext();
+ auto next = lastStage->getNext();
// Match stage should pass the doc down if expectedDoc is given.
ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc));
if (expectedDoc) {
@@ -219,11 +220,17 @@ public:
}
if (expectedInvalidate) {
- next = closeCursor->getNext();
+ next = lastStage->getNext();
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedInvalidate);
+
// Then throw an exception on the next call of getNext().
- ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ } else {
+ ASSERT_THROWS(lastStage->getNext(),
+ ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
+ }
}
}
@@ -259,16 +266,20 @@ public:
auto mock = DocumentSourceMock::createForTest(D(entry), getExpCtx());
stages.insert(stages.begin(), mock);
+ // Remove the DSEnsureResumeTokenPresent stage since it will swallow the result.
+ auto newEnd = std::remove_if(stages.begin(), stages.end(), [](auto& stage) {
+ return dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage.get());
+ });
+ stages.erase(newEnd, stages.end());
+
// Wire up the stages by setting the source stage.
- auto prevStage = stages[0].get();
+ auto prevIt = stages.begin();
for (auto stageIt = stages.begin() + 1; stageIt != stages.end(); stageIt++) {
auto stage = (*stageIt).get();
- // Do not include the check resume token stage since it will swallow the result.
- if (dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage))
- continue;
- stage->setSource(prevStage);
- prevStage = stage;
+ stage->setSource((*prevIt).get());
+ prevIt = stageIt;
}
+
return stages;
}
@@ -1964,7 +1975,11 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj());
auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx);
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
- ASSERT_EQ(allStages.size(), 5UL);
+
+ const size_t changeStreamStageSize =
+ (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 4 : 5);
+ ASSERT_EQ(allStages.size(), changeStreamStageSize);
+
auto stage = allStages[2];
ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get()));
@@ -1995,7 +2010,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
auto stages = makeStages(dropColl);
- auto closeCursor = stages.back();
+ auto lastStage = stages.back();
Document expectedDrop{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
@@ -2011,26 +2026,35 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- auto next = closeCursor->getNext();
+ auto next = lastStage->getNext();
// Transform into drop entry.
ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedDrop);
- next = closeCursor->getNext();
+ next = lastStage->getNext();
// Transform into invalidate entry.
ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate);
+
// Then throw an exception on the next call of getNext().
- ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ } else {
+ ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
+ }
}
TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
auto stages = makeStages(dropColl);
- auto closeCursor = stages.back();
+ auto lastStage = stages.back();
// Add a match stage after change stream to filter out the invalidate entries.
auto match = DocumentSourceMatch::create(fromjson("{operationType: 'insert'}"), getExpCtx());
- match->setSource(closeCursor.get());
+ match->setSource(lastStage.get());
// Throw an exception on the call of getNext().
- ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ } else {
+ ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
+ }
}
TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
index 02dd0cd6a61..ed98653b7f2 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_check_invalidate.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
@@ -58,12 +60,21 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt
} // namespace
DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() {
+ // To declare a change stream as invalidated, this stage first emits an invalidate event and
+ // then throws a 'ChangeStreamInvalidated' exception on the next call to this method.
+
if (_queuedInvalidate) {
const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
_queuedInvalidate.reset();
return res;
}
+ if (_queuedException &&
+ feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ uasserted(static_cast<ChangeStreamInvalidationInfo>(*_queuedException),
+ "Change stream invalidated");
+ }
+
auto nextInput = pSource->getNext();
if (!nextInput.isAdvanced())
return nextInput;
@@ -108,6 +119,15 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() {
result.metadata().setSortKey(Value{resumeTokenDoc}, isSingleElementKey);
_queuedInvalidate = result.freeze();
+
+ // By this point, either the '_startAfterInvalidate' is absent or it is present and the
+ // current event matches the resume token. In the latter case, we do not want to close the
+ // change stream and should not throw an exception. Therefore, we only queue up an exception
+ // if '_startAfterInvalidate' is absent.
+ if (!_startAfterInvalidate) {
+ _queuedException = ChangeStreamInvalidationInfo(
+ _queuedInvalidate->metadata().getSortKey().getDocument().toBson());
+ }
}
// Regardless of whether the first document we see is an invalidating command, we only skip the
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
index a60fb991ead..7326461b1fa 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source.h"
namespace mongo {
@@ -90,6 +91,7 @@ private:
boost::optional<ResumeTokenData> _startAfterInvalidate;
boost::optional<Document> _queuedInvalidate;
+ boost::optional<ChangeStreamInvalidationInfo> _queuedException;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 4f5553022ee..2f6e05a0fc8 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/pipeline/semantic_analysis.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/dbtests/dbtests.h"
@@ -74,6 +75,10 @@ using std::vector;
const NamespaceString kTestNss = NamespaceString("a.collection");
+size_t getChangeStreamStageSize() {
+ return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 5 : 6);
+}
+
void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) {
repl::ReplicationCoordinator::set(
opCtx->getServiceContext(),
@@ -2054,7 +2059,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
auto spec = BSON("$changeStream" << BSON("fullDocument"
<< "updateLookup"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 6UL);
+ ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the change lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
@@ -2080,7 +2085,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
auto spec = BSON("$changeStream" << BSON("fullDocument"
<< "updateLookup"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 6UL);
+ ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the change lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
@@ -2105,7 +2110,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen
auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 6UL);
+ ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the pre-image lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get()));
@@ -2131,7 +2136,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre
auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 6UL);
+ ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the pre-image lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get()));
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 678d5b24caf..9fe9104da25 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -273,8 +273,11 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi
continue;
}
- // A source may not simultaneously be present on both sides of the split.
- invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
+ // TODO SERVER-55491: remove this 'if' to make the invariant unconditional.
+ if (distributedPlanLogic->shardsStage && distributedPlanLogic->mergingStage) {
+ // A source may not simultaneously be present on both sides of the split.
+ invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
+ }
if (distributedPlanLogic->shardsStage)
shardPipe->push_back(std::move(distributedPlanLogic->shardsStage));
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index fe5b3bc0175..06c62c3091f 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -51,6 +51,7 @@ const char kBatchDocSequenceField[] = "cursor.nextBatch";
const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
const char kPostBatchResumeTokenField[] = "postBatchResumeToken";
const char kPartialResultsReturnedField[] = "partialResultsReturned";
+const char kInvalidatedField[] = "invalidated";
} // namespace
@@ -73,6 +74,11 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
if (_partialResultsReturned) {
_cursorObject->append(kPartialResultsReturnedField, true);
}
+
+ if (_invalidated) {
+ _cursorObject->append(kInvalidatedField, _invalidated);
+ }
+
_cursorObject->append(kIdField, cursorId);
_cursorObject->append(kNsField, cursorNamespace);
if (_options.atClusterTime) {
@@ -123,7 +129,8 @@ CursorResponse::CursorResponse(NamespaceString nss,
boost::optional<long long> numReturnedSoFar,
boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError,
- bool partialResultsReturned)
+ bool partialResultsReturned,
+ bool invalidated)
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
@@ -131,7 +138,8 @@ CursorResponse::CursorResponse(NamespaceString nss,
_numReturnedSoFar(numReturnedSoFar),
_postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)),
- _partialResultsReturned(partialResultsReturned) {}
+ _partialResultsReturned(partialResultsReturned),
+ _invalidated(invalidated) {}
std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
const BSONObj& cmdResponse) {
@@ -246,6 +254,16 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
}
}
+ auto invalidatedElem = cursorObj[kInvalidatedField];
+ if (invalidatedElem) {
+ if (invalidatedElem.type() != BSONType::Bool) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kInvalidatedField
+ << " format is invalid; expected Bool, but found: "
+ << invalidatedElem.type()};
+ }
+ }
+
auto writeConcernError = cmdResponse["writeConcernError"];
if (writeConcernError && writeConcernError.type() != BSONType::Object) {
@@ -262,7 +280,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
: boost::optional<BSONObj>{},
writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{},
- partialResultsReturned.trueValue()}};
+ partialResultsReturned.trueValue(),
+ invalidatedElem.trueValue()}};
}
void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
@@ -292,6 +311,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
cursorBuilder.append(kPartialResultsReturnedField, true);
}
+ if (_invalidated) {
+ cursorBuilder.append(kInvalidatedField, _invalidated);
+ }
+
cursorBuilder.doneFast();
builder->append("ok", 1.0);
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 6c7081fa1ec..0bd028825ed 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -95,6 +95,10 @@ public:
_partialResultsReturned = partialResults;
}
+ void setInvalidated() {
+ _invalidated = true;
+ }
+
long long numDocs() const {
return _numDocs;
}
@@ -124,6 +128,7 @@ private:
long long _numDocs = 0;
BSONObj _postBatchResumeToken;
bool _partialResultsReturned = false;
+ bool _invalidated = false;
};
/**
@@ -194,7 +199,8 @@ public:
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none,
- bool partialResultsReturned = false);
+ bool partialResultsReturned = false,
+ bool invalidated = false);
CursorResponse(CursorResponse&& other) = default;
CursorResponse& operator=(CursorResponse&& other) = default;
@@ -239,6 +245,10 @@ public:
return _partialResultsReturned;
}
+ bool getInvalidated() const {
+ return _invalidated;
+ }
+
/**
* Converts this response to its raw BSON representation.
*/
@@ -257,6 +267,7 @@ private:
boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
bool _partialResultsReturned = false;
+ bool _invalidated = false;
};
} // namespace mongo
diff --git a/src/mongo/db/query/cursor_response.idl b/src/mongo/db/query/cursor_response.idl
index 0679cb6ce5b..66c118ff5f6 100644
--- a/src/mongo/db/query/cursor_response.idl
+++ b/src/mongo/db/query/cursor_response.idl
@@ -59,6 +59,9 @@ structs:
partialResultsReturned:
description: "Boolean represents whether partial results are being returned."
type: optionalBool
+ invalidated:
+      description: "Boolean represents whether the cursor has been invalidated."
+ type: optionalBool
InitialResponseCursor:
description: "A struct representing an initial response cursor."
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 71fd81ef848..0199878764a 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -36,9 +36,11 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/pipeline/change_stream_constants.h"
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/kill_cursors_gen.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/s/catalog/type_shard.h"
@@ -122,6 +124,10 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
// A remote cannot be flagged as 'partialResultsReturned' if 'allowPartialResults' is false.
invariant(!(_remotes.back().partialResultsReturned && !_params.getAllowPartialResults()));
+        // For the first batch, the cursor should never be invalidated.
+ tassert(
+ 5493704, "Found invalidated cursor on the first batch", !_remotes.back().invalidated);
+
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
_addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
@@ -532,6 +538,11 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
"nextEvent() called before an outstanding event was signaled");
}
+    // Check whether the cursor has already been invalidated.
+ if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ _assertNotInvalidated(lk);
+ }
+
auto getMoresStatus = _scheduleGetMores(lk);
if (!getMoresStatus.isOK()) {
return getMoresStatus;
@@ -552,6 +563,15 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
return eventToReturn;
}
+void AsyncResultsMerger::_assertNotInvalidated(WithLock lk) {
+ if (auto minPromisedSortKey = _getMinPromisedSortKey(lk)) {
+ const auto& minRemote = _remotes[minPromisedSortKey->second];
+ uassert(ChangeStreamInvalidationInfo{minPromisedSortKey->first.firstElement().Obj()},
+ "Change stream invalidated",
+ !(minRemote.invalidated && !_ready(lk)));
+ }
+}
+
StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
const BSONObj& responseObj, const RemoteCursorData& remote) {
@@ -579,6 +599,14 @@ void AsyncResultsMerger::_updateRemoteMetadata(WithLock lk,
// Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
auto& remote = _remotes[remoteIndex];
remote.cursorId = response.getCursorId();
+
+ // If the response indicates that the cursor has been invalidated, mark the corresponding
+ // remote as invalidated. This also signifies that the shard cursor has been closed.
+ remote.invalidated = response.getInvalidated();
+ tassert(5493705,
+ "Unexpectedly encountered invalidated cursor with non-zero ID",
+ !(remote.invalidated && remote.cursorId > 0));
+
if (response.getPostBatchResumeToken()) {
// We only expect to see this for change streams.
invariant(_params.getSort());
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 11fc6eb2569..3fde29e141e 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -316,6 +316,9 @@ private:
// Count of fetched docs during ARM processing of the current batch. Used to reduce the
// batchSize in getMore when mongod returned less docs than the requested batchSize.
long long fetchedCount = 0;
+
+ // If set to 'true', the cursor on this shard has been invalidated.
+ bool invalidated = false;
};
class MergingComparator {
@@ -439,6 +442,12 @@ private:
bool _haveOutstandingBatchRequests(WithLock);
/**
+ * Called internally when attempting to get a new event for the caller to wait on. Throws if
+ * the shard cursor from which the next result is due has already been invalidated.
+ */
+ void _assertNotInvalidated(WithLock);
+
+ /**
* If a promisedMinSortKey has been received from all remotes, returns the lowest such key.
* Otherwise, returns boost::none.
*/
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 90e2b822178..8827907d5a2 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
@@ -290,6 +291,18 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
// error.
cursorState = ClusterCursorManager::CursorState::Exhausted;
break;
+ } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) {
+ // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT
+ // to the resume token of the invalidating event, and mark the cursor response as
+ // invalidated. We always expect to have ExtraInfo for this error code.
+ const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
+ tassert(
+ 5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+
+ responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
+ responseBuilder.setInvalidated();
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
}
// Check whether we have exhausted the pipeline's results.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index f8e9c420f37..9fd94d4ec96 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/curop.h"
#include "mongo/db/curop_failpoint_helpers.h"
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
@@ -790,6 +791,17 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
// error.
cursorState = ClusterCursorManager::CursorState::Exhausted;
break;
+ } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) {
+ // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT
+ // to the resume token of the invalidating event, and mark the cursor response as
+ // invalidated. We always expect to have ExtraInfo for this error code.
+ const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
+ tassert(
+ 5493707, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+
+ postBatchResumeToken = extraInfo->getInvalidateResumeToken();
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
}
if (!next.isOK()) {
diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript
index 8a201b1a59e..367e59e1e8a 100644
--- a/src/mongo/shell/SConscript
+++ b/src/mongo/shell/SConscript
@@ -237,6 +237,7 @@ if not has_option('noshell') and usemozjs:
"$BUILD_DIR/mongo/db/catalog/index_key_validate",
"$BUILD_DIR/mongo/db/logical_session_id_helpers",
"$BUILD_DIR/mongo/db/mongohasher",
+ "$BUILD_DIR/mongo/db/pipeline/change_stream_invalidation_info",
"$BUILD_DIR/mongo/db/query/command_request_response",
"$BUILD_DIR/mongo/db/query/query_request",
"$BUILD_DIR/mongo/db/server_options_core",