diff options
12 files changed, 343 insertions, 140 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index ef17b2b4249..a0f074f1357 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -79,30 +79,21 @@ coll = assertCreateCollection(db, collName); assert.writeOK(coll.insert({_id: 0})); - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream after an - // invalidate is a bug, not a feature. - // Test resuming the change stream from the collection drop. - assert.doesNotThrow(function() { - const resumeTokenDrop = changes[2]._id; - const resumeCursor = - coll.watch([], {resumeAfter: resumeTokenDrop, collation: {locale: "simple"}}); - assert.soon(() => resumeCursor.hasNext()); - // Not checking the contents of the document returned, because we do not technically - // support this behavior. - resumeCursor.next(); + cursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: changes[2]._id}}], + collection: collName, + aggregateOptions: {cursor: {batchSize: 0}, collation: {locale: "simple"}}, }); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]}); + // Test resuming the change stream from the invalidate after the drop. - assert.doesNotThrow(function() { - const resumeTokenInvalidate = changes[3]._id; - const resumeCursor = - coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}}); - assert.soon(() => resumeCursor.hasNext()); - // Not checking the contents of the document returned, because we do not technically - // support this behavior. - resumeCursor.next(); + cursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: changes[3]._id}}], + collection: collName, + aggregateOptions: {cursor: {batchSize: 0}, collation: {locale: "simple"}}, }); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]}); // Test that renaming a collection being watched generates a "rename" entry followed by an // "invalidate". This is true if the change stream is on the source or target collection of the diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js index 25f03943112..f8f8d3fb64a 100644 --- a/jstests/change_streams/whole_db_resumability.js +++ b/jstests/change_streams/whole_db_resumability.js @@ -97,30 +97,22 @@ assert.writeOK(coll.insert({_id: 0})); // Test resuming from the 'dropDatabase' entry. - // TODO SERVER-34789: Resuming from the 'dropDatabase' should return a single invalidate - // notification. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - let change = cst.getOneChange(resumeCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.fullDocument, {_id: 0}, tojson(change)); - assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change)); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry. - // TODO SERVER-34789: Resuming from an invalidate should error or return an invalidate - // notification. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - change = cst.getOneChange(resumeCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.fullDocument, {_id: 0}, tojson(change)); - assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change)); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); cst.cleanUp(); })(); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 71a56f3ffe8..8c779cbf0f5 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -193,6 +193,13 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { "Expected change's size must match expected number of changes"); } + // Convert 'expectedChanges' to an array, even if it contains just a single element. + if (expectedChanges !== undefined && !(expectedChanges instanceof Array)) { + let arrayVersion = new Array; + arrayVersion.push(expectedChanges); + expectedChanges = arrayVersion; + } + // Set the expected number of changes based on the size of the expected change list. if (expectedNumChanges === undefined) { assert.neq(expectedChanges, undefined); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 3d73e724af4..2508af7eda6 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -272,6 +272,7 @@ pipelineeEnv.Library( 'document_source_change_stream.cpp', 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_transform.cpp', + 'document_source_check_invalidate.cpp', 'document_source_check_resume_token.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 9edf48c4e4d..9dee6a5db97 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -38,6 +38,7 @@ #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" +#include "mongo/db/pipeline/document_source_check_invalidate.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" @@ -318,25 +319,25 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx, expCtx->uuid && tokenData.uuid && expCtx->uuid.get() == tokenData.uuid.get()); } -/** - * Parses the resume options in 'spec', optionally populating the resume stage and cluster time to - * start from. Throws an AssertionException if not running on a replica set or multiple resume - * options are specified. - */ -void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceChangeStreamSpec& spec, - ServerGlobalParams::FeatureCompatibility::Version fcv, - intrusive_ptr<DocumentSource>* resumeStageOut, - boost::optional<Timestamp>* startFromOut) { - if (!expCtx->inMongos) { - auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); - uassert(40573, - "The $changeStream stage is only supported on replica sets", - replCoord && - replCoord->getReplicationMode() == - repl::ReplicationCoordinator::Mode::modeReplSet); - *startFromOut = replCoord->getMyLastAppliedOpTime().getTimestamp(); - } +intrusive_ptr<DocumentSource> createTransformationStage( + const intrusive_ptr<ExpressionContext>& expCtx, + BSONObj changeStreamSpec, + ServerGlobalParams::FeatureCompatibility::Version fcv) { + // Mark the transformation stage as independent of any collection if the change stream is + // watching all collections in the database. + const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS(); + return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform( + expCtx, changeStreamSpec, fcv, isIndependentOfAnyCollection)); +} + +list<intrusive_ptr<DocumentSource>> buildPipeline( + const intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec spec, + ServerGlobalParams::FeatureCompatibility::Version fcv, + BSONElement elem) { + list<intrusive_ptr<DocumentSource>> stages; + boost::optional<Timestamp> startFrom; + intrusive_ptr<DocumentSource> resumeStage = nullptr; if (auto resumeAfter = spec.getResumeAfter()) { ResumeToken token = resumeAfter.get(); @@ -346,13 +347,12 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, // token UUID, and collation. assertResumeAllowed(expCtx, tokenData); - *startFromOut = tokenData.clusterTime; + startFrom = tokenData.clusterTime; if (expCtx->needsMerge) { - *resumeStageOut = + resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData.clusterTime); } else { - *resumeStageOut = - DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); + resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); } } @@ -361,14 +361,14 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, uassert(40674, "Only one type of resume option is allowed, but multiple were found.", - !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtOperationTime)); + !resumeStage || (!resumeAfterClusterTime && !startAtOperationTime)); if (resumeAfterClusterTime) { if (fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) { warning() << "The '$_resumeAfterClusterTime' option is deprecated, please use " "'startAtOperationTime' instead."; } - *startFromOut = resumeAfterClusterTime->getTimestamp(); + startFrom = resumeAfterClusterTime->getTimestamp(); } // New field name starting in 4.0 is 'startAtOperationTime'. @@ -381,9 +381,41 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, << DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName << " in a $changeStream stage.", !resumeAfterClusterTime); - *startFromOut = *startAtOperationTime; - *resumeStageOut = DocumentSourceShardCheckResumability::create(expCtx, **startFromOut); + startFrom = *startAtOperationTime; + resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); + } + + // There might not be a starting point if we're on mongos, otherwise we should either have a + // 'resumeAfter' starting point, or should start from the latest majority committed operation. + auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); + uassert(40573, + "The $changeStream stage is only supported on replica sets", + expCtx->inMongos || (replCoord && + replCoord->getReplicationMode() == + repl::ReplicationCoordinator::Mode::modeReplSet)); + if (!startFrom && !expCtx->inMongos) { + startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp(); } + + if (startFrom) { + const bool startFromInclusive = (resumeStage != nullptr); + stages.push_back(DocumentSourceOplogMatch::create( + DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, startFromInclusive), + expCtx)); + } + + stages.push_back(createTransformationStage(expCtx, elem.embeddedObject(), fcv)); + stages.push_back(DocumentSourceCheckInvalidate::create(expCtx)); + + // Resume stage must come after the check invalidate stage to ensure that resuming from an + // invalidate or an invalidating command will not ignore the invalidation. Putting the check + // invalidate stage first will see the resume token before it is ignored, thereby remembering + // that the stream cannot continue. + if (resumeStage) { + stages.push_back(resumeStage); + } + + return stages; } } // namespace @@ -404,10 +436,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // Make sure that it is legal to run this $changeStream before proceeding. DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec, fcv); - boost::optional<Timestamp> startFrom; - intrusive_ptr<DocumentSource> resumeStage = nullptr; - parseResumeOptions(expCtx, spec, fcv, &resumeStage, &startFrom); - auto fullDocOption = spec.getFullDocument(); uassert(40575, str::stream() << "unrecognized value for the 'fullDocument' option to the " @@ -419,21 +447,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd); - list<intrusive_ptr<DocumentSource>> stages; - - // There might not be a starting point if we're on mongos, otherwise we should either have a - // 'resumeAfter' starting point, or should start from the latest majority committed operation. - invariant(expCtx->inMongos || static_cast<bool>(startFrom)); - if (startFrom) { - const bool startFromInclusive = (resumeStage != nullptr); - stages.push_back(DocumentSourceOplogMatch::create( - buildMatchFilter(expCtx, *startFrom, startFromInclusive), expCtx)); - } - - stages.push_back(createTransformationStage(expCtx, elem.embeddedObject(), fcv)); - if (resumeStage) { - stages.push_back(resumeStage); - } + auto stages = buildPipeline(expCtx, spec, fcv, elem); if (!expCtx->needsMerge) { // There should only be one close cursor stage. If we're on the shards and producing input // to be merged, do not add a close cursor stage, since the mongos will already have one. @@ -507,14 +521,4 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( !expCtx->ns.isSystem()); } -intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage( - const intrusive_ptr<ExpressionContext>& expCtx, - BSONObj changeStreamSpec, - ServerGlobalParams::FeatureCompatibility::Version fcv) { - // Mark the transformation stage as independent of any collection if the change stream is - // watching all collections in the database. - const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS(); - return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform( - expCtx, changeStreamSpec, fcv, isIndependentOfAnyCollection)); -} } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 0d6e84c509a..16175fdcee2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -175,11 +175,6 @@ public: static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - static boost::intrusive_ptr<DocumentSource> createTransformationStage( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - BSONObj changeStreamSpec, - ServerGlobalParams::FeatureCompatibility::Version fcv); - /** * Given a BSON object containing an aggregation command with a $changeStream stage, and a * resume token, returns a new BSON object with the same command except with the addition of a diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp index 207971dbb76..a0d87d1baa9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp @@ -59,11 +59,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated"); } - if (_queuedInvalidate) { - _shouldCloseCursor = true; - return DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); - } - auto nextInput = pSource->getNext(); if (!nextInput.isAdvanced()) return nextInput; @@ -80,15 +75,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { _shouldCloseCursor = true; } - // Check if this is an invalidating command and the next entry should be an "invalidate". - if (isInvalidatingCommand(pExpCtx, operationType)) { - _queuedInvalidate = Document{ - {DocumentSourceChangeStream::kIdField, doc[DocumentSourceChangeStream::kIdField]}, - {DocumentSourceChangeStream::kClusterTimeField, - doc[DocumentSourceChangeStream::kClusterTimeField]}, - {DocumentSourceChangeStream::kOperationTypeField, "invalidate"_sd}}; - } - return nextInput; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index bc60687deae..065b15e7ccc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -101,7 +101,6 @@ private: : DocumentSource(expCtx) {} bool _shouldCloseCursor = false; - boost::optional<Document> _queuedInvalidate; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 49738f02cfe..b8734cc8d26 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -155,21 +156,29 @@ public: auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get()); ASSERT(match); auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx()); - - auto mock = DocumentSourceMock::create(D(entry)); - executableMatch->setSource(mock.get()); + // Replace the original match with the executable one. + stages[0] = executableMatch; // Check the oplog entry is transformed correctly. auto transform = stages[1].get(); ASSERT(transform); ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName); - transform->setSource(executableMatch.get()); - auto closeCursor = stages.back().get(); - ASSERT(closeCursor); - closeCursor->setSource(transform); - - return {mock, executableMatch, transform, closeCursor}; + // Create mock stage and insert at the front of the stages. + auto mock = DocumentSourceMock::create(D(entry)); + stages.insert(stages.begin(), mock); + + // Wire up the stages by setting the source stage. + auto prevStage = stages[0].get(); + for (auto stageIt = stages.begin() + 1; stageIt != stages.end(); stageIt++) { + auto stage = (*stageIt).get(); + // Do not include the check resume token stage since it will swallow the result. + if (dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage)) + continue; + stage->setSource(prevStage); + prevStage = stage; + } + return stages; } vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { @@ -395,21 +404,6 @@ TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) { 40573); } -TEST_F(ChangeStreamStageTest, StagesGeneratedCorrectly) { - const auto spec = fromjson("{$changeStream: {}}"); - - list<intrusive_ptr<DocumentSource>> result = - DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); - vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); - ASSERT_EQUALS(stages.size(), 3UL); - ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(stages.front().get())); - ASSERT_EQUALS(string(stages[0]->getSourceName()), DSChangeStream::kStageName); - ASSERT_EQUALS(string(stages[1]->getSourceName()), DSChangeStream::kStageName); - ASSERT_EQUALS(string(stages[2]->getSourceName()), DSChangeStream::kStageName); - - // TODO: Check explain result. -} - TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type nss, // namespace @@ -637,8 +631,8 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate); @@ -659,8 +653,8 @@ TEST_F(ChangeStreamStageTest, TransformRename) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); @@ -699,8 +693,8 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); @@ -987,7 +981,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj()); auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx); vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); - ASSERT_EQ(allStages.size(), 3UL); + ASSERT_EQ(allStages.size(), 4UL); auto stage = allStages[1]; ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get())); @@ -1028,8 +1022,8 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; auto next = closeCursor->getNext(); @@ -1180,6 +1174,34 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif checkTransformation(rename, expectedRename); } +TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromDropShouldReturnInvalidate) { + auto expCtx = getExpCtx(); + + // Need to put the collection in the UUID catalog so the resume token is valid. + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid()); + + OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); + const auto resumeTokenDrop = makeResumeToken(kDefaultTs, testUuid()); + + Document expectedDrop{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + }; + Document expectedInvalidate{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + }; + checkTransformation(dropColl, + expectedDrop, + {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. + BSON("$changeStream" << BSON("resumeAfter" << resumeTokenDrop)), + expectedInvalidate); +} + // // Test class for change stream of a single database. // @@ -1422,8 +1444,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate); @@ -1605,5 +1627,34 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); } +TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabaseShouldReturnInvalidate) { + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. + auto resumeToken = makeResumeToken(kDefaultTs); + OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false); + + Document expectedDropDatabase{ + {DSChangeStream::kIdField, resumeToken}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}}, + }; + Document expectedInvalidate{ + {DSChangeStream::kIdField, resumeToken}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + }; + + checkTransformation(dropDB, + expectedDropDatabase, + {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + expectedInvalidate); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp new file mode 100644 index 00000000000..edfb16d7bae --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_check_invalidate.h" +#include "mongo/util/log.h" + +namespace mongo { + +using DSCS = DocumentSourceChangeStream; + +namespace { + +// Returns true if the given 'operationType' should invalidate the change stream based on the +// namespace in 'pExpCtx'. +bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + StringData operationType) { + if (pExpCtx->isSingleNamespaceAggregation()) { + return operationType == DSCS::kDropCollectionOpType || + operationType == DSCS::kRenameCollectionOpType || + operationType == DSCS::kDropDatabaseOpType; + } else if (!pExpCtx->isClusterAggregation()) { + return operationType == DSCS::kDropDatabaseOpType; + } else { + return false; + } +}; + +} // namespace + +DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() { + pExpCtx->checkForInterrupt(); + + invariant(!pExpCtx->inMongos); + + if (_queuedInvalidate) { + const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); + _queuedInvalidate.reset(); + return res; + } + + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) + return nextInput; + + auto doc = nextInput.getDocument(); + const auto& kOperationTypeField = DSCS::kOperationTypeField; + DSCS::checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String); + auto operationType = doc[kOperationTypeField].getString(); + + // If this command should invalidate the stream, generate an invalidate entry and queue it up + // to be returned after the notification of this command. + if (isInvalidatingCommand(pExpCtx, operationType)) { + MutableDocument result(Document{{DSCS::kIdField, doc[DSCS::kIdField]}, + {DSCS::kOperationTypeField, DSCS::kInvalidateOpType}, + {DSCS::kClusterTimeField, doc[DSCS::kClusterTimeField]}}); + + // If we're in a sharded environment, we'll need to merge the results by their sort key, so + // add that as metadata. + result.copyMetaDataFrom(doc); + + _queuedInvalidate = result.freeze(); + } + + return nextInput; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h new file mode 100644 index 00000000000..b689ea6f510 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * This stage is used internally for change stream notifications to artifically generate an + * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop + * for a single-collection change stream). It is not intended to be created by the user. + */ +class DocumentSourceCheckInvalidate final : public DocumentSource { +public: + GetNextResult getNext() final; + + const char* getSourceName() const final { + // This is used in error reporting. + return "$_checkInvalidate"; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + // This stage is created by the DocumentSourceChangeStream stage, so serializing it here + // would result in it being created twice. + return Value(); + } + + static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceCheckInvalidate(expCtx); + } + +private: + /** + * Use the create static method to create a DocumentSourceCheckInvalidate. + */ + DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx) {} + + boost::optional<Document> _queuedInvalidate; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index c53262cfc47..acb80b6b00c 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1536,7 +1536,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { auto spec = BSON("$changeStream" << BSON("fullDocument" << "updateLookup")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 4UL); + ASSERT_EQ(stages.size(), 5UL); // Make sure the change lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); @@ -1561,7 +1561,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage auto spec = BSON("$changeStream" << BSON("fullDocument" << "updateLookup")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 4UL); + ASSERT_EQ(stages.size(), 5UL); // Make sure the change lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); |