diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.h | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 6dd3c949420..7489e156029 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -73,10 +73,6 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( @@ -97,7 +93,8 @@ private: * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource { +class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource, + public NeedsMergerDocumentSource { public: // Used to record the results of comparing the token data extracted from documents in the // resumed stream against the client's resume token. @@ -122,18 +119,21 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - MergingLogic logic; + /** + * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point + * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards + * pipeline to ensure that each shard has enough oplog history to resume the change stream. + */ + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return DocumentSourceShardCheckResumability::create(pExpCtx, + _tokenFromClient.getClusterTime()); + }; + + MergingLogic mergingLogic() final { // This stage must run on mongos to ensure it sees the resume token, which could have come // from any shard. We also must include a mergingPresorted $sort stage to communicate to // the AsyncResultsMerger that we need to merge the streams in a particular order. - logic.mergingStage = this; - // Also add logic to the shards to ensure that each shard has enough oplog history to resume - // the change stream. - logic.shardsStage = DocumentSourceShardCheckResumability::create( - pExpCtx, _tokenFromClient.getClusterTime()); - logic.inputSortPattern = change_stream_constants::kSortSpec; - return logic; + return {this, change_stream_constants::kSortSpec}; }; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; |