summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2021-08-03 17:09:08 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-08-03 21:46:00 +0000
commitc34976fc8f460bdc53440820a5b4d5de035de968 (patch)
tree091b0b191ca66b7918dc85faf381f571b6cb1827 /src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
parent663e90bbe862969572246b7b442845ed83a2472d (diff)
downloadmongo-c34976fc8f460bdc53440820a5b4d5de035de968.tar.gz
SERVER-56872 Optimize $match in change stream pipeline
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream_oplog_match.h')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.h43
1 files changed, 38 insertions, 5 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
index 68755f811b9..ef2e66e0153 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
@@ -42,17 +42,24 @@ public:
static constexpr StringData kStageName = "$_internalChangeStreamOplogMatch"_sd;
DocumentSourceChangeStreamOplogMatch(BSONObj filter,
+ Timestamp clusterTime,
+ bool showMigrationEvents,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceMatch(std::move(filter), expCtx) {
- // A change stream pipeline should always create a tailable + awaitData cursor.
+ : DocumentSourceMatch(std::move(filter), expCtx),
+ _clusterTime(clusterTime),
+ _showMigrationEvents(showMigrationEvents) {
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
}
DocumentSourceChangeStreamOplogMatch(const DocumentSourceChangeStreamOplogMatch& other)
- : DocumentSourceMatch(other) {}
+ : DocumentSourceMatch(other) {
+ _clusterTime = other._clusterTime;
+ _showMigrationEvents = other._showMigrationEvents;
+ _optimizedEndOfPipeline = other._optimizedEndOfPipeline;
+ }
- virtual boost::intrusive_ptr<DocumentSourceMatch> clone() const {
- return make_intrusive<std::decay_t<decltype(*this)>>(*this);
+ boost::intrusive_ptr<DocumentSourceMatch> clone() const final {
+ return new auto(*this);
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
@@ -78,8 +85,34 @@ public:
return ChangeStreamStageSerializationInterface::serializeToValue(explain);
}
+protected:
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
private:
+ /**
+ * This constructor is only used for deserializing from BSON, in which case there are no values
+ * for the '_clusterTime' and '_showMigrationEvents' fields. We leave those fields as
+ * boost::none and assume that they will not be needed. We also assume that optimizations have
+ * have already been applied.
+ */
+ DocumentSourceChangeStreamOplogMatch(BSONObj filter,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMatch(std::move(filter), expCtx), _optimizedEndOfPipeline(true) {
+ expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+ }
+
Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+ // Needed for re-creating the filter during optimization. Note that we do not serialize these
+ // fields. The filter in a serialized DocumentSourceOplogMatch is considered final, so there is
+ // no need to re-create it.
+ boost::optional<Timestamp> _clusterTime;
+ OptionalBool _showMigrationEvents;
+
+ // Used to avoid infinte optimization loops. Note that we do not serialize this field, because
+ // we assume that DocumentSourceOplogMatch is always serialized after optimization.
+ bool _optimizedEndOfPipeline = false;
};
} // namespace mongo