diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 25 |
1 files changed, 18 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index eea032da4f5..fab7fcf2e5f 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -73,8 +73,10 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte // If the tokenType exceeds the client token's type, then we have passed the resume token point. // This can happen if the client resumes from a synthetic 'high water mark' token from another // shard which happens to have the same clusterTime as an actual change on this shard. - if (tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType) { - return ResumeStatus::kSurpassedToken; + if (tokenDataFromResumedStream.tokenType != tokenDataFromClient.tokenType) { + return tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType + ? ResumeStatus::kSurpassedToken + : ResumeStatus::kCheckNextDoc; } if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) { @@ -82,8 +84,11 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) { // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being - // watched). This indicates a corrupt resume token. - uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped"); + // watched). If we are looking for the resume token on a shard then this simply means that + // the resume token may be on a different shard; otherwise, it indicates a corrupt token. + uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge); + // We are running on a merging shard. Signal that we have read beyond the resume token. + return ResumeStatus::kSurpassedToken; } // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database @@ -91,17 +96,23 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte // clusterTime. If the stream UUID sorts after the client's, however, then the stream is not // resumable; we are past the point in the stream where the token should have appeared. if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) { - // If we're not in mongos then this must be a replica set deployment, in which case we don't - // ever expect to see identical timestamps and we reject the resume attempt immediately. - return !expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid + // If we are running on a replica set deployment, we don't ever expect to see identical time + // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once. + if (!expCtx->inMongos && !expCtx->needsMerge) { + return ResumeStatus::kSurpassedToken; + } + // Otherwise, return a ResumeStatus based on the sort-order of the client and stream UUIDs. + return tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid ? ResumeStatus::kSurpassedToken : ResumeStatus::kCheckNextDoc; } + // If all the fields match exactly, then we have found the token. if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey == tokenDataFromClient.documentKey)) { return ResumeStatus::kFoundToken; } + // At this point, we know that the tokens differ only by documentKey. The status we return will // depend on whether the stream token is logically before or after the client token. If the // latter, then we will never see the resume token and the stream cannot be resumed. However, |