diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-04-03 16:55:52 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-04-06 02:47:55 +0100 |
commit | 41084e8f0fa354a9efc28a354321200e94a2fcf6 (patch) | |
tree | 309fed55bfb25682117ca7a4c170c1699cadfbd6 /src/mongo/db/pipeline/document_source_check_resume_token.h | |
parent | 87be281e31034f80723d5299b70e7e956a48c494 (diff) | |
download | mongo-41084e8f0fa354a9efc28a354321200e94a2fcf6.tar.gz |
SERVER-34090 Allow resuming change stream when resume token's document key does not contain the shard key
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.h | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 2ce99eae29a..1c04472eb62 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -93,6 +93,14 @@ private: class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource, public SplittableDocumentSource { public: + // Used to record the results of comparing the token data extracted from documents in the + // resumed stream against the client's resume token. + enum class ResumeStatus { + kFoundToken, // The stream produced a document satisfying the client resume token. + kCannotResume, // The stream's latest document is more recent than the resume token. + kCheckNextDoc // The next document produced by the stream may contain the resume token. + }; + GetNextResult getNext() final; const char* getSourceName() const final; @@ -115,7 +123,8 @@ public: * to ensure that each shard has enough oplog history to resume the change stream. */ boost::intrusive_ptr<DocumentSource> getShardSource() final { - return DocumentSourceShardCheckResumability::create(pExpCtx, _token.getClusterTime()); + return DocumentSourceShardCheckResumability::create(pExpCtx, + _tokenFromClient.getClusterTime()); }; std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { @@ -139,7 +148,7 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token); const ResumeToken& getTokenForTest() { - return _token; + return _tokenFromClient; } private: @@ -149,8 +158,8 @@ private: DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token); - ResumeToken _token; - bool _haveSeenResumeToken; + ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; + ResumeToken _tokenFromClient; }; } // namespace mongo |