summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream.h
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-08-03 17:41:27 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-08-04 11:21:30 -0400
commit84f0c5215adf3b80178be96722e80255c0cc8f28 (patch)
treeabc5ad0a20db3e6e4bc77044b3dae0b687e56251 /src/mongo/db/pipeline/document_source_change_stream.h
parentbb8a59837b1be306cb42e0a3e84ffe42290f79ea (diff)
downloadmongo-84f0c5215adf3b80178be96722e80255c0cc8f28.tar.gz
SERVER-30506 Rename $changeNotification to $changeStream
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream.h')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h140
1 files changed, 140 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
new file mode 100644
index 00000000000..a0e7a79c71d
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+
+namespace mongo {
+
+/**
+ * The $changeStream stage is an alias for a cursor on oplog followed by a $match stage and a
+ * transform stage on mongod.
+ */
+class DocumentSourceChangeStream final {
+public:
+ class LiteParsed final : public LiteParsedDocumentSource {
+ public:
+ static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
+ const BSONElement& spec) {
+ return stdx::make_unique<LiteParsed>();
+ }
+
+ bool isChangeStream() const final {
+ return true;
+ }
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ // TODO SERVER-29138: we need to communicate that this stage will need to look up
+ // documents from different collections.
+ return stdx::unordered_set<NamespaceString>();
+ }
+
+ // TODO SERVER-29138: Add required privileges.
+ PrivilegeVector requiredPrivileges(bool isMongos) const final {
+ return {};
+ }
+ };
+
+ class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface {
+ public:
+ Transformation(BSONObj changeStreamSpec) : _changeStreamSpec(changeStreamSpec.getOwned()) {}
+ ~Transformation() = default;
+ Document applyTransformation(const Document& input) final;
+ TransformerType getType() const final {
+ return TransformerType::kChangeStreamTransformation;
+ };
+ void optimize() final{};
+ Document serializeStageOptions(
+ boost::optional<ExplainOptions::Verbosity> explain) const final;
+ DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const final;
+ DocumentSource::GetModPathsReturn getModifiedPaths() const final;
+
+ private:
+ BSONObj _changeStreamSpec;
+ };
+
+ // The name of the field where the document key (_id and shard key, if present) will be found
+ // after the transformation.
+ static constexpr StringData kDocumentKeyField = "documentKey"_sd;
+
+ // The name of the field where the full document will be found after the transformation. The
+ // full document is only present for certain types of operations, such as an insert.
+ static constexpr StringData kFullDocumentField = "fullDocument"_sd;
+
+ // The name of the field where the change identifier will be located after the transformation.
+ static constexpr StringData kIdField = "_id"_sd;
+
+ // The name of the field where the namespace of the change will be located after the
+ // transformation.
+ static constexpr StringData kNamespaceField = "ns"_sd;
+
+ // The name of the field where the type of the operation will be located after the
+ // transformation.
+ static constexpr StringData kOperationTypeField = "operationType"_sd;
+
+ // The name of this stage.
+ static constexpr StringData kStageName = "$changeStream"_sd;
+
+ // The name of the field where the timestamp of the change will be located after the
+ // transformation. The timestamp will be located inside the change identifier, so the full path
+ // to the timestamp will be kIdField + "." + kTimestampField.
+ static constexpr StringData kTimestmapField = "ts"_sd;
+
+ // The different types of operations we can use for the operation type.
+ static constexpr StringData kUpdateOpType = "update"_sd;
+ static constexpr StringData kDeleteOpType = "delete"_sd;
+ static constexpr StringData kReplaceOpType = "replace"_sd;
+ static constexpr StringData kInsertOpType = "insert"_sd;
+ static constexpr StringData kInvalidateOpType = "invalidate"_sd;
+
+ /**
+ * Produce the BSON object representing the filter for the $match stage to filter oplog entries
+ * to only those relevant for this $changeStream stage.
+ */
+ static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume);
+
+ /**
+ * Parses a $changeStream stage from 'elem' and produces the $match and transformation
+ * stages required.
+ */
+ static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ static boost::intrusive_ptr<DocumentSource> createTransformationStage(
+ BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ // It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
+ // instead.
+ DocumentSourceChangeStream() = default;
+};
+
+} // namespace mongo