summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_check_resume_token.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp25
1 files changed, 18 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index eea032da4f5..fab7fcf2e5f 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -73,8 +73,10 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
// If the tokenType exceeds the client token's type, then we have passed the resume token point.
// This can happen if the client resumes from a synthetic 'high water mark' token from another
// shard which happens to have the same clusterTime as an actual change on this shard.
- if (tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType) {
- return ResumeStatus::kSurpassedToken;
+ if (tokenDataFromResumedStream.tokenType != tokenDataFromClient.tokenType) {
+ return tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType
+ ? ResumeStatus::kSurpassedToken
+ : ResumeStatus::kCheckNextDoc;
}
if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
@@ -82,8 +84,11 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
} else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
// This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
// the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
- // watched). This indicates a corrupt resume token.
- uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped");
+ // watched). If we are looking for the resume token on a shard then this simply means that
+ // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
+ uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge);
+ // We are running on a merging shard. Signal that we have read beyond the resume token.
+ return ResumeStatus::kSurpassedToken;
}
// It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
@@ -91,17 +96,23 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
// clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
// resumable; we are past the point in the stream where the token should have appeared.
if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) {
- // If we're not in mongos then this must be a replica set deployment, in which case we don't
- // ever expect to see identical timestamps and we reject the resume attempt immediately.
- return !expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
+ // If we are running on a replica set deployment, we don't ever expect to see identical time
+ // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once.
+ if (!expCtx->inMongos && !expCtx->needsMerge) {
+ return ResumeStatus::kSurpassedToken;
+ }
+ // Otherwise, return a ResumeStatus based on the sort-order of the client and stream UUIDs.
+ return tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
? ResumeStatus::kSurpassedToken
: ResumeStatus::kCheckNextDoc;
}
+
// If all the fields match exactly, then we have found the token.
if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
tokenDataFromClient.documentKey)) {
return ResumeStatus::kFoundToken;
}
+
// At this point, we know that the tokens differ only by documentKey. The status we return will
// depend on whether the stream token is logically before or after the client token. If the
// latter, then we will never see the resume token and the stream cannot be resumed. However,