author     Bernard Gorman <bernard.gorman@gmail.com>  2019-06-22 06:25:30 +0100
committer  Bernard Gorman <bernard.gorman@gmail.com>  2019-06-29 17:12:16 +0100
commit     498b7851cfa4d4860fd6d865647c24f680b32bbd (patch)
tree       4adaf8b49e60de88f1d2a19db014ee39eb4f2afe /src
parent     e6513a6b1d45045f735f207ea78f35efaac7967e (diff)
SERVER-41196 Integrate 'invalidate' tokens into change stream's standard resume logic
(cherry picked from commit 7e4423b458fcefd37a62ebecf168716166b7dc4c)
Diffstat (limited to 'src')
6 files changed, 236 insertions, 49 deletions
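Before the per-file diffs, a note on the mechanism this change relies on: an "invalidate" notification carries a resume token that is identical to the token of the invalidating command except for an extra fromInvalidate flag, and the patch folds that flag into the standard token ordering and resume logic. The sketch below is a minimal, hypothetical plain-C++ model of such a sort key (not the real ResumeTokenData) showing why the flag keeps the ordering total: the invalidate sorts immediately after the command that produced it, so a client can hand the invalidate token back via the 'startAfter' option and the resume machinery can locate it with ordinary comparisons.

    #include <cassert>
    #include <cstddef>
    #include <string>
    #include <tuple>

    // Hypothetical, simplified stand-in for ResumeTokenData: only the fields that matter
    // for ordering in this patch, compared lexicographically.
    struct TokenKey {
        unsigned long long clusterTime;  // stand-in for the Timestamp
        std::size_t txnOpIndex;
        bool fromInvalidate;  // the flag this patch integrates into the resume logic
        std::string uuid;     // stand-in for the collection UUID
        std::string documentKey;

        bool operator<(const TokenKey& other) const {
            return std::tie(clusterTime, txnOpIndex, fromInvalidate, uuid, documentKey) <
                std::tie(other.clusterTime, other.txnOpIndex, other.fromInvalidate,
                         other.uuid, other.documentKey);
        }
    };

    int main() {
        // A "drop" command event and the invalidate it produces differ only in the flag,
        // so the invalidate sorts immediately after the command and the ordering stays total.
        TokenKey dropCmd{100, 0, false, "uuid-A", ""};
        TokenKey invalidate{100, 0, true, "uuid-A", ""};
        assert(dropCmd < invalidate);
        return 0;
    }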
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b9225fbe8cf..5e5861f2971 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -341,7 +341,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
     list<intrusive_ptr<DocumentSource>> stages;
     boost::optional<Timestamp> startFrom;
     intrusive_ptr<DocumentSource> resumeStage = nullptr;
-    bool ignoreFirstInvalidate = false;
+    boost::optional<ResumeTokenData> startAfterInvalidate;
     bool showMigrationEvents = spec.getShowMigrationEvents();
     uassert(31123,
             "Change streams from mongos may not show migration events.",
@@ -357,9 +357,11 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
         ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get();
         ResumeTokenData tokenData = token.getData();

-        // If resuming from an "invalidate" using "startAfter", set this bit to indicate to the
-        // DocumentSourceCheckInvalidate stage that a second invalidate should not be generated.
-        ignoreFirstInvalidate = startAfter && tokenData.fromInvalidate;
+        // If resuming from an "invalidate" using "startAfter", pass along the resume token data to
+        // DocumentSourceCheckInvalidate to signify that another invalidate should not be generated.
+        if (startAfter && tokenData.fromInvalidate) {
+            startAfterInvalidate = tokenData;
+        }

         uassert(ErrorCodes::InvalidResumeToken,
                 "Attempting to resume a change stream using 'resumeAfter' is not allowed from an "
@@ -432,7 +434,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression

     // The resume stage must come after the check invalidate stage so that the former can determine
     // whether the event that matches the resume token should be followed by an "invalidate" event.
-    stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
+    stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, startAfterInvalidate));
     if (resumeStage) {
         stages.push_back(resumeStage);
     }
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
index ff15060d44f..bc7d8993045 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -81,14 +81,22 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
     // identical resume token to the notification for the command, except with an extra flag
     // indicating that the token is from an invalidate. This flag is necessary to disambiguate
     // the two tokens, and thus preserve a total ordering on the stream.
-
-    // As a special case, if a client receives an invalidate like this one and then wants to
-    // start a new stream after the invalidate, they can use the "startAfter" option, in which
-    // case '_ignoreFirstInvalidate' will be set, and we should ignore (AKA not generate) the
-    // very first invalidation.
-    if (isInvalidatingCommand(pExpCtx, operationType) && !_ignoreFirstInvalidate) {
+    if (isInvalidatingCommand(pExpCtx, operationType)) {
         auto resumeTokenData = ResumeToken::parse(doc[DSCS::kIdField].getDocument()).getData();
         resumeTokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
+
+        // If a client receives an invalidate and wants to start a new stream after the invalidate,
+        // they can use the 'startAfter' option. In this case, '_startAfterInvalidate' will be set
+        // to the resume token with which the client restarted the stream. We must be sure to avoid
+        // re-invalidating the new stream, and so we will swallow the first invalidate we see on
+        // each shard. The one exception is the invalidate which matches the 'startAfter' resume
+        // token. We must re-generate this invalidate, since DSEnsureResumeTokenPresent needs to see
+        // (and will take care of swallowing) the event which exactly matches the client's token.
+        if (_startAfterInvalidate && resumeTokenData != _startAfterInvalidate) {
+            _startAfterInvalidate.reset();
+            return nextInput;
+        }
+
         auto resumeTokenDoc = ResumeToken(resumeTokenData).toDocument();

         MutableDocument result(Document{{DSCS::kIdField, resumeTokenDoc},
@@ -113,7 +121,7 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
     // Regardless of whether the first document we see is an invalidating command, we only skip the
     // first invalidate for streams with the 'startAfter' option, so we should not skip any
     // invalidates that come after the first one.
-    _ignoreFirstInvalidate = false;
+    _startAfterInvalidate.reset();

     return nextInput;
 }
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
index 66b289bb36b..349ad68c589 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -69,8 +69,9 @@ public:
     }

     static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create(
-        const boost::intrusive_ptr<ExpressionContext>& expCtx, bool ignoreFirstInvalidate) {
-        return new DocumentSourceCheckInvalidate(expCtx, ignoreFirstInvalidate);
+        const boost::intrusive_ptr<ExpressionContext>& expCtx,
+        boost::optional<ResumeTokenData> startAfterInvalidate) {
+        return new DocumentSourceCheckInvalidate(expCtx, std::move(startAfterInvalidate));
     }

 private:
@@ -78,11 +79,14 @@ private:
      * Use the create static method to create a DocumentSourceCheckInvalidate.
      */
     DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
-                                  bool ignoreFirstInvalidate)
-        : DocumentSource(expCtx), _ignoreFirstInvalidate(ignoreFirstInvalidate) {}
+                                  boost::optional<ResumeTokenData> startAfterInvalidate)
+        : DocumentSource(expCtx), _startAfterInvalidate(std::move(startAfterInvalidate)) {
+        invariant(!_startAfterInvalidate ||
+                  _startAfterInvalidate->fromInvalidate == ResumeTokenData::kFromInvalidate);
+    }

+    boost::optional<ResumeTokenData> _startAfterInvalidate;
     boost::optional<Document> _queuedInvalidate;
-    bool _ignoreFirstInvalidate;
 };

 }  // namespace mongo
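The rule that document_source_check_invalidate.cpp now implements can be stated compactly: when the stream was started with 'startAfter' at an invalidate token, suppress the invalidate that would be generated for any invalidating command whose token does not match the client's token (it is another shard's copy of the same drop), but still generate the one that does match, so that DSEnsureResumeTokenPresent can see and swallow it. A rough standalone sketch of that decision, using std::optional and a hypothetical token type in place of boost::optional<ResumeTokenData>:

    #include <cassert>
    #include <optional>

    // Hypothetical stand-in for ResumeTokenData; equality is all this sketch needs.
    struct TokenKey {
        int clusterTime;
        int uuid;
        bool operator==(const TokenKey& o) const {
            return clusterTime == o.clusterTime && uuid == o.uuid;
        }
        bool operator!=(const TokenKey& o) const { return !(*this == o); }
    };

    // Returns true if the invalidate for 'observed' should be suppressed. The startAfter
    // state is cleared once the first invalidating command has been seen, mirroring the
    // stage above, so later invalidates are never skipped.
    bool shouldSwallowInvalidate(std::optional<TokenKey>& startAfterInvalidate,
                                 const TokenKey& observed) {
        const bool swallow = startAfterInvalidate && observed != *startAfterInvalidate;
        startAfterInvalidate.reset();
        return swallow;
    }

    int main() {
        std::optional<TokenKey> startAfter{TokenKey{100, 1}};
        assert(shouldSwallowInvalidate(startAfter, TokenKey{100, 2}));   // unrelated: swallowed
        startAfter = TokenKey{100, 1};
        assert(!shouldSwallowInvalidate(startAfter, TokenKey{100, 1}));  // exact match: regenerated
        return 0;
    }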
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index c3383dfb9d9..719be15ea2b 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -78,6 +78,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
             : ResumeStatus::kCheckNextDoc;
     }

+    // If the document's 'txnOpIndex' sorts before that of the client token, we must keep looking.
     if (tokenDataFromResumedStream.txnOpIndex < tokenDataFromClient.txnOpIndex) {
         return ResumeStatus::kCheckNextDoc;
     } else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) {
@@ -90,6 +91,12 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
         return ResumeStatus::kSurpassedToken;
     }

+    // If 'fromInvalidate' exceeds the client's token value, then we have passed the resume point.
+    if (tokenDataFromResumedStream.fromInvalidate != tokenDataFromClient.fromInvalidate) {
+        return tokenDataFromResumedStream.fromInvalidate ? ResumeStatus::kSurpassedToken
+                                                         : ResumeStatus::kCheckNextDoc;
+    }
+
     // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
     // or cluster-wide stream and we are comparing operations from different shards at the same
     // clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
@@ -184,39 +191,68 @@ DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
 DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
     pExpCtx->checkForInterrupt();

-    if (_resumeStatus == ResumeStatus::kFoundToken) {
+    if (_resumeStatus == ResumeStatus::kSurpassedToken) {
         // We've already verified the resume token is present.
         return pSource->getNext();
     }

-    Document documentFromResumedStream;
-
-    // Keep iterating the stream until we see either the resume token we're looking for,
-    // or a change with a higher timestamp than our resume token.
-    while (_resumeStatus == ResumeStatus::kCheckNextDoc) {
+    // The incoming documents are sorted by resume token. We examine a range of documents that have
+    // the same clusterTime as the client's resume token, until we either find (and swallow) a match
+    // for the token or pass the point in the stream where it should have been.
+    while (_resumeStatus != ResumeStatus::kSurpassedToken) {
         auto nextInput = pSource->getNext();
-        if (!nextInput.isAdvanced())
+
+        // If there are no more results, return EOF. We will continue checking for the client's
+        // resume token the next time the getNext method is called.
+        if (!nextInput.isAdvanced()) {
             return nextInput;
-
-        // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
-        // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
-        // token would sort before this received document we cannot resume the change stream.
-        _resumeStatus = compareAgainstClientResumeToken(
-            pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient);
+        }
+
+        // Check the current event. If we found and swallowed the resume token, then the result will
+        // be the first event in the stream which should be returned to the user. Otherwise, we keep
+        // iterating the stream until we find an event matching the client's resume token.
+        if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) {
+            return *nextOutput;
+        }
     }
+    MONGO_UNREACHABLE;
+}

-    uassert(40585,
-            str::stream()
-                << "resume of change stream was not possible, as the resume token was not found. "
-                << documentFromResumedStream["_id"].getDocument().toString(),
-            _resumeStatus != ResumeStatus::kSurpassedToken);
-
-    // If we reach this point, then we've seen the resume token.
-    invariant(_resumeStatus == ResumeStatus::kFoundToken);
-
-    // Don't return the document which has the token; the user has already seen it.
-    return pSource->getNext();
+boost::optional<DocumentSource::GetNextResult>
+DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken(
+    const DocumentSource::GetNextResult& nextInput) {
+    // We should only ever call this method when we have a new event to examine.
+    invariant(nextInput.isAdvanced());
+    auto resumeStatus =
+        compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
+    switch (resumeStatus) {
+        case ResumeStatus::kCheckNextDoc:
+            return boost::none;
+        case ResumeStatus::kFoundToken:
+            // We found the resume token. If we are starting after an 'invalidate' token and the
+            // invalidating command (e.g. collection drop) occurred at the same clusterTime on
+            // more than one shard, then we will see multiple identical 'invalidate' events
+            // here. We should continue to swallow all of them to ensure that the new stream
+            // begins after the collection drop, and that it is not immediately re-invalidated.
+            if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) {
+                _resumeStatus = ResumeStatus::kFoundToken;
+                return boost::none;
+            }
+            // If the token is not an invalidate or if we are not running in a cluster, we mark
+            // the stream as having surpassed the resume token, skip the current event since the
+            // client has already seen it, and return the next event in the stream.
+            _resumeStatus = ResumeStatus::kSurpassedToken;
+            return pSource->getNext();
+        case ResumeStatus::kSurpassedToken:
+            // If we have surpassed the point in the stream where the resume token should have
+            // been and we did not see the token itself, then this stream cannot be resumed.
+            uassert(40585,
+                    str::stream() << "cannot resume stream; the resume token was not found. "
+                                  << nextInput.getDocument()["_id"].getDocument().toString(),
+                    _resumeStatus == ResumeStatus::kFoundToken);
+            _resumeStatus = ResumeStatus::kSurpassedToken;
+            return nextInput;
+    }
+    MONGO_UNREACHABLE;
 }

 const char* DocumentSourceShardCheckResumability::getSourceName() const {
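The comparator change above slots the flag into the existing field-by-field walk: clusterTime and txnOpIndex are compared first, then fromInvalidate, and only then UUID and documentKey. A condensed sketch of that walk with hypothetical types; the real compareAgainstClientResumeToken also compares additional fields (such as the token version) and tolerates differing UUIDs for whole-database and cluster-wide streams, as its comments note:

    #include <cassert>
    #include <cstddef>
    #include <string>

    enum class ResumeStatus { kCheckNextDoc, kFoundToken, kSurpassedToken };

    // Hypothetical stand-in for ResumeTokenData.
    struct TokenKey {
        int clusterTime;
        std::size_t txnOpIndex;
        bool fromInvalidate;
        std::string uuid;
        std::string documentKey;
    };

    ResumeStatus compareAgainstClientToken(const TokenKey& fromStream, const TokenKey& fromClient) {
        if (fromStream.clusterTime != fromClient.clusterTime) {
            return fromStream.clusterTime < fromClient.clusterTime ? ResumeStatus::kCheckNextDoc
                                                                   : ResumeStatus::kSurpassedToken;
        }
        if (fromStream.txnOpIndex != fromClient.txnOpIndex) {
            return fromStream.txnOpIndex < fromClient.txnOpIndex ? ResumeStatus::kCheckNextDoc
                                                                 : ResumeStatus::kSurpassedToken;
        }
        // The new tie-break: at the same clusterTime and txnOpIndex, an invalidate token sorts
        // after the plain command token, and vice versa.
        if (fromStream.fromInvalidate != fromClient.fromInvalidate) {
            return fromStream.fromInvalidate ? ResumeStatus::kSurpassedToken
                                             : ResumeStatus::kCheckNextDoc;
        }
        if (fromStream.uuid != fromClient.uuid) {
            return fromStream.uuid < fromClient.uuid ? ResumeStatus::kCheckNextDoc
                                                     : ResumeStatus::kSurpassedToken;
        }
        if (fromStream.documentKey != fromClient.documentKey) {
            return fromStream.documentKey < fromClient.documentKey ? ResumeStatus::kCheckNextDoc
                                                                   : ResumeStatus::kSurpassedToken;
        }
        return ResumeStatus::kFoundToken;
    }

    int main() {
        TokenKey clientInvalidate{100, 0, true, "A", ""};
        TokenKey dropCmd{100, 0, false, "A", ""};
        assert(compareAgainstClientToken(dropCmd, clientInvalidate) == ResumeStatus::kCheckNextDoc);
        assert(compareAgainstClientToken(clientInvalidate, clientInvalidate) ==
               ResumeStatus::kFoundToken);
        return 0;
    }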
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index c4cd53bb2cf..8c90a88b564 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -154,6 +154,13 @@ private:
     DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                            ResumeTokenData token);

+    /**
+     * Check the given event to determine whether it matches the client's resume token. If so, we
+     * swallow this event and return the next event in the stream. Otherwise, return boost::none.
+     */
+    boost::optional<DocumentSource::GetNextResult> _checkNextDocAndSwallowResumeToken(
+        const DocumentSource::GetNextResult& nextInput);
+
     ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
     ResumeTokenData _tokenFromClient;
 };
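Together, the rewritten getNext() and the new _checkNextDocAndSwallowResumeToken() helper declared above act as a small state machine over the sorted stream: skip events until the client's token is found, swallow the match (and, on mongos, any duplicate invalidates reported by other shards at the same clusterTime), then hand back the next event, or fail with code 40585 if the token's position is passed without a match. The tests below exercise exactly these paths. A simplified sketch of that loop over an in-memory vector, with integer keys standing in for resume tokens (hypothetical, not the real stage):

    #include <cassert>
    #include <cstddef>
    #include <optional>
    #include <stdexcept>
    #include <vector>

    // Each event is identified by an integer sort key; equal keys model identical invalidate
    // tokens reported by different shards at the same clusterTime.
    using EventKey = int;

    // Returns the first event the client should see after resuming, or std::nullopt at EOF.
    std::optional<EventKey> firstEventAfterResume(const std::vector<EventKey>& sortedStream,
                                                  EventKey clientToken,
                                                  bool clientTokenIsInvalidate,
                                                  bool inMongos) {
        bool found = false;
        for (std::size_t i = 0; i < sortedStream.size(); ++i) {
            const EventKey& event = sortedStream[i];
            if (event < clientToken) {
                continue;  // kCheckNextDoc: keep scanning
            }
            if (event == clientToken) {
                found = true;  // kFoundToken: swallow the client's own token
                if (inMongos && clientTokenIsInvalidate) {
                    continue;  // keep swallowing identical invalidates from other shards
                }
                // Otherwise one swallow is enough: return whatever comes next, even if it
                // happens to carry an identical key (the replica-set test below relies on this).
                return i + 1 < sortedStream.size() ? std::optional<EventKey>(sortedStream[i + 1])
                                                   : std::nullopt;
            }
            // event > clientToken: kSurpassedToken
            if (!found) {
                throw std::runtime_error("cannot resume stream; the resume token was not found");
            }
            return event;
        }
        return std::nullopt;  // EOF for now; the real stage waits for more input
    }

    int main() {
        // Three identical shard invalidates followed by the first real event, as in the
        // mongoS test below: all three are swallowed and the insert is returned.
        assert(firstEventAfterResume({5, 5, 5, 7}, 5, true, true) == 7);
        // On a replica set only the first invalidate is swallowed.
        assert(firstEventAfterResume({5, 5, 5}, 5, true, false) == 5);
        return 0;
    }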
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 85d424b3bdd..a63579fa9fb 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -62,15 +62,20 @@ public:

 protected:
     /**
+     * Pushes a document with a resume token corresponding to the given ResumeTokenData into the
+     * mock queue.
+     */
+    void addDocument(ResumeTokenData tokenData) {
+        _mock->push_back(Document{{"_id", ResumeToken(std::move(tokenData)).toDocument()}});
+    }
+
+    /**
      * Pushes a document with a resume token corresponding to the given timestamp, version,
      * txnOpIndex, docKey, and namespace into the mock queue.
      */
     void addDocument(
         Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) {
-        _mock->push_back(
-            Document{{"_id",
-                      ResumeToken(ResumeTokenData(ts, version, txnOpIndex, uuid, Value(docKey)))
-                          .toDocument()}});
+        return addDocument({ts, version, txnOpIndex, uuid, Value(docKey)});
     }

     /**
@@ -93,6 +98,17 @@ protected:
     }

     /**
+     * Convenience method to create the class under test with a given ResumeTokenData.
+     */
+    intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+        ResumeTokenData tokenData) {
+        auto checkResumeToken =
+            DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), std::move(tokenData));
+        checkResumeToken->setSource(_mock.get());
+        return checkResumeToken;
+    }
+
+    /**
      * Convenience method to create the class under test with a given timestamp, docKey, and
      * namespace.
      */
@@ -102,10 +118,8 @@ protected:
         std::size_t txnOpIndex,
         boost::optional<Document> docKey,
         UUID uuid) {
-        auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(
-            getExpCtx(), {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
-        checkResumeToken->setSource(_mock.get());
-        return checkResumeToken;
+        return createCheckResumeToken(
+            {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
     }

     /**
@@ -409,6 +423,122 @@ TEST_F(CheckResumeTokenTest,
     ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
 }

+TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterInvalidate) {
+    Timestamp resumeTimestamp(100, 1);
+    Timestamp firstEventAfter(100, 2);
+
+    // Create an array of 2 UUIDs. The first represents the UUID of the namespace before it was
+    // dropped. The second is the UUID of the collection after it is recreated.
+    UUID uuids[2] = {UUID::gen(), UUID::gen()};
+
+    // This behaviour is only relevant when DSEnsureResumeTokenPresent is running on mongoS.
+    getExpCtx()->inMongos = true;
+
+    // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
+    // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
+    // stream after the old stream is invalidated.
+    ResumeTokenData invalidateToken;
+    invalidateToken.clusterTime = resumeTimestamp;
+    invalidateToken.uuid = uuids[0];
+    invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+    auto checkResumeToken = createCheckResumeToken(invalidateToken);
+
+    // Add three documents which each have the invalidate resume token. We expect to see this in the
+    // event that we are starting after an invalidate and the invalidating event occurred on several
+    // shards at the same clusterTime.
+    addDocument(invalidateToken);
+    addDocument(invalidateToken);
+    addDocument(invalidateToken);
+
+    // Add a document representing an insert which recreated the collection after it was dropped.
+    auto expectedDocKey = Document{{"_id"_sd, 1}};
+    addDocument(Timestamp{100, 2}, expectedDocKey, uuids[1]);
+
+    // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
+    // and the next two invalidates, and return the insert event after the collection drop.
+    const auto firstDocAfterResume = checkResumeToken->getNext();
+    const auto tokenFromFirstDocAfterResume =
+        ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
+
+    ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, firstEventAfter);
+    ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInvalidate) {
+    Timestamp resumeTimestamp(100, 1);
+
+    // This behaviour is only relevant when DSEnsureResumeTokenPresent is running on mongoS.
+    getExpCtx()->inMongos = true;
+
+    // Create an ordered array of 2 UUIDs.
+    std::vector<UUID> uuids = {UUID::gen(), UUID::gen()};
+    std::sort(uuids.begin(), uuids.end());
+
+    // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
+    // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
+    // stream after the old stream is invalidated.
+    ResumeTokenData invalidateToken;
+    invalidateToken.clusterTime = resumeTimestamp;
+    invalidateToken.uuid = uuids[0];
+    invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+    auto checkResumeToken = createCheckResumeToken(invalidateToken);
+
+    // Create a second invalidate token with the same clusterTime but a different UUID.
+    auto unrelatedInvalidateToken = invalidateToken;
+    unrelatedInvalidateToken.uuid = uuids[1];
+
+    // Add three documents which each have the invalidate resume token. We expect to see this in the
+    // event that we are starting after an invalidate and the invalidating event occurred on several
+    // shards at the same clusterTime.
+    addDocument(invalidateToken);
+    addDocument(invalidateToken);
+    addDocument(invalidateToken);
+
+    // Add a fourth document which has the unrelated invalidate at the same clusterTime.
+    addDocument(unrelatedInvalidateToken);
+
+    // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
+    // and the next two invalidates, but decline to swallow the unrelated invalidate.
+    const auto firstDocAfterResume = checkResumeToken->getNext();
+    const auto tokenFromFirstDocAfterResume =
+        ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
+
+    ASSERT_EQ(tokenFromFirstDocAfterResume, unrelatedInvalidateToken);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSwallowOnlyFirstInvalidateForStartAfterInvalidateInReplSet) {
+    Timestamp resumeTimestamp(100, 1);
+
+    // We only swallow multiple invalidates when DSEnsureResumeTokenPresent is running on mongoS.
+    // Set {inMongos:false} to verify that we do not swallow additional invalidates on a replica
+    // set, since this should never occur.
+    getExpCtx()->inMongos = false;
+
+    // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
+    // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
+    // stream after the old stream is invalidated.
+    ResumeTokenData invalidateToken;
+    invalidateToken.clusterTime = resumeTimestamp;
+    invalidateToken.uuid = testUuid();
+    invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+    auto checkResumeToken = createCheckResumeToken(invalidateToken);
+
+    // Add three documents which each have the invalidate resume token.
+    addDocument(invalidateToken);
+    addDocument(invalidateToken);
+    addDocument(invalidateToken);
+
+    // DSEnsureResumeTokenPresent should confirm that the invalidate event is present and swallow
+    // it. However, it should not swallow the subsequent two invalidates.
+    for (size_t i = 0; i < 2; ++i) {
+        const auto nextDocAfterResume = checkResumeToken->getNext();
+        const auto tokenFromNextDocAfterResume =
+            ResumeToken::parse(nextDocAfterResume.getDocument()["_id"].getDocument()).getData();
+        ASSERT_EQ(tokenFromNextDocAfterResume, invalidateToken);
+    }
+    ASSERT(checkResumeToken->getNext().isEOF());
+}
+
 TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
     Timestamp resumeTimestamp(100, 1);