summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_check_resume_token.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp84
1 files changed, 60 insertions, 24 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index c3383dfb9d9..719be15ea2b 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -78,6 +78,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
: ResumeStatus::kCheckNextDoc;
}
+ // If the document's 'txnIndex' sorts before that of the client token, we must keep looking.
if (tokenDataFromResumedStream.txnOpIndex < tokenDataFromClient.txnOpIndex) {
return ResumeStatus::kCheckNextDoc;
} else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) {
@@ -90,6 +91,12 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
return ResumeStatus::kSurpassedToken;
}
+ // If 'fromInvalidate' exceeds the client's token value, then we have passed the resume point.
+ if (tokenDataFromResumedStream.fromInvalidate != tokenDataFromClient.fromInvalidate) {
+ return tokenDataFromResumedStream.fromInvalidate ? ResumeStatus::kSurpassedToken
+ : ResumeStatus::kCheckNextDoc;
+ }
+
// It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
// or cluster-wide stream and we are comparing operations from different shards at the same
// clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
@@ -184,39 +191,68 @@ DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
pExpCtx->checkForInterrupt();
- if (_resumeStatus == ResumeStatus::kFoundToken) {
+ if (_resumeStatus == ResumeStatus::kSurpassedToken) {
// We've already verified the resume token is present.
return pSource->getNext();
}
- Document documentFromResumedStream;
-
- // Keep iterating the stream until we see either the resume token we're looking for,
- // or a change with a higher timestamp than our resume token.
- while (_resumeStatus == ResumeStatus::kCheckNextDoc) {
+ // The incoming documents are sorted by resume token. We examine a range of documents that have
+ // the same clusterTime as the client's resume token, until we either find (and swallow) a match
+ // for the token or pass the point in the stream where it should have been.
+ while (_resumeStatus != ResumeStatus::kSurpassedToken) {
auto nextInput = pSource->getNext();
- if (!nextInput.isAdvanced())
+ // If there are no more results, return EOF. We will continue checking for the client's
+ // resume token the next time the getNext method is called.
+ if (!nextInput.isAdvanced()) {
return nextInput;
-
- // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
- // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
- // token would sort before this received document we cannot resume the change stream.
- _resumeStatus = compareAgainstClientResumeToken(
- pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient);
+ }
+ // Check the current event. If we found and swallowed the resume token, then the result will
+ // be the first event in the stream which should be returned to the user. Otherwise, we keep
+ // iterating the stream until we find an event matching the client's resume token.
+ if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) {
+ return *nextOutput;
+ }
}
+ MONGO_UNREACHABLE;
+}
- uassert(40585,
- str::stream()
- << "resume of change stream was not possible, as the resume token was not found. "
- << documentFromResumedStream["_id"].getDocument().toString(),
- _resumeStatus != ResumeStatus::kSurpassedToken);
-
- // If we reach this point, then we've seen the resume token.
- invariant(_resumeStatus == ResumeStatus::kFoundToken);
-
- // Don't return the document which has the token; the user has already seen it.
- return pSource->getNext();
+boost::optional<DocumentSource::GetNextResult>
+DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken(
+ const DocumentSource::GetNextResult& nextInput) {
+ // We should only ever call this method when we have a new event to examine.
+ invariant(nextInput.isAdvanced());
+ auto resumeStatus =
+ compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
+ switch (resumeStatus) {
+ case ResumeStatus::kCheckNextDoc:
+ return boost::none;
+ case ResumeStatus::kFoundToken:
+ // We found the resume token. If we are starting after an 'invalidate' token and the
+ // invalidating command (e.g. collection drop) occurred at the same clusterTime on
+ // more than one shard, then we will see multiple identical 'invalidate' events
+ // here. We should continue to swallow all of them to ensure that the new stream
+ // begins after the collection drop, and that it is not immediately re-invalidated.
+ if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) {
+ _resumeStatus = ResumeStatus::kFoundToken;
+ return boost::none;
+ }
+ // If the token is not an invalidate or if we are not running in a cluster, we mark
+ // the stream as having surpassed the resume token, skip the current event since the
+ // client has already seen it, and return the next event in the stream.
+ _resumeStatus = ResumeStatus::kSurpassedToken;
+ return pSource->getNext();
+ case ResumeStatus::kSurpassedToken:
+ // If we have surpassed the point in the stream where the resume token should have
+ // been and we did not see the token itself, then this stream cannot be resumed.
+ uassert(40585,
+ str::stream() << "cannot resume stream; the resume token was not found. "
+ << nextInput.getDocument()["_id"].getDocument().toString(),
+ _resumeStatus == ResumeStatus::kFoundToken);
+ _resumeStatus = ResumeStatus::kSurpassedToken;
+ return nextInput;
+ }
+ MONGO_UNREACHABLE;
}
const char* DocumentSourceShardCheckResumability::getSourceName() const {