summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_check_resume_token.h
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-04-03 16:55:52 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2018-04-06 02:47:55 +0100
commit41084e8f0fa354a9efc28a354321200e94a2fcf6 (patch)
tree309fed55bfb25682117ca7a4c170c1699cadfbd6 /src/mongo/db/pipeline/document_source_check_resume_token.h
parent87be281e31034f80723d5299b70e7e956a48c494 (diff)
downloadmongo-41084e8f0fa354a9efc28a354321200e94a2fcf6.tar.gz
SERVER-34090 Allow resuming change stream when resume token's document key does not contain the shard key
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.h')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h17
1 files changed, 13 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 2ce99eae29a..1c04472eb62 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -93,6 +93,14 @@ private:
class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource,
public SplittableDocumentSource {
public:
+ // Used to record the results of comparing the token data extracted from documents in the
+ // resumed stream against the client's resume token.
+ enum class ResumeStatus {
+ kFoundToken, // The stream produced a document satisfying the client resume token.
+ kCannotResume, // The stream's latest document is more recent than the resume token.
+ kCheckNextDoc // The next document produced by the stream may contain the resume token.
+ };
+
GetNextResult getNext() final;
const char* getSourceName() const final;
@@ -115,7 +123,8 @@ public:
* to ensure that each shard has enough oplog history to resume the change stream.
*/
boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return DocumentSourceShardCheckResumability::create(pExpCtx, _token.getClusterTime());
+ return DocumentSourceShardCheckResumability::create(pExpCtx,
+ _tokenFromClient.getClusterTime());
};
std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
@@ -139,7 +148,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token);
const ResumeToken& getTokenForTest() {
- return _token;
+ return _tokenFromClient;
}
private:
@@ -149,8 +158,8 @@ private:
DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
ResumeToken token);
- ResumeToken _token;
- bool _haveSeenResumeToken;
+ ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
+ ResumeToken _tokenFromClient;
};
} // namespace mongo