diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-11 13:48:00 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-20 22:47:27 +0000 |
commit | 5fe1af5f8e845f736f4e76d7a8d47675b6507a6c (patch) | |
tree | c8261e1671ee32e1069555e6b4fa23c6962643b5 /src/mongo/db/pipeline/document_source_check_resume_token.cpp | |
parent | 9fe77b9f6f61bc0eda2a51a98d2238017cb5f4d5 (diff) | |
download | mongo-5fe1af5f8e845f736f4e76d7a8d47675b6507a6c.tar.gz |
SERVER-38413 DSShardCheckResumability should swallow events that precede the client token during resume
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 106 |
1 file changed, 68 insertions, 38 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 9e2a7414c70..3c6eb438ad8 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -55,9 +55,8 @@ using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus; // tokens, and that we purposefully avoid the user's requested collation if present. ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx, const Document& documentFromResumedStream, - const ResumeToken& tokenFromClient) { - // Parse both the stream doc and the client's resume token into comprehensible ResumeTokenData. - auto tokenDataFromClient = tokenFromClient.getData(); + const ResumeTokenData& tokenDataFromClient) { + // Parse the stream doc into comprehensible ResumeTokenData. auto tokenDataFromResumedStream = ResumeToken::parse(documentFromResumedStream["_id"].getDocument()).getData(); @@ -67,7 +66,14 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte // If the clusterTime differs from the client's token, this stream cannot be resumed. if (tokenDataFromResumedStream.clusterTime != tokenDataFromClient.clusterTime) { - return ResumeStatus::kCannotResume; + return ResumeStatus::kSurpassedToken; + } + + // If the tokenType exceeds the client token's type, then we have passed the resume token point. + // This can happen if the client resumes from a synthetic 'high water mark' token from another + // shard which happens to have the same clusterTime as an actual change on this shard. 
+ if (tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType) { + return ResumeStatus::kSurpassedToken; } if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) { @@ -87,7 +93,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte // If we're not in mongos then this must be a replica set deployment, in which case we don't // ever expect to see identical timestamps and we reject the resume attempt immediately. return !expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid - ? ResumeStatus::kCannotResume + ? ResumeStatus::kSurpassedToken : ResumeStatus::kCheckNextDoc; } // If all the fields match exactly, then we have found the token. @@ -103,7 +109,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte const auto defaultResumeStatus = ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey > tokenDataFromClient.documentKey) - ? ResumeStatus::kCannotResume + ? ResumeStatus::kSurpassedToken : ResumeStatus::kCheckNextDoc; // If we're not running in a sharded context, we don't need to proceed any further. 
@@ -156,12 +162,12 @@ Value DocumentSourceEnsureResumeTokenPresent::serialize( intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx, - ResumeToken token) { + ResumeTokenData token) { return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token)); } DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token) + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() { @@ -193,7 +199,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() str::stream() << "resume of change stream was not possible, as the resume token was not found. " << documentFromResumedStream["_id"].getDocument().toString(), - _resumeStatus != ResumeStatus::kCannotResume); + _resumeStatus != ResumeStatus::kSurpassedToken); // If we reach this point, then we've seen the resume token. invariant(_resumeStatus == ResumeStatus::kFoundToken); @@ -215,34 +221,64 @@ Value DocumentSourceShardCheckResumability::serialize( intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) { - return new DocumentSourceShardCheckResumability(expCtx, ts); + // We are resuming from a point in time, not an event. Seed the stage with a high water mark. 
+ return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData()); +} + +intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { + return new DocumentSourceShardCheckResumability(expCtx, std::move(token)); } DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( - const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) - : DocumentSource(expCtx), _resumeTimestamp(ts), _verifiedResumability(false) {} + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { pExpCtx->checkForInterrupt(); - auto nextInput = pSource->getNext(); - if (_verifiedResumability) - return nextInput; + if (_surpassedResumeToken) + return pSource->getNext(); - _verifiedResumability = true; - if (nextInput.isAdvanced()) { - auto doc = nextInput.getDocument(); + while (!_surpassedResumeToken) { + auto nextInput = pSource->getNext(); - auto receivedTimestamp = ResumeToken::parse(doc["_id"].getDocument()).getClusterTime(); - if (receivedTimestamp == _resumeTimestamp) { - // Pass along the document, as the DocumentSourceEnsureResumeTokenPresent stage on the - // merger will need to see it. + // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us + // from continually returning EOF in cases where the resume point has fallen off the oplog. + if (!nextInput.isAdvanced()) { + _assertOplogHasEnoughHistory(nextInput); return nextInput; } + // Determine whether the current event sorts before, equal to or after the resume token. 
+ auto resumeStatus = + compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); + switch (resumeStatus) { + case ResumeStatus::kCheckNextDoc: + // If the result was kCheckNextDoc, we are resumable but must swallow this event. + _verifiedOplogHasEnoughHistory = true; + continue; + case ResumeStatus::kSurpassedToken: + // In this case the resume token wasn't found; it must be on another shard. We must + // examine the oplog to ensure that its history reaches back to before the resume + // token, otherwise we may have missed events that fell off the oplog. If we can + // resume, fall through into the following case and set _surpassedResumeToken. + _assertOplogHasEnoughHistory(nextInput); + case ResumeStatus::kFoundToken: + // We found the actual token! Set _surpassedResumeToken and return the result. + _surpassedResumeToken = true; + return nextInput; + } } + MONGO_UNREACHABLE; +} - // If we make it here, we need to look up the first document in the oplog and compare it - // with the resume token. +void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory( + const GetNextResult& nextInput) { + // If we have already verified that this stream is resumable, return immediately. + if (_verifiedOplogHasEnoughHistory) { + return; + } + // Look up the first document in the oplog and compare it with the resume token's clusterTime. auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); auto matchSpec = BSON("$match" << BSONObj()); auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx); @@ -251,21 +287,15 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { uassert(40576, "Resume of change stream was not possible, as the resume point may no longer " "be in the oplog. 
", - firstOplogEntry["ts"].getTimestamp() < _resumeTimestamp); + firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime); } else { - // Very unusual case: the oplog is empty. We can always resume. It should never be - // possible that the oplog is empty and we got a document matching the filter, however. - invariant(nextInput.isEOF()); - } - - // The query on the oplog above will overwrite the namespace in current op which is used in - // profiling, reset it back to the original namespace. This is a generic problem with any - // aggregation that involves sub-operations on different namespaces, and is being tracked by - // SERVER-31098. - { - stdx::lock_guard<Client> lk(*pExpCtx->opCtx->getClient()); - CurOp::get(pExpCtx->opCtx)->setNS_inlock(pExpCtx->ns.ns()); + // Very unusual case: the oplog is empty. We can always resume. However, it should never be + // possible to have obtained a document that matched the filter if the oplog is empty. + uassert(51087, + "Oplog was empty but found an event in the change stream pipeline. It should not " + "be possible for this to happen", + nextInput.isEOF()); } - return nextInput; + _verifiedOplogHasEnoughHistory = true; } } // namespace mongo |