summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_check_resume_token.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_check_resume_token.h')
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h28
1 files changed, 14 insertions, 14 deletions
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 6dd3c949420..7489e156029 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -73,10 +73,6 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
@@ -97,7 +93,8 @@ private:
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource {
+class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource,
+ public NeedsMergerDocumentSource {
public:
// Used to record the results of comparing the token data extracted from documents in the
// resumed stream against the client's resume token.
@@ -122,18 +119,21 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<MergingLogic> mergingLogic() final {
- MergingLogic logic;
+ /**
+ * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point
+ * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards
+ * pipeline to ensure that each shard has enough oplog history to resume the change stream.
+ */
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return DocumentSourceShardCheckResumability::create(pExpCtx,
+ _tokenFromClient.getClusterTime());
+ };
+
+ MergingLogic mergingLogic() final {
// This stage must run on mongos to ensure it sees the resume token, which could have come
// from any shard. We also must include a mergingPresorted $sort stage to communicate to
// the AsyncResultsMerger that we need to merge the streams in a particular order.
- logic.mergingStage = this;
- // Also add logic to the shards to ensure that each shard has enough oplog history to resume
- // the change stream.
- logic.shardsStage = DocumentSourceShardCheckResumability::create(
- pExpCtx, _tokenFromClient.getClusterTime());
- logic.inputSortPattern = change_stream_constants::kSortSpec;
- return logic;
+ return {this, change_stream_constants::kSortSpec};
};
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;