diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 84 |
1 files changed, 60 insertions, 24 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index c3383dfb9d9..719be15ea2b 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -78,6 +78,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte : ResumeStatus::kCheckNextDoc; } + // If the document's 'txnIndex' sorts before that of the client token, we must keep looking. if (tokenDataFromResumedStream.txnOpIndex < tokenDataFromClient.txnOpIndex) { return ResumeStatus::kCheckNextDoc; } else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) { @@ -90,6 +91,12 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte return ResumeStatus::kSurpassedToken; } + // If 'fromInvalidate' exceeds the client's token value, then we have passed the resume point. + if (tokenDataFromResumedStream.fromInvalidate != tokenDataFromClient.fromInvalidate) { + return tokenDataFromResumedStream.fromInvalidate ? ResumeStatus::kSurpassedToken + : ResumeStatus::kCheckNextDoc; + } + // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database // or cluster-wide stream and we are comparing operations from different shards at the same // clusterTime. If the stream UUID sorts after the client's, however, then the stream is not @@ -184,39 +191,68 @@ DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() { pExpCtx->checkForInterrupt(); - if (_resumeStatus == ResumeStatus::kFoundToken) { + if (_resumeStatus == ResumeStatus::kSurpassedToken) { // We've already verified the resume token is present. return pSource->getNext(); } - Document documentFromResumedStream; - - // Keep iterating the stream until we see either the resume token we're looking for, - // or a change with a higher timestamp than our resume token. - while (_resumeStatus == ResumeStatus::kCheckNextDoc) { + // The incoming documents are sorted by resume token. We examine a range of documents that have + // the same clusterTime as the client's resume token, until we either find (and swallow) a match + // for the token or pass the point in the stream where it should have been. + while (_resumeStatus != ResumeStatus::kSurpassedToken) { auto nextInput = pSource->getNext(); - if (!nextInput.isAdvanced()) + // If there are no more results, return EOF. We will continue checking for the client's + // resume token the next time the getNext method is called. + if (!nextInput.isAdvanced()) { return nextInput; - - // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range - // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided - // token would sort before this received document we cannot resume the change stream. - _resumeStatus = compareAgainstClientResumeToken( - pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient); + } + // Check the current event. If we found and swallowed the resume token, then the result will + // be the first event in the stream which should be returned to the user. Otherwise, we keep + // iterating the stream until we find an event matching the client's resume token. + if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) { + return *nextOutput; + } } + MONGO_UNREACHABLE; +} - uassert(40585, - str::stream() - << "resume of change stream was not possible, as the resume token was not found. " - << documentFromResumedStream["_id"].getDocument().toString(), - _resumeStatus != ResumeStatus::kSurpassedToken); - - // If we reach this point, then we've seen the resume token. - invariant(_resumeStatus == ResumeStatus::kFoundToken); - - // Don't return the document which has the token; the user has already seen it. - return pSource->getNext(); +boost::optional<DocumentSource::GetNextResult> +DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken( + const DocumentSource::GetNextResult& nextInput) { + // We should only ever call this method when we have a new event to examine. + invariant(nextInput.isAdvanced()); + auto resumeStatus = + compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); + switch (resumeStatus) { + case ResumeStatus::kCheckNextDoc: + return boost::none; + case ResumeStatus::kFoundToken: + // We found the resume token. If we are starting after an 'invalidate' token and the + // invalidating command (e.g. collection drop) occurred at the same clusterTime on + // more than one shard, then we will see multiple identical 'invalidate' events + // here. We should continue to swallow all of them to ensure that the new stream + // begins after the collection drop, and that it is not immediately re-invalidated. + if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) { + _resumeStatus = ResumeStatus::kFoundToken; + return boost::none; + } + // If the token is not an invalidate or if we are not running in a cluster, we mark + // the stream as having surpassed the resume token, skip the current event since the + // client has already seen it, and return the next event in the stream. + _resumeStatus = ResumeStatus::kSurpassedToken; + return pSource->getNext(); + case ResumeStatus::kSurpassedToken: + // If we have surpassed the point in the stream where the resume token should have + // been and we did not see the token itself, then this stream cannot be resumed. + uassert(40585, + str::stream() << "cannot resume stream; the resume token was not found. " + << nextInput.getDocument()["_id"].getDocument().toString(), + _resumeStatus == ResumeStatus::kFoundToken); + _resumeStatus = ResumeStatus::kSurpassedToken; + return nextInput; + } + MONGO_UNREACHABLE; } const char* DocumentSourceShardCheckResumability::getSourceName() const { |