diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-18 17:27:43 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-27 09:38:59 -0400 |
commit | bbe67b5bdd596f4720b19f9f4c8c38cfadb9c1dd (patch) | |
tree | 594dba7d8898c73ab488137e20112cc99a2d5dda | |
parent | 752069dbc79f22b6ae4691073d455d76c9bbf18d (diff) | |
download | mongo-bbe67b5bdd596f4720b19f9f4c8c38cfadb9c1dd.tar.gz |
SERVER-34789: Using resume token from 'invalidate' notification for 'resumeAfter' should error
-rw-r--r-- | jstests/change_streams/metadata_notifications.js | 78 | ||||
-rw-r--r-- | jstests/change_streams/whole_db_resumability.js | 43 | ||||
-rw-r--r-- | jstests/libs/change_stream_util.js | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_close_cursor.h | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 190 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_invalidate.cpp | 113 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_invalidate.h | 82 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.h | 12 |
14 files changed, 503 insertions, 163 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index f1d4373606a..be892c114da 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -70,7 +70,7 @@ {operationType: "drop"}, {operationType: "invalidate"}, ]; - const changes = cst.assertNextChangesEqual( + let changes = cst.assertNextChangesEqual( {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); const resumeToken = changes[0]._id; const resumeTokenDrop = changes[3]._id; @@ -89,33 +89,36 @@ coll = assertCreateCollection(db, collName); assert.writeOK(coll.insert({_id: "after recreate"})); - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream using - // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature. - // Test resuming the change stream from the collection drop using 'resumeAfter'. - expectedChanges = [{ - operationType: "insert", - ns: {db: db.getName(), coll: coll.getName()}, - fullDocument: {_id: "after recreate"}, - documentKey: {_id: "after recreate"} - }]; - assertResumeExpected( - {coll: coll.getName(), spec: {resumeAfter: resumeTokenDrop}, expected: expectedChanges}); - - // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'. assertResumeExpected({ coll: coll.getName(), - spec: {resumeAfter: resumeTokenInvalidate}, - expected: expectedChanges + spec: {resumeAfter: resumeTokenDrop}, + expected: [{operationType: "invalidate"}] }); + // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'. + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], + cursor: {}, + collation: {locale: "simple"}, + }), + ErrorCodes.InvalidResumeToken); + // Test resuming the change stream from the collection drop using 'startAfter'. - assertResumeExpected( - {coll: coll.getName(), spec: {startAfter: resumeTokenDrop}, expected: expectedChanges}); + assertResumeExpected({ + coll: coll.getName(), + spec: {startAfter: resumeTokenDrop}, + expected: [{operationType: "invalidate"}] + }); - // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. This - // is expected to behave identical to resuming from the drop. + // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. + expectedChanges = [{ + operationType: "insert", + ns: {db: db.getName(), coll: coll.getName()}, + fullDocument: {_id: "after recreate"}, + documentKey: {_id: "after recreate"} + }]; assertResumeExpected({ coll: coll.getName(), spec: {startAfter: resumeTokenInvalidate}, @@ -161,36 +164,35 @@ coll = db[collName]; assert.writeOK(coll.insert({_id: "after rename"})); - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here - // to be sure that it doesn't crash the server, but the ability to resume a change stream - // after an invalidate using 'resumeAfter' is a bug, not a feature. - // Test resuming the change stream from the collection rename using 'resumeAfter'. - expectedChanges = [{ - operationType: "insert", - ns: {db: db.getName(), coll: coll.getName()}, - fullDocument: {_id: "after rename"}, - documentKey: {_id: "after rename"} - }]; assertResumeExpected({ coll: coll.getName(), spec: {resumeAfter: resumeTokenRename}, - expected: expectedChanges + expected: [{operationType: "invalidate"}] }); // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'. - assertResumeExpected({ - coll: coll.getName(), - spec: {resumeAfter: resumeTokenInvalidate}, - expected: expectedChanges - }); + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], + cursor: {}, + collation: {locale: "simple"}, + }), + ErrorCodes.InvalidResumeToken); // Test resuming the change stream from the rename using 'startAfter'. assertResumeExpected({ coll: coll.getName(), spec: {startAfter: resumeTokenRename}, - expected: expectedChanges + expected: [{operationType: "invalidate"}] }); + // Test resuming the change stream from the invalidate after the rename using 'startAfter'. + expectedChanges = [{ + operationType: "insert", + ns: {db: db.getName(), coll: coll.getName()}, + fullDocument: {_id: "after rename"}, + documentKey: {_id: "after rename"} + }]; assertResumeExpected({ coll: coll.getName(), spec: {startAfter: resumeTokenInvalidate}, diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js index ba311400c87..c34d4067af2 100644 --- a/jstests/change_streams/whole_db_resumability.js +++ b/jstests/change_streams/whole_db_resumability.js @@ -131,6 +131,9 @@ ]; const dropDbChanges = cst.assertNextChangesEqual( {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop}); + const resumeTokenDrop = dropDbChanges[0]._id; + const resumeTokenDropDb = dropDbChanges[1]._id; + const resumeTokenInvalidate = dropDbChanges[2]._id; // Resume from the first collection drop. resumeCursor = cst.startWatchingChanges({ @@ -142,7 +145,7 @@ // Resume from the second collection drop using 'resumeAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[0]._id}}], + pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}], collection: 1, }); cst.assertNextChangesEqual( @@ -150,55 +153,53 @@ // Resume from the second collection drop using 'startAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAfter: dropDbChanges[0]._id}}], + pipeline: [{$changeStream: {startAfter: resumeTokenDrop}}], collection: 1, }); cst.assertNextChangesEqual( {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop.slice(1)}); // Recreate the test collection. - coll = assertCreateCollection(testDB, coll.getName()); assert.writeOK(coll.insert({_id: "after recreate"})); - let expectedInsert = { + let expectedInsert = [{ operationType: "insert", - ns: {db: db.getName(), coll: coll.getName()}, + ns: {db: testDB.getName(), coll: coll.getName()}, fullDocument: {_id: "after recreate"}, documentKey: {_id: "after recreate"} - }; - - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream using - // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature. + }]; // Test resuming from the 'dropDatabase' entry using 'resumeAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}], + pipeline: [{$changeStream: {resumeAfter: resumeTokenDropDb}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry using 'resumeAfter'. - resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}], - collection: 1, - aggregateOptions: {cursor: {batchSize: 0}}, - }); - cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); + assert.commandFailedWithCode(db.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], + cursor: {}, + collation: {locale: "simple"}, + }), + ErrorCodes.InvalidResumeToken); // Test resuming from the 'dropDatabase' entry using 'startAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAfter: dropDbChanges[1]._id}}], + pipeline: [{$changeStream: {startAfter: resumeTokenDropDb}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry using 'startAfter' and verifies it picks up the // insert after recreating the db/collection. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}], + pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 71a56f3ffe8..8c779cbf0f5 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -193,6 +193,13 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { "Expected change's size must match expected number of changes"); } + // Convert 'expectedChanges' to an array, even if it contains just a single element. + if (expectedChanges !== undefined && !(expectedChanges instanceof Array)) { + let arrayVersion = new Array; + arrayVersion.push(expectedChanges); + expectedChanges = arrayVersion; + } + // Set the expected number of changes based on the size of the expected change list. if (expectedNumChanges === undefined) { assert.neq(expectedChanges, undefined); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 120996f96ff..fbdc846e838 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -271,6 +271,7 @@ pipelineeEnv.Library( 'document_source_change_stream.cpp', 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_transform.cpp', + 'document_source_check_invalidate.cpp', 'document_source_check_resume_token.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index b123820e981..6aaec438148 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -38,6 +38,7 @@ #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" +#include "mongo/db/pipeline/document_source_check_invalidate.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" @@ -318,24 +319,13 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx, expCtx->uuid && tokenData.uuid && expCtx->uuid.get() == tokenData.uuid.get()); } -/** - * Parses the resume options in 'spec', optionally populating the resume stage and cluster time to - * start from. Throws an AssertionException if not running on a replica set or multiple resume - * options are specified. - */ -void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceChangeStreamSpec& spec, - intrusive_ptr<DocumentSource>* resumeStageOut, - boost::optional<Timestamp>* startFromOut) { - if (!expCtx->inMongos) { - auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); - uassert(40573, - "The $changeStream stage is only supported on replica sets", - replCoord && - replCoord->getReplicationMode() == - repl::ReplicationCoordinator::Mode::modeReplSet); - *startFromOut = replCoord->getMyLastAppliedOpTime().getTimestamp(); - } +list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec spec, + BSONElement elem) { + list<intrusive_ptr<DocumentSource>> stages; + boost::optional<Timestamp> startFrom; + intrusive_ptr<DocumentSource> resumeStage = nullptr; + bool ignoreFirstInvalidate = false; auto resumeAfter = spec.getResumeAfter(); auto startAfter = spec.getStartAfter(); @@ -347,29 +337,65 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get(); ResumeTokenData tokenData = token.getData(); + // If resuming from an "invalidate" using "startAfter", set this bit to indicate to the + // DocumentSourceCheckInvalidate stage that a second invalidate should not be generated. + ignoreFirstInvalidate = startAfter && tokenData.fromInvalidate; + + uassert(ErrorCodes::InvalidResumeToken, + "Attempting to resume a change stream using 'resumeAfter' is not allowed from an " + "invalidate notification.", + !resumeAfter || !tokenData.fromInvalidate); // Verify that the requested resume attempt is possible based on the stream type, resume // token UUID, and collation. assertResumeAllowed(expCtx, tokenData); - *startFromOut = tokenData.clusterTime; + startFrom = tokenData.clusterTime; if (expCtx->needsMerge) { - *resumeStageOut = + resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData.clusterTime); } else { - *resumeStageOut = - DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); + resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); } } - auto startAtOperationTime = spec.getStartAtOperationTime(); - - if (startAtOperationTime) { + if (auto startAtOperationTime = spec.getStartAtOperationTime()) { uassert(40674, "Only one type of resume option is allowed, but multiple were found.", - !*resumeStageOut); - *startFromOut = *startAtOperationTime; - *resumeStageOut = DocumentSourceShardCheckResumability::create(expCtx, **startFromOut); + !resumeStage); + startFrom = *startAtOperationTime; + resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); } + + // There might not be a starting point if we're on mongos, otherwise we should either have a + // 'resumeAfter' starting point, or should start from the latest majority committed operation. + auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); + uassert(40573, + "The $changeStream stage is only supported on replica sets", + expCtx->inMongos || (replCoord && + replCoord->getReplicationMode() == + repl::ReplicationCoordinator::Mode::modeReplSet)); + if (!startFrom && !expCtx->inMongos) { + startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp(); + } + + if (startFrom) { + const bool startFromInclusive = (resumeStage != nullptr); + stages.push_back(DocumentSourceOplogMatch::create( + DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, startFromInclusive), + expCtx)); + } + + stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, elem.embeddedObject())); + stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate)); + + // The resume stage must come after the check invalidate stage to allow the check invalidate + // stage to determine whether the oplog entry matching the resume token should be followed by an + // "invalidate" entry. + if (resumeStage) { + stages.push_back(resumeStage); + } + + return stages; } } // namespace @@ -389,10 +415,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // Make sure that it is legal to run this $changeStream before proceeding. DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec); - boost::optional<Timestamp> startFrom; - intrusive_ptr<DocumentSource> resumeStage = nullptr; - parseResumeOptions(expCtx, spec, &resumeStage, &startFrom); - auto fullDocOption = spec.getFullDocument(); uassert(40575, str::stream() << "unrecognized value for the 'fullDocument' option to the " @@ -404,21 +426,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd); - list<intrusive_ptr<DocumentSource>> stages; - - // There might not be a starting point if we're on mongos, otherwise we should either have a - // 'resumeAfter' starting point, or should start from the latest majority committed operation. - invariant(expCtx->inMongos || static_cast<bool>(startFrom)); - if (startFrom) { - const bool startFromInclusive = (resumeStage != nullptr); - stages.push_back(DocumentSourceOplogMatch::create( - buildMatchFilter(expCtx, *startFrom, startFromInclusive), expCtx)); - } - - stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, elem.embeddedObject())); - if (resumeStage) { - stages.push_back(resumeStage); - } + auto stages = buildPipeline(expCtx, spec, elem); if (!expCtx->needsMerge) { // There should only be one close cursor stage. If we're on the shards and producing input // to be merged, do not add a close cursor stage, since the mongos will already have one. diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp index 207971dbb76..a0d87d1baa9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp @@ -59,11 +59,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated"); } - if (_queuedInvalidate) { - _shouldCloseCursor = true; - return DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); - } - auto nextInput = pSource->getNext(); if (!nextInput.isAdvanced()) return nextInput; @@ -80,15 +75,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { _shouldCloseCursor = true; } - // Check if this is an invalidating command and the next entry should be an "invalidate". - if (isInvalidatingCommand(pExpCtx, operationType)) { - _queuedInvalidate = Document{ - {DocumentSourceChangeStream::kIdField, doc[DocumentSourceChangeStream::kIdField]}, - {DocumentSourceChangeStream::kClusterTimeField, - doc[DocumentSourceChangeStream::kClusterTimeField]}, - {DocumentSourceChangeStream::kOperationTypeField, "invalidate"_sd}}; - } - return nextInput; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index bc60687deae..065b15e7ccc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -101,7 +101,6 @@ private: : DocumentSource(expCtx) {} bool _shouldCloseCursor = false; - boost::optional<Document> _queuedInvalidate; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 8f522b26d4e..3d3e4bbb36d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -155,21 +156,29 @@ public: auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get()); ASSERT(match); auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx()); - - auto mock = DocumentSourceMock::create(D(entry)); - executableMatch->setSource(mock.get()); + // Replace the original match with the executable one. + stages[0] = executableMatch; // Check the oplog entry is transformed correctly. auto transform = stages[1].get(); ASSERT(transform); ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName); - transform->setSource(executableMatch.get()); - auto closeCursor = stages.back().get(); - ASSERT(closeCursor); - closeCursor->setSource(transform); - - return {mock, executableMatch, transform, closeCursor}; + // Create mock stage and insert at the front of the stages. + auto mock = DocumentSourceMock::create(D(entry)); + stages.insert(stages.begin(), mock); + + // Wire up the stages by setting the source stage. + auto prevStage = stages[0].get(); + for (auto stageIt = stages.begin() + 1; stageIt != stages.end(); stageIt++) { + auto stage = (*stageIt).get(); + // Do not include the check resume token stage since it will swallow the result. + if (dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage)) + continue; + stage->setSource(prevStage); + prevStage = stage; + } + return stages; } vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { @@ -191,10 +200,13 @@ public: Document makeResumeToken(Timestamp ts, ImplicitValue uuid = Value(), - ImplicitValue docKey = Value()) { + ImplicitValue docKey = Value(), + ResumeTokenData::FromInvalidate fromInvalidate = + ResumeTokenData::FromInvalidate::kNotFromInvalidate) { ResumeTokenData tokenData; tokenData.clusterTime = ts; tokenData.documentKey = docKey; + tokenData.fromInvalidate = fromInvalidate; if (!uuid.missing()) tokenData.uuid = uuid.getUuid(); return ResumeToken(tokenData).toDocument(); @@ -422,21 +434,6 @@ TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) { 40573); } -TEST_F(ChangeStreamStageTest, StagesGeneratedCorrectly) { - const auto spec = fromjson("{$changeStream: {}}"); - - list<intrusive_ptr<DocumentSource>> result = - DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); - vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); - ASSERT_EQUALS(stages.size(), 3UL); - ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(stages.front().get())); - ASSERT_EQUALS(string(stages[0]->getSourceName()), DSChangeStream::kStageName); - ASSERT_EQUALS(string(stages[1]->getSourceName()), DSChangeStream::kStageName); - ASSERT_EQUALS(string(stages[2]->getSourceName()), DSChangeStream::kStageName); - - // TODO: Check explain result. -} - TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type nss, // namespace @@ -663,9 +660,11 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate); @@ -685,9 +684,11 @@ TEST_F(ChangeStreamStageTest, TransformRename) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); @@ -725,9 +726,11 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { {DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, }; Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); @@ -1014,7 +1017,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj()); auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx); vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); - ASSERT_EQ(allStages.size(), 3UL); + ASSERT_EQ(allStages.size(), 4UL); auto stage = allStages[1]; ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get())); @@ -1054,9 +1057,11 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; auto next = closeCursor->getNext(); @@ -1116,6 +1121,12 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { expectedInsert, {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + + // Verify the same behavior with resuming using 'startAfter'. + checkTransformation(insertEntry, + expectedInsert, + {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. + BSON("$changeStream" << BSON("startAfter" << resumeToken))); } TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { @@ -1152,6 +1163,12 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres expectedInsert, {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + + // Verify the same behavior with resuming using 'startAfter'. + checkTransformation(insertEntry, + expectedInsert, + {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. + BSON("$changeStream" << BSON("startAfter" << resumeToken))); } TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) { @@ -1207,6 +1224,29 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif checkTransformation(rename, expectedRename); } +TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) { + auto expCtx = getExpCtx(); + + // Need to put the collection in the UUID catalog so the resume token is valid. + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid()); + + const auto resumeTokenInvalidate = + makeResumeToken(kDefaultTs, + testUuid(), + BSON("x" << 2 << "_id" << 1), + ResumeTokenData::FromInvalidate::kFromInvalidate); + + ASSERT_THROWS_CODE(DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON( + "resumeAfter" << resumeTokenInvalidate << "startAtOperationTime" + << kDefaultTs)) + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::InvalidResumeToken); +} + // // Test class for change stream of a single database. // @@ -1448,9 +1488,11 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}}, }; Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate); @@ -1632,5 +1674,81 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); } +TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) { + auto expCtx = getExpCtx(); + + // Need to put the collection in the UUID catalog so the resume token is valid. + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid()); + + const auto resumeTokenInvalidate = + makeResumeToken(kDefaultTs, + testUuid(), + BSON("x" << 2 << "_id" << 1), + ResumeTokenData::FromInvalidate::kFromInvalidate); + + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeTokenInvalidate)) + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::InvalidResumeToken); +} + +TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) { + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. + auto resumeToken = makeResumeToken( + kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kNotFromInvalidate); + + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc); + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + + +TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContainUUID) { + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. + auto resumeToken = makeResumeToken(kDefaultTs); + + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc); + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {{"_id"}}, // Mock the 'collectDocumentKeyFields' response. + BSON("$changeStream" << BSON("startAfter" << resumeToken))); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index efeb1f69bbb..06e3c6230bf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -86,8 +86,10 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. - if (auto resumeAfter = spec.getResumeAfter()) { - ResumeToken token = resumeAfter.get(); + auto resumeAfter = spec.getResumeAfter(); + auto startAfter = spec.getStartAfter(); + if (resumeAfter || startAfter) { + ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get(); ResumeTokenData tokenData = token.getData(); if (!tokenData.documentKey.missing() && tokenData.uuid) { diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp new file mode 100644 index 00000000000..be9d47a1252 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_check_invalidate.h" +#include "mongo/util/log.h" + +namespace mongo { + +using DSCS = DocumentSourceChangeStream; + +namespace { + +// Returns true if the given 'operationType' should invalidate the change stream based on the +// namespace in 'pExpCtx'. +bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + StringData operationType) { + if (pExpCtx->isSingleNamespaceAggregation()) { + return operationType == DSCS::kDropCollectionOpType || + operationType == DSCS::kRenameCollectionOpType || + operationType == DSCS::kDropDatabaseOpType; + } else if (!pExpCtx->isClusterAggregation()) { + return operationType == DSCS::kDropDatabaseOpType; + } else { + return false; + } +}; + +} // namespace + +DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() { + pExpCtx->checkForInterrupt(); + + invariant(!pExpCtx->inMongos); + + if (_queuedInvalidate) { + const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); + _queuedInvalidate.reset(); + return res; + } + + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) + return nextInput; + + auto doc = nextInput.getDocument(); + const auto& kOperationTypeField = DSCS::kOperationTypeField; + DSCS::checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String); + auto operationType = doc[kOperationTypeField].getString(); + + // If this command should invalidate the stream, generate an invalidate entry and queue it up + // to be returned after the notification of this command. The new entry will have a nearly + // identical resume token to the notification for the command, except with an extra flag + // indicating that the token is from an invalidate. This flag is necessary to disambiguate + // the two tokens, and thus preserve a total ordering on the stream. + + // As a special case, if a client receives an invalidate like this one and then wants to + // start a new stream after the invalidate, they can use the "startAfter" option, in which + // case '_ignoreFirstInvalidate' will be set, and we should ignore (AKA not generate) the + // very first invalidation. + if (isInvalidatingCommand(pExpCtx, operationType) && !_ignoreFirstInvalidate) { + auto resumeTokenData = ResumeToken::parse(doc[DSCS::kIdField].getDocument()).getData(); + resumeTokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate; + + MutableDocument result(Document{{DSCS::kIdField, ResumeToken(resumeTokenData).toDocument()}, + {DSCS::kOperationTypeField, DSCS::kInvalidateOpType}, + {DSCS::kClusterTimeField, doc[DSCS::kClusterTimeField]}}); + + // If we're in a sharded environment, we'll need to merge the results by their sort key, so + // add that as metadata. + result.copyMetaDataFrom(doc); + + _queuedInvalidate = result.freeze(); + } + + // Regardless of whether the first document we see is an invalidating command, we only skip the + // first invalidate for streams with the 'startAfter' option, so we should not skip any + // invalidates that come after the first one. + _ignoreFirstInvalidate = false; + + return nextInput; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h new file mode 100644 index 00000000000..18e51524709 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * This stage is used internally for change stream notifications to artifically generate an + * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop + * for a single-collection change stream). It is not intended to be created by the user. + */ +class DocumentSourceCheckInvalidate final : public DocumentSource { +public: + GetNextResult getNext() final; + + const char* getSourceName() const final { + // This is used in error reporting. + return "$_checkInvalidate"; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + // This stage is created by the DocumentSourceChangeStream stage, so serializing it here + // would result in it being created twice. + return Value(); + } + + static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, bool ignoreFirstInvalidate) { + return new DocumentSourceCheckInvalidate(expCtx, ignoreFirstInvalidate); + } + +private: + /** + * Use the create static method to create a DocumentSourceCheckInvalidate. + */ + DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx, + bool ignoreFirstInvalidate) + : DocumentSource(expCtx), _ignoreFirstInvalidate(ignoreFirstInvalidate) {} + + boost::optional<Document> _queuedInvalidate; + bool _ignoreFirstInvalidate; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index ed71a826072..6df9e9fb41f 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1536,7 +1536,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { auto spec = BSON("$changeStream" << BSON("fullDocument" << "updateLookup")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 4UL); + ASSERT_EQ(stages.size(), 5UL); // Make sure the change lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); @@ -1561,7 +1561,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage auto spec = BSON("$changeStream" << BSON("fullDocument" << "updateLookup")); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); - ASSERT_EQ(stages.size(), 4UL); + ASSERT_EQ(stages.size(), 5UL); // Make sure the change lookup is at the end. ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index 8ed5e353bf9..32b688c179a 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -70,13 +70,14 @@ ResumeToken::ResumeToken(const Document& resumeDoc) { } // We encode the resume token as a KeyString with the sequence: -// clusterTime, version, applyOpsIndex, uuid, documentKey -// Only the clusterTime, version, and applyOpsIndex are required. +// clusterTime, version, applyOpsIndex, fromInvalidate, uuid, documentKey +// Only the clusterTime, version, applyOpsIndex, and fromInvalidate are required. ResumeToken::ResumeToken(const ResumeTokenData& data) { BSONObjBuilder builder; builder.append("", data.clusterTime); builder.append("", data.version); builder.appendNumber("", data.applyOpsIndex); + builder.appendBool("", data.fromInvalidate); uassert(50788, "Unexpected resume token with a documentKey but no UUID", data.uuid || data.documentKey.missing()); @@ -148,6 +149,13 @@ ResumeTokenData ResumeToken::getData() const { uassert(50794, "Invalid Resume Token: applyOpsIndex should be non-negative", applyOpsInd >= 0); result.applyOpsIndex = applyOpsInd; + uassert(50872, "Resume Token does not contain fromInvalidate", i.more()); + auto fromInvalidate = i.next(); + uassert(50870, + "Resume Token fromInvalidate is not a boolean.", + fromInvalidate.type() == BSONType::Bool); + result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean()); + // The UUID and documentKey are not required. if (!i.more()) { return result; @@ -158,6 +166,7 @@ ResumeTokenData ResumeToken::getData() const { if (i.more()) { result.documentKey = Value(i.next()); } + uassert(40646, "invalid oversized resume token", !i.more()); return result; } diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 42b5b2b9f03..25318719e67 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -40,6 +40,14 @@ namespace mongo { struct ResumeTokenData { + /** + * Flag to indicate if the resume token is from an invalidate notification. + */ + enum FromInvalidate : bool { + kFromInvalidate = true, + kNotFromInvalidate = false, + }; + ResumeTokenData(){}; ResumeTokenData(Timestamp clusterTimeIn, int versionIn, @@ -62,6 +70,10 @@ struct ResumeTokenData { size_t applyOpsIndex = 0; Value documentKey; boost::optional<UUID> uuid; + // Flag to indicate that this resume token is from an "invalidate" entry. This will not be set + // on a token from a command that *would* invalidate a change stream, but rather the invalidate + // notification itself. + FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate; }; std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); |