author    Bernard Gorman <bernard.gorman@gmail.com>  2019-02-11 13:48:00 +0000
committer Bernard Gorman <bernard.gorman@gmail.com>  2019-02-20 22:47:27 +0000
commit    5fe1af5f8e845f736f4e76d7a8d47675b6507a6c (patch)
tree      c8261e1671ee32e1069555e6b4fa23c6962643b5 /src/mongo/db/pipeline/document_source_check_resume_token.cpp
parent    9fe77b9f6f61bc0eda2a51a98d2238017cb5f4d5 (diff)
download  mongo-5fe1af5f8e845f736f4e76d7a8d47675b6507a6c.tar.gz
SERVER-38413 DSShardCheckResumability should swallow events that precede the client token during resume
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp | 106
1 file changed, 68 insertions(+), 38 deletions(-)
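For orientation before the diff: the patch changes the token comparison so that, at the same clusterTime, an event token which sorts after the client's synthetic "high water mark" token reports kSurpassedToken instead of failing the resume. Below is a minimal standalone sketch of that decision order. It is not the mongod source: Token and its integer fields are illustrative stand-ins for ResumeTokenData, and the real compareAgainstClientResumeToken also breaks ties on applyOpsIndex and (in mongos) on UUID.

#include <iostream>

// Illustrative stand-ins for the real ResumeStatus and ResumeTokenData types.
enum class ResumeStatus { kCheckNextDoc, kFoundToken, kSurpassedToken };

struct Token {
    long clusterTime;  // timestamp of the event
    int tokenType;     // high-water-mark tokens sort before event tokens
    int documentKey;   // simplified stand-in for the real document key
};

// Mirrors the decision order in the patched compareAgainstClientResumeToken,
// minus the applyOpsIndex and UUID tie-breaks handled by the real code.
ResumeStatus compareTokens(const Token& fromStream, const Token& fromClient) {
    // A differing clusterTime means we have moved past the resume point.
    if (fromStream.clusterTime != fromClient.clusterTime)
        return ResumeStatus::kSurpassedToken;
    // New in this patch: an event token outranks a high-water-mark token
    // that happens to share its clusterTime.
    if (fromStream.tokenType > fromClient.tokenType)
        return ResumeStatus::kSurpassedToken;
    if (fromStream.documentKey == fromClient.documentKey)
        return ResumeStatus::kFoundToken;
    return fromStream.documentKey > fromClient.documentKey
        ? ResumeStatus::kSurpassedToken
        : ResumeStatus::kCheckNextDoc;
}

int main() {
    // The client resumes from a synthetic high-water-mark token (tokenType 0);
    // this shard holds a real event (tokenType 1) at the same clusterTime.
    Token client{100, 0, 0};
    Token event{100, 1, 5};
    // Prints 1: the event surpasses the high-water mark, so the stage can
    // verify oplog history and surface it instead of aborting the resume.
    std::cout << (compareTokens(event, client) == ResumeStatus::kSurpassedToken)
              << "\n";
}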
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 9e2a7414c70..3c6eb438ad8 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -55,9 +55,8 @@ using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
// tokens, and that we purposefully avoid the user's requested collation if present.
ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
const Document& documentFromResumedStream,
- const ResumeToken& tokenFromClient) {
- // Parse both the stream doc and the client's resume token into comprehensible ResumeTokenData.
- auto tokenDataFromClient = tokenFromClient.getData();
+ const ResumeTokenData& tokenDataFromClient) {
+ // Parse the stream doc into comprehensible ResumeTokenData.
auto tokenDataFromResumedStream =
ResumeToken::parse(documentFromResumedStream["_id"].getDocument()).getData();
@@ -67,7 +66,14 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
// If the clusterTime differs from the client's token, this stream cannot be resumed.
if (tokenDataFromResumedStream.clusterTime != tokenDataFromClient.clusterTime) {
- return ResumeStatus::kCannotResume;
+ return ResumeStatus::kSurpassedToken;
+ }
+
+ // If the tokenType exceeds the client token's type, then we have passed the resume token point.
+ // This can happen if the client resumes from a synthetic 'high water mark' token from another
+ // shard which happens to have the same clusterTime as an actual change on this shard.
+ if (tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType) {
+ return ResumeStatus::kSurpassedToken;
}
if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
@@ -87,7 +93,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
// If we're not in mongos then this must be a replica set deployment, in which case we don't
// ever expect to see identical timestamps and we reject the resume attempt immediately.
return !expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
- ? ResumeStatus::kCannotResume
+ ? ResumeStatus::kSurpassedToken
: ResumeStatus::kCheckNextDoc;
}
// If all the fields match exactly, then we have found the token.
@@ -103,7 +109,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
const auto defaultResumeStatus =
ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
tokenDataFromClient.documentKey)
- ? ResumeStatus::kCannotResume
+ ? ResumeStatus::kSurpassedToken
: ResumeStatus::kCheckNextDoc;
// If we're not running in a sharded context, we don't need to proceed any further.
@@ -156,12 +162,12 @@ Value DocumentSourceEnsureResumeTokenPresent::serialize(
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
- ResumeToken token) {
+ ResumeTokenData token) {
return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}
DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token)
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
: DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
@@ -193,7 +199,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext()
str::stream()
<< "resume of change stream was not possible, as the resume token was not found. "
<< documentFromResumedStream["_id"].getDocument().toString(),
- _resumeStatus != ResumeStatus::kCannotResume);
+ _resumeStatus != ResumeStatus::kSurpassedToken);
// If we reach this point, then we've seen the resume token.
invariant(_resumeStatus == ResumeStatus::kFoundToken);
@@ -215,34 +221,64 @@ Value DocumentSourceShardCheckResumability::serialize(
intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
- return new DocumentSourceShardCheckResumability(expCtx, ts);
+ // We are resuming from a point in time, not an event. Seed the stage with a high water mark.
+ return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData());
+}
+
+intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
+ return new DocumentSourceShardCheckResumability(expCtx, std::move(token));
}
DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
- const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts)
- : DocumentSource(expCtx), _resumeTimestamp(ts), _verifiedResumability(false) {}
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
pExpCtx->checkForInterrupt();
- auto nextInput = pSource->getNext();
- if (_verifiedResumability)
- return nextInput;
+ if (_surpassedResumeToken)
+ return pSource->getNext();
- _verifiedResumability = true;
- if (nextInput.isAdvanced()) {
- auto doc = nextInput.getDocument();
+ while (!_surpassedResumeToken) {
+ auto nextInput = pSource->getNext();
- auto receivedTimestamp = ResumeToken::parse(doc["_id"].getDocument()).getClusterTime();
- if (receivedTimestamp == _resumeTimestamp) {
- // Pass along the document, as the DocumentSourceEnsureResumeTokenPresent stage on the
- // merger will need to see it.
+ // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us
+ // from continually returning EOF in cases where the resume point has fallen off the oplog.
+ if (!nextInput.isAdvanced()) {
+ _assertOplogHasEnoughHistory(nextInput);
return nextInput;
}
+ // Determine whether the current event sorts before, equal to or after the resume token.
+ auto resumeStatus =
+ compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
+ switch (resumeStatus) {
+ case ResumeStatus::kCheckNextDoc:
+ // If the result was kCheckNextDoc, we are resumable but must swallow this event.
+ _verifiedOplogHasEnoughHistory = true;
+ continue;
+ case ResumeStatus::kSurpassedToken:
+ // In this case the resume token wasn't found; it must be on another shard. We must
+ // examine the oplog to ensure that its history reaches back to before the resume
+ // token, otherwise we may have missed events that fell off the oplog. If we can
+ // resume, fall through into the following case and set _surpassedResumeToken.
+ _assertOplogHasEnoughHistory(nextInput);
+ case ResumeStatus::kFoundToken:
+ // We found the actual token! Set _surpassedResumeToken and return the result.
+ _surpassedResumeToken = true;
+ return nextInput;
+ }
}
+ MONGO_UNREACHABLE;
+}
- // If we make it here, we need to look up the first document in the oplog and compare it
- // with the resume token.
+void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
+ const GetNextResult& nextInput) {
+ // If we have already verified that this stream is resumable, return immediately.
+ if (_verifiedOplogHasEnoughHistory) {
+ return;
+ }
+ // Look up the first document in the oplog and compare it with the resume token's clusterTime.
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
auto matchSpec = BSON("$match" << BSONObj());
auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
@@ -251,21 +287,15 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
uassert(40576,
"Resume of change stream was not possible, as the resume point may no longer "
"be in the oplog. ",
- firstOplogEntry["ts"].getTimestamp() < _resumeTimestamp);
+ firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
} else {
- // Very unusual case: the oplog is empty. We can always resume. It should never be
- // possible that the oplog is empty and we got a document matching the filter, however.
- invariant(nextInput.isEOF());
- }
-
- // The query on the oplog above will overwrite the namespace in current op which is used in
- // profiling, reset it back to the original namespace. This is a generic problem with any
- // aggregation that involves sub-operations on different namespaces, and is being tracked by
- // SERVER-31098.
- {
- stdx::lock_guard<Client> lk(*pExpCtx->opCtx->getClient());
- CurOp::get(pExpCtx->opCtx)->setNS_inlock(pExpCtx->ns.ns());
+ // Very unusual case: the oplog is empty. We can always resume. However, it should never be
+ // possible to have obtained a document that matched the filter if the oplog is empty.
+ uassert(51087,
+ "Oplog was empty but found an event in the change stream pipeline. It should not "
+ "be possible for this to happen",
+ nextInput.isEOF());
}
- return nextInput;
+ _verifiedOplogHasEnoughHistory = true;
}
} // namespace mongo
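Continuing the Token/compareTokens sketch above, here is a hedged model of how the rewritten DocumentSourceShardCheckResumability::getNext() consumes those statuses. The upstream stage is modelled as a vector with a cursor and the oplog as an optional first-entry timestamp; both are stand-ins for the real pSource and the MongoProcessInterface pipeline lookup, and the exception type stands in for uassert.

#include <cstddef>
#include <optional>
#include <stdexcept>
#include <vector>

// Reuses Token, ResumeStatus and compareTokens from the sketch above.

struct ShardCheckResumability {
    Token clientToken;
    std::optional<long> firstOplogTs;  // empty => empty oplog
    bool surpassedResumeToken = false;
    bool verifiedOplogHasEnoughHistory = false;

    // Analogue of _assertOplogHasEnoughHistory: resuming is only safe if the
    // oldest oplog entry predates the resume point (uassert 40576), or the
    // oplog is empty and the stream produced no event (uassert 51087).
    void assertOplogHasEnoughHistory(bool streamWasEof) {
        if (verifiedOplogHasEnoughHistory)
            return;
        if (!firstOplogTs) {
            if (!streamWasEof)
                throw std::runtime_error("event seen but oplog empty (51087)");
        } else if (*firstOplogTs >= clientToken.clusterTime) {
            throw std::runtime_error("resume point may be off the oplog (40576)");
        }
        verifiedOplogHasEnoughHistory = true;
    }

    // Analogue of the rewritten getNext(): swallow events that precede the
    // client token, then pass everything through once it has been surpassed.
    std::optional<Token> getNext(const std::vector<Token>& source, std::size_t& pos) {
        while (!surpassedResumeToken) {
            if (pos == source.size()) {
                // EOF: check the oplog so we never silently return EOF forever
                // after the resume point has fallen off the oplog.
                assertOplogHasEnoughHistory(true);
                return std::nullopt;
            }
            Token next = source[pos++];
            switch (compareTokens(next, clientToken)) {
                case ResumeStatus::kCheckNextDoc:
                    // A pre-token event proves the history reaches back far
                    // enough; swallow it and keep scanning.
                    verifiedOplogHasEnoughHistory = true;
                    continue;
                case ResumeStatus::kSurpassedToken:
                    // Token not seen on this shard: verify oplog history
                    // before letting the event through.
                    assertOplogHasEnoughHistory(false);
                    [[fallthrough]];
                case ResumeStatus::kFoundToken:
                    surpassedResumeToken = true;
                    return next;
            }
        }
        // Subsequent calls are a plain pass-through of the source.
        if (pos == source.size())
            return std::nullopt;
        return source[pos++];
    }
};

Note how this mirrors the patch's control flow: the kSurpassedToken case deliberately falls through into kFoundToken after the history check, which is exactly the commented fall-through in the real switch statement.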