/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_sources_gen.h"
namespace mongo {
/**
* The $changeStream stage is an alias for a cursor on oplog followed by a $match stage and a
* transform stage on mongod.
*/
class DocumentSourceChangeStream final {
public:
class LiteParsed final : public LiteParsedDocumentSource {
public:
static std::unique_ptr parse(const AggregationRequest& request,
const BSONElement& spec) {
return stdx::make_unique();
}
bool isChangeStream() const final {
return true;
}
stdx::unordered_set getInvolvedNamespaces() const final {
// TODO SERVER-29138: we need to communicate that this stage will need to look up
// documents from different collections.
return stdx::unordered_set();
}
// TODO SERVER-29138: Add required privileges.
PrivilegeVector requiredPrivileges(bool isMongos) const final {
return {};
}
bool allowedToForwardFromMongos() const final {
return false;
}
};
class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface {
public:
Transformation(BSONObj changeStreamSpec) : _changeStreamSpec(changeStreamSpec.getOwned()) {}
~Transformation() = default;
Document applyTransformation(const Document& input) final;
TransformerType getType() const final {
return TransformerType::kChangeStreamTransformation;
};
void optimize() final{};
Document serializeStageOptions(
boost::optional explain) const final;
DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const final;
DocumentSource::GetModPathsReturn getModifiedPaths() const final;
private:
BSONObj _changeStreamSpec;
};
// The name of the field where the document key (_id and shard key, if present) will be found
// after the transformation.
static constexpr StringData kDocumentKeyField = "documentKey"_sd;
// The name of the field where the full document will be found after the transformation. The
// full document is only present for certain types of operations, such as an insert.
static constexpr StringData kFullDocumentField = "fullDocument"_sd;
// The name of the field where the change identifier will be located after the transformation.
static constexpr StringData kIdField = "_id"_sd;
// The name of the field where the namespace of the change will be located after the
// transformation.
static constexpr StringData kNamespaceField = "ns"_sd;
// The name of the field where the type of the operation will be located after the
// transformation.
static constexpr StringData kOperationTypeField = "operationType"_sd;
// The name of this stage.
static constexpr StringData kStageName = "$changeStream"_sd;
// The name of the field where the clusterTime of the change will be located after the
// transformation. The cluster time will be located inside the change identifier, so the full
// path to the cluster time will be kIdField + "." + kClusterTimeField.
static constexpr StringData kClusterTimeField = "clusterTime"_sd;
// The name of the field where the timestamp of the change will be located after the
// transformation. The timestamp will be located inside the cluster time, so the full path
// to the timestamp will be kIdField + "." + kClusterTimeField + "." + kTimestampField.
static constexpr StringData kTimestampField = "ts"_sd;
// The different types of operations we can use for the operation type.
static constexpr StringData kUpdateOpType = "update"_sd;
static constexpr StringData kDeleteOpType = "delete"_sd;
static constexpr StringData kReplaceOpType = "replace"_sd;
static constexpr StringData kInsertOpType = "insert"_sd;
static constexpr StringData kInvalidateOpType = "invalidate"_sd;
/**
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
* to only those relevant for this $changeStream stage.
*/
static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume);
/**
* Parses a $changeStream stage from 'elem' and produces the $match and transformation
* stages required.
*/
static std::list> createFromBson(
BSONElement elem, const boost::intrusive_ptr& expCtx);
static boost::intrusive_ptr createTransformationStage(
BSONObj changeStreamSpec, const boost::intrusive_ptr& pExpCtx);
private:
// It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
// instead.
DocumentSourceChangeStream() = default;
};
} // namespace mongo