author     Ian Boros <ian.boros@10gen.com>  2018-04-11 18:40:17 -0400
committer  Ian Boros <ian.boros@10gen.com>  2018-04-12 18:48:07 -0400
commit     6841ce738419923002958acc760e150769b6f615 (patch)
tree       c32259c195861a612b437cbcb71d7e34da4ce2ec /src
parent     f98c59b381c74d2e136b2cce1c4436c02ee64741 (diff)
download   mongo-6841ce738419923002958acc760e150769b6f615.tar.gz
SERVER-33114 make change streams unwind applyOps generated by transactions
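
This change teaches $changeStream to unwind a transaction's 'applyOps' oplog
entry into one change event per relevant operation. As an illustrative sketch
(the values mirror the test cases in this patch, not a real oplog), a
transaction that inserts and then updates a document commits as a single
oplog entry of the form

    {
        op: "c",
        ns: "test.$cmd",
        lsid: { ... },
        txnNumber: NumberLong(0),
        o: { applyOps: [
            { op: "i", ns: "test.coll", ui: <UUID>, o: { _id: 123, x: "hallo" } },
            { op: "u", ns: "test.coll", ui: <UUID>,
              o: { $set: { x: "hallo 2" } }, o2: { _id: 123 } }
        ] }
    }

from which the stream now emits two events, an "insert" followed by an
"update", each annotated with the transaction's 'lsid' and 'txnNumber'.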
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/pipeline/SConscript                                        |   1
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp                 | 275
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h                   |  56
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp            | 164
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp       | 423
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.h         | 129
-rw-r--r--  src/mongo/db/pipeline/document_source_single_document_transformation.h  |  24
7 files changed, 787 insertions, 285 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 88c56ab45b9..9bf8d695a42 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -271,6 +271,7 @@ pipelineeEnv.Library(
'document_source_bucket.cpp',
'document_source_bucket_auto.cpp',
'document_source_change_stream.cpp',
+ 'document_source_change_stream_transform.cpp',
'document_source_check_resume_token.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 2f7f185dcb2..1394cc7f0f8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
@@ -73,9 +74,12 @@ constexpr StringData DocumentSourceChangeStream::kFullDocumentField;
constexpr StringData DocumentSourceChangeStream::kIdField;
constexpr StringData DocumentSourceChangeStream::kNamespaceField;
constexpr StringData DocumentSourceChangeStream::kUuidField;
+constexpr StringData DocumentSourceChangeStream::kUpdateDescriptionField;
constexpr StringData DocumentSourceChangeStream::kOperationTypeField;
constexpr StringData DocumentSourceChangeStream::kStageName;
constexpr StringData DocumentSourceChangeStream::kClusterTimeField;
+constexpr StringData DocumentSourceChangeStream::kTxnNumberField;
+constexpr StringData DocumentSourceChangeStream::kLsidField;
constexpr StringData DocumentSourceChangeStream::kUpdateOpType;
constexpr StringData DocumentSourceChangeStream::kDeleteOpType;
constexpr StringData DocumentSourceChangeStream::kReplaceOpType;
@@ -83,7 +87,6 @@ constexpr StringData DocumentSourceChangeStream::kInsertOpType;
constexpr StringData DocumentSourceChangeStream::kInvalidateOpType;
constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
-
namespace {
static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
@@ -130,15 +133,6 @@ DocumentSourceOplogMatch::DocumentSourceOplogMatch(BSONObj filter,
const intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(std::move(filter), expCtx) {}
-void checkValueType(const Value v, const StringData filedName, BSONType expectedType) {
- uassert(40532,
- str::stream() << "Entry field \"" << filedName << "\" should be "
- << typeName(expectedType)
- << ", found: "
- << typeName(v.getType()),
- (v.getType() == expectedType));
-}
-
namespace {
/**
* This stage is used internally for change notifications to close cursor after returning
@@ -222,7 +216,8 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
auto doc = nextInput.getDocument();
const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField;
- checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String);
+ DocumentSourceChangeStream::checkValueType(
+ doc[kOperationTypeField], kOperationTypeField, BSONType::String);
auto operationType = doc[kOperationTypeField].getString();
if (operationType == DocumentSourceChangeStream::kInvalidateOpType) {
// Pass the invalidation forward, so that it can be included in the results, or
@@ -236,6 +231,38 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
} // namespace
+void DocumentSourceChangeStream::checkValueType(const Value v,
+                                                const StringData fieldName,
+                                                BSONType expectedType) {
+    uassert(40532,
+            str::stream() << "Entry field \"" << fieldName << "\" should be "
+                          << typeName(expectedType)
+                          << ", found: "
+                          << typeName(v.getType()),
+            (v.getType() == expectedType));
+}
+
+//
+// Helpers for building the oplog filter.
+//
+namespace {
+
+/**
+ * Constructs the filter which will match 'applyOps' oplog entries that:
+ * 1) Are part of a transaction.
+ * 2) Have sub-entries which should be returned in the change stream.
+ */
+BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) {
+ BSONObjBuilder applyOpsBuilder;
+ applyOpsBuilder.append("op", "c");
+ applyOpsBuilder.append("lsid", BSON("$exists" << true));
+ applyOpsBuilder.append("txnNumber", BSON("$exists" << true));
+    const std::string kApplyOpsNs = "o.applyOps.ns";
+ applyOpsBuilder.appendAs(nsMatch, kApplyOpsNs);
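+    // As a sketch, the assembled filter has the shape:
+    //   { op: "c", lsid: { $exists: true }, txnNumber: { $exists: true },
+    //     "o.applyOps.ns": <namespace match copied from 'nsMatch'> }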
+ return applyOpsBuilder.obj();
+}
+} // namespace
+
BSONObj DocumentSourceChangeStream::buildMatchFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp startFrom,
@@ -324,11 +351,14 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
}
auto opMatch = BSON(nsMatch["ns"] << OR(normalOpTypeMatch, chunkMigratedMatch));
+ // 3) Look for 'applyOps' which were created as part of a transaction.
+ BSONObj applyOps = getTxnApplyOpsFilter(nsMatch["ns"], nss);
+
    // Match oplog entries after "start" that are either supported (1) commands or (2) operations,
// excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify
// it was still present in the oplog.
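+    // As a sketch, the overall filter therefore has the shape:
+    //   { $and: [ { ts: { $gte|$gt: <startFrom> } },
+    //             { $or: [ <opMatch>, <commandMatch>, <applyOps> ] },
+    //             { fromMigrate: { $ne: true } } ] }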
return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
- << BSON(OR(opMatch, commandMatch))
+ << BSON(OR(opMatch, commandMatch, applyOps))
<< BSON("fromMigrate" << NE << true)));
}
@@ -416,6 +446,13 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
+std::string DocumentSourceChangeStream::buildAllCollectionsRegex(const NamespaceString& nss) {
+    // Match all namespaces that start with the db name, followed by ".", where the
+    // collection name does not begin with '$' or 'system.'.
+ static const auto regexAllCollections = R"(\.(?!(\$|system\.)))";
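+    // For example, with a db name of "testdb" the result is "^testdb\.(?!(\$|system\.))",
+    // which matches "testdb.users" but not "testdb.system.profile" or "testdb.$cmd".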
+ return "^" + nss.db() + regexAllCollections;
+}
+
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
// A change stream is a tailable + awaitData cursor.
@@ -542,217 +579,7 @@ intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationSt
// Mark the transformation stage as independent of any collection if the change stream is
// watching all collections in the database.
const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS();
- return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation(
- expCtx,
- stdx::make_unique<Transformation>(expCtx, changeStreamSpec),
- kStageName.toString(),
- isIndependentOfAnyCollection));
+ return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform(
+ expCtx, changeStreamSpec, isIndependentOfAnyCollection));
}
-
-Document DocumentSourceChangeStream::Transformation::applyTransformation(const Document& input) {
- // If we're executing a change stream pipeline that was forwarded from mongos, then we expect it
- // to "need merge"---we expect to be executing the shards part of a split pipeline. It is never
- // correct for mongos to pass through the change stream without splitting into into a merging
- // part executed on mongos and a shards part.
- //
- // This is necessary so that mongos can correctly handle "invalidate" and "retryNeeded" change
- // notifications. See SERVER-31978 for an example of why the pipeline must be split.
- //
- // We have to check this invariant at run-time of the change stream rather than parse time,
- // since a mongos may forward a change stream in an invalid position (e.g. in a nested $lookup
- // or $facet pipeline). In this case, mongod is responsible for parsing the pipeline and
- // throwing an error without ever executing the change stream.
- if (_expCtx->fromMongos) {
- invariant(_expCtx->needsMerge);
- }
-
- MutableDocument doc;
-
- // Extract the fields we need.
- checkValueType(input[repl::OplogEntry::kOpTypeFieldName],
- repl::OplogEntry::kOpTypeFieldName,
- BSONType::String);
- string op = input[repl::OplogEntry::kOpTypeFieldName].getString();
- Value ts = input[repl::OplogEntry::kTimestampFieldName];
- Value ns = input[repl::OplogEntry::kNamespaceFieldName];
- checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String);
- Value uuid = input[repl::OplogEntry::kUuidFieldName];
- std::vector<FieldPath> documentKeyFields;
-
- // Deal with CRUD operations and commands.
- auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
-
- // Ignore commands in the oplog when looking up the document key fields since a command implies
- // that the change stream is about to be invalidated (e.g. collection drop).
- if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) {
- checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData);
- // We need to retrieve the document key fields if our cache does not have an entry for this
- // UUID or if the cache entry is not definitively final, indicating that the collection was
- // unsharded when the entry was last populated.
- auto it = _documentKeyCache.find(uuid.getUuid());
- if (it == _documentKeyCache.end() || !it->second.isFinal) {
- auto docKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields(
- _expCtx->opCtx, uuid.getUuid());
- if (it == _documentKeyCache.end() || docKeyFields.second) {
- _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields);
- }
- }
-
- documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields;
- }
- NamespaceString nss(ns.getString());
- Value id = input.getNestedField("o._id");
- // Non-replace updates have the _id in field "o2".
- StringData operationType;
- Value fullDocument;
- Value updateDescription;
- Value documentKey;
-
- switch (opType) {
- case repl::OpTypeEnum::kInsert: {
- operationType = kInsertOpType;
- fullDocument = input[repl::OplogEntry::kObjectFieldName];
- documentKey = Value(document_path_support::extractDocumentKeyFromDoc(
- fullDocument.getDocument(), documentKeyFields));
- break;
- }
- case repl::OpTypeEnum::kDelete: {
- operationType = kDeleteOpType;
- documentKey = input[repl::OplogEntry::kObjectFieldName];
- break;
- }
- case repl::OpTypeEnum::kUpdate: {
- if (id.missing()) {
- operationType = kUpdateOpType;
- checkValueType(input[repl::OplogEntry::kObjectFieldName],
- repl::OplogEntry::kObjectFieldName,
- BSONType::Object);
- Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
- Value updatedFields = opObject["$set"];
- Value removedFields = opObject["$unset"];
-
- // Extract the field names of $unset document.
- vector<Value> removedFieldsVector;
- if (removedFields.getType() == BSONType::Object) {
- auto iter = removedFields.getDocument().fieldIterator();
- while (iter.more()) {
- removedFieldsVector.push_back(Value(iter.next().first));
- }
- }
- updateDescription = Value(Document{
- {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields},
- {"removedFields", removedFieldsVector}});
- } else {
- operationType = kReplaceOpType;
- fullDocument = input[repl::OplogEntry::kObjectFieldName];
- }
- documentKey = input[repl::OplogEntry::kObject2FieldName];
- break;
- }
- case repl::OpTypeEnum::kCommand: {
- // Any command that makes it through our filter is an invalidating command such as a
- // drop.
- operationType = kInvalidateOpType;
- // Make sure the result doesn't have a document key.
- documentKey = Value();
- break;
- }
- case repl::OpTypeEnum::kNoop: {
- operationType = kNewShardDetectedOpType;
- // Generate a fake document Id for NewShardDetected operation so that we can resume
- // after this operation.
- documentKey = Value(Document{{kIdField, input[repl::OplogEntry::kObject2FieldName]}});
- break;
- }
- default: { MONGO_UNREACHABLE; }
- }
-
- // UUID should always be present except for invalidate entries. It will not be under
- // FCV 3.4, so we should close the stream as invalid.
- if (operationType != kInvalidateOpType && uuid.missing()) {
- warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get "
- "downgraded after opening the stream?";
- operationType = kInvalidateOpType;
- fullDocument = Value();
- updateDescription = Value();
- documentKey = Value();
- }
-
- // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will
- // not appear in the output.
- ResumeTokenData resumeTokenData;
- resumeTokenData.clusterTime = ts.getTimestamp();
- resumeTokenData.documentKey = documentKey;
- if (!uuid.missing())
- resumeTokenData.uuid = uuid.getUuid();
- doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument()));
- doc.addField(kOperationTypeField, Value(operationType));
- doc.addField(kClusterTimeField, Value(resumeTokenData.clusterTime));
-
- // If we're in a sharded environment, we'll need to merge the results by their sort key, so add
- // that as metadata.
- if (_expCtx->needsMerge) {
- doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
- }
-
- // "invalidate" and "newShardDetected" entries have fewer fields.
- if (operationType == kInvalidateOpType || operationType == kNewShardDetectedOpType) {
- return doc.freeze();
- }
-
- doc.addField(kFullDocumentField, fullDocument);
- doc.addField(kNamespaceField, Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
- doc.addField(kDocumentKeyField, documentKey);
-
- // Note that 'updateDescription' might be the 'missing' value, in which case it will not be
- // serialized.
- doc.addField("updateDescription", updateDescription);
- return doc.freeze();
-}
-
-Document DocumentSourceChangeStream::Transformation::serializeStageOptions(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- Document changeStreamOptions(_changeStreamSpec);
- // If we're on a mongos and no other start time is specified, we want to start at the current
- // cluster time on the mongos. This ensures all shards use the same start time.
- if (_expCtx->inMongos &&
- changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName].missing() &&
- changeStreamOptions
- [DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName]
- .missing() &&
- changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName]
- .missing()) {
- MutableDocument newChangeStreamOptions(changeStreamOptions);
-
- // Use the current cluster time plus 1 tick since the oplog query will include all
- // operations/commands equal to or greater than the 'startAtClusterTime' timestamp. In
- // particular, avoid including the last operation that went through mongos in an attempt to
- // match the behavior of a replica set more closely.
- auto clusterTime = LogicalClock::get(_expCtx->opCtx)->getClusterTime();
- clusterTime.addTicks(1);
- newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName]
- [ResumeTokenClusterTime::kTimestampFieldName] =
- Value(clusterTime.asTimestamp());
- changeStreamOptions = newChangeStreamOptions.freeze();
- }
- return changeStreamOptions;
-}
-
-DocumentSource::GetDepsReturn DocumentSourceChangeStream::Transformation::addDependencies(
- DepsTracker* deps) const {
- deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString());
- deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString());
- deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString());
- deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString());
- deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString());
- deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString());
- return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL;
-}
-
-DocumentSource::GetModPathsReturn DocumentSourceChangeStream::Transformation::getModifiedPaths()
- const {
- // All paths are modified.
- return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}};
-}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 6d7ac9e3eba..64d2595604c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -86,47 +86,6 @@ public:
const NamespaceString _nss;
};
- class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface {
- public:
- Transformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- BSONObj changeStreamSpec)
- : _expCtx(expCtx), _changeStreamSpec(changeStreamSpec.getOwned()) {}
- ~Transformation() = default;
- Document applyTransformation(const Document& input) final;
- TransformerType getType() const final {
- return TransformerType::kChangeStreamTransformation;
- };
- void optimize() final{};
- Document serializeStageOptions(
- boost::optional<ExplainOptions::Verbosity> explain) const final;
- DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const final;
- DocumentSource::GetModPathsReturn getModifiedPaths() const final;
-
- private:
- boost::intrusive_ptr<ExpressionContext> _expCtx;
- BSONObj _changeStreamSpec;
-
- struct DocumentKeyCacheEntry {
- DocumentKeyCacheEntry() = default;
-
- DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn)
- : documentKeyFields(documentKeyFieldsIn.first),
- isFinal(documentKeyFieldsIn.second){};
- // Fields of the document key, in order, including "_id" and the shard key if the
- // collection is sharded. Empty until the first oplog entry with a uuid is encountered.
- // Needed for transforming 'insert' oplog entries.
- std::vector<FieldPath> documentKeyFields;
-
- // Set to true if the document key fields for this entry are definitively known and will
- // not change. This implies that either the collection has become sharded or has been
- // dropped.
- bool isFinal;
- };
-
- // Map of collection UUID to document key fields.
- std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache;
- };
-
// The name of the field where the document key (_id and shard key, if present) will be found
// after the transformation.
static constexpr StringData kDocumentKeyField = "documentKey"_sd;
@@ -142,6 +101,10 @@ public:
// transformation.
static constexpr StringData kNamespaceField = "ns"_sd;
+    // Name of the field which stores information about updates. Only applies when the
+    // operation type is "update".
+ static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd;
+
// The name of the subfield of '_id' where the UUID of the namespace will be located after the
// transformation.
static constexpr StringData kUuidField = "uuid"_sd;
@@ -158,6 +121,9 @@ public:
// The name of this stage.
static constexpr StringData kStageName = "$changeStream"_sd;
+ static constexpr StringData kTxnNumberField = "txnNumber"_sd;
+ static constexpr StringData kLsidField = "lsid"_sd;
+
// The different types of operations we can use for the operation type.
static constexpr StringData kUpdateOpType = "update"_sd;
static constexpr StringData kDeleteOpType = "delete"_sd;
@@ -175,6 +141,8 @@ public:
Timestamp startFrom,
bool startFromInclusive);
+ static std::string buildAllCollectionsRegex(const NamespaceString& nss);
+
/**
* Parses a $changeStream stage from 'elem' and produces the $match and transformation
* stages required.
@@ -194,6 +162,12 @@ public:
static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj,
const BSONObj resumeToken);
+ /**
+     * Helper used by various change stream stages. Asserts that the Value of the named field
+     * has the expected type, and uasserts if it does not.
+ */
+ static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType);
+
private:
enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster };
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5e51059853c..45f2c3373d4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -124,7 +125,7 @@ public:
     * Returns a list of stages expanded from a $changeStream specification, starting with a
* DocumentSourceMock which contains a single document representing 'entry'.
*/
- vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
+ vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) {
const auto spec = fromjson("{$changeStream: {}}");
list<intrusive_ptr<DocumentSource>> result =
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
@@ -142,7 +143,7 @@ public:
ASSERT(match);
auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx());
- auto mock = DocumentSourceMock::create(D(entry.toBSON()));
+ auto mock = DocumentSourceMock::create(D(entry));
executableMatch->setSource(mock.get());
// Check the oplog entry is transformed correctly.
@@ -158,6 +159,10 @@ public:
return {mock, executableMatch, transform, closeCursor};
}
+ vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
+ return makeStages(entry.toBSON());
+ }
+
OplogEntry createCommand(const BSONObj& oField,
const boost::optional<UUID> uuid = boost::none,
const boost::optional<bool> fromMigrate = boost::none,
@@ -183,6 +188,40 @@ public:
}
/**
+     * Helper for running an applyOps through the pipeline and getting all of the results.
+ */
+ std::vector<Document> getApplyOpsResults(const Document& applyOpsDoc,
+ const LogicalSessionFromClient& lsid) {
+ BSONObj applyOpsObj = applyOpsDoc.toBson();
+
+ // Create an oplog entry and then glue on an lsid and txnNumber
+ auto baseOplogEntry = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOpsObj,
+ testUuid(),
+ boost::none, // fromMigrate
+ BSONObj());
+ BSONObjBuilder builder(baseOplogEntry.toBSON());
+ builder.append("lsid", lsid.toBSON());
+ builder.append("txnNumber", 0LL);
+ BSONObj oplogEntry = builder.done();
+
+ // Create the stages and check that the documents produced matched those in the applyOps.
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry);
+ auto transform = stages[2].get();
+ invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
+
+ std::vector<Document> res;
+ auto next = transform->getNext();
+ while (next.isAdvanced()) {
+ res.push_back(next.releaseDocument());
+ next = transform->getNext();
+ }
+ return res;
+ }
+
+
+ /**
* This method is required to avoid a static initialization fiasco resulting from calling
* UUID::gen() in file static scope.
*/
@@ -191,6 +230,15 @@ public:
return *uuid_gen;
}
+ static LogicalSessionFromClient testLsid() {
+ // Required to avoid static initialization fiasco.
+ static const UUID* uuid = new UUID(UUID::gen());
+ LogicalSessionFromClient lsid{};
+ lsid.setId(*uuid);
+ return lsid;
+ }
+
+
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
*/
@@ -633,6 +681,116 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
checkTransformation(newShardDetected, expectedNewShardDetected);
}
+TEST_F(ChangeStreamStageTest, TransformEmptyApplyOps) {
+ Document applyOpsDoc{{"applyOps", Value{std::vector<Document>{}}}};
+
+ LogicalSessionFromClient lsid = testLsid();
+ vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid);
+
+ // Should not return anything.
+ ASSERT_EQ(results.size(), 0u);
+}
+
+TEST_F(ChangeStreamStageTest, TransformNonTransactionApplyOps) {
+ BSONObj applyOpsObj = Document{{"applyOps",
+ Value{std::vector<Document>{Document{
+ {"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}}
+ .toBson();
+
+ // Don't append lsid or txnNumber
+
+ auto oplogEntry = makeOplogEntry(OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ applyOpsObj,
+ testUuid(),
+ boost::none, // fromMigrate
+ BSONObj());
+
+
+ checkTransformation(oplogEntry, boost::none);
+}
+
+TEST_F(ChangeStreamStageTest, TransformApplyOpsWithEntriesOnDifferentNs) {
+ // Doesn't use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+
+ auto otherUUID = UUID::gen();
+ Document applyOpsDoc{
+ {"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", "i"_sd},
+ {"ns", "someotherdb.collname"_sd},
+ {"ui", otherUUID},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}},
+ Document{{"op", "u"_sd},
+ {"ns", "someotherdb.collname"_sd},
+ {"ui", otherUUID},
+ {"o", Value{Document{{"$set", Value{Document{{"x", "hallo 2"_sd}}}}}}},
+ {"o2", Value{Document{{"_id", 123}}}}},
+ }}},
+ };
+ LogicalSessionFromClient lsid = testLsid();
+ vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid);
+
+ // All documents should be skipped.
+ ASSERT_EQ(results.size(), 0u);
+}
+
+
+TEST_F(ChangeStreamStageTest, TransformApplyOps) {
+ // Doesn't use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+
+ Document applyOpsDoc{
+ {"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}},
+ Document{{"op", "u"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"$set", Value{Document{{"x", "hallo 2"_sd}}}}}}},
+ {"o2", Value{Document{{"_id", 123}}}}},
+ // Operation on another namespace which should be skipped.
+ Document{{"op", "i"_sd},
+ {"ns", "someotherdb.collname"_sd},
+ {"ui", UUID::gen()},
+ {"o", Value{Document{{"_id", 0}, {"x", "Should not read this!"_sd}}}}},
+ }}},
+ };
+ LogicalSessionFromClient lsid = testLsid();
+ vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid);
+
+ // The third document should be skipped.
+ ASSERT_EQ(results.size(), 2u);
+
+ // Check that the first document is correct.
+ auto nextDoc = results[0];
+ ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL);
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["x"].getString(), "hallo");
+ ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+
+ // Check the second document.
+ nextDoc = results[1];
+ ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL);
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kUpdateOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kDocumentKeyField]["_id"].getInt(), 123);
+ ASSERT_EQ(nextDoc[DSChangeStream::kUpdateDescriptionField]["updatedFields"]["x"].getString(),
+ "hallo 2");
+ ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+
+ // The third document is skipped.
+}
+
TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
const Timestamp ts(3, 45);
const long long term = 4;
@@ -727,7 +885,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
ASSERT_EQ(allStages.size(), 3UL);
auto stage = allStages[1];
- ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(stage.get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get()));
//
// Serialize the stage and confirm contents.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
new file mode 100644
index 00000000000..16301b7fcd5
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -0,0 +1,423 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_change_stream_transform.h"
+
+#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/db/bson/bson_helper.h"
+#include "mongo/db/catalog/uuid_catalog.h"
+#include "mongo/db/commands/feature_compatibility_version_documentation.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
+#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using boost::optional;
+using std::list;
+using std::string;
+using std::vector;
+
+namespace {
+constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
+
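+/**
+ * Returns true for every op type except no-ops ("n"); the one no-op that remains
+ * relevant is the "migrateChunkToNewShard" notice, which tells an open change
+ * stream that a new shard now holds chunks for the watched collection.
+ */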
+bool isOpTypeRelevant(const Document& d) {
+ Value op = d["op"];
+ invariant(!op.missing());
+
+ if (op.getString() != "n") {
+ return true;
+ }
+
+ Value type = d.getNestedField("o2.type");
+ if (!type.missing() && type.getString() == "migrateChunkToNewShard") {
+ return true;
+ }
+
+ return false;
+}
+} // namespace
+
+DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ BSONObj changeStreamSpec,
+ bool isIndependentOfAnyCollection)
+ : DocumentSource(expCtx),
+ _changeStreamSpec(changeStreamSpec.getOwned()),
+ _isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
+
+ if (expCtx->ns.isCollectionlessAggregateNS()) {
+ _nsRegex.emplace(DocumentSourceChangeStream::buildAllCollectionsRegex(expCtx->ns));
+ }
+}
+
+DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints(
+ Pipeline::SplitState pipeState) const {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage);
+
+ constraints.canSwapWithMatch = true;
+ constraints.canSwapWithLimit = true;
+    // This transformation could be part of a 'collectionless' change stream on an entire
+    // database or cluster; if so, mark it as independent of any collection.
+ constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection;
+ return constraints;
+}
+
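+/**
+ * Validates that 'input' is an 'applyOps' oplog entry generated by a transaction (a
+ * non-empty 'o.applyOps' array accompanied by 'lsid' and 'txnNumber' fields) and
+ * stashes those pieces in '_txnContext' so the sub-entries can be unwound one at a time.
+ */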
+void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Document& input) {
+ invariant(!_txnContext);
+
+ checkValueType(input["o"], "o", BSONType::Object);
+ Value applyOps = input.getNestedField("o.applyOps");
+ checkValueType(applyOps, "applyOps", BSONType::Array);
+ invariant(applyOps.getArrayLength() > 0);
+
+ Value lsid = input["lsid"];
+ checkValueType(lsid, "lsid", BSONType::Object);
+
+ Value txnNumber = input["txnNumber"];
+ checkValueType(txnNumber, "txnNumber", BSONType::NumberLong);
+
+ _txnContext.emplace(applyOps, lsid.getDocument(), txnNumber.getLong());
+}
+
+Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
+ // If we're executing a change stream pipeline that was forwarded from mongos, then we expect it
+ // to "need merge"---we expect to be executing the shards part of a split pipeline. It is never
+    // correct for mongos to pass through the change stream without splitting it into a merging
+ // part executed on mongos and a shards part.
+ //
+ // This is necessary so that mongos can correctly handle "invalidate" and "retryNeeded" change
+ // notifications. See SERVER-31978 for an example of why the pipeline must be split.
+ //
+ // We have to check this invariant at run-time of the change stream rather than parse time,
+ // since a mongos may forward a change stream in an invalid position (e.g. in a nested $lookup
+ // or $facet pipeline). In this case, mongod is responsible for parsing the pipeline and
+ // throwing an error without ever executing the change stream.
+ if (pExpCtx->fromMongos) {
+ invariant(pExpCtx->needsMerge);
+ }
+
+ MutableDocument doc;
+
+ // Extract the fields we need.
+ checkValueType(input[repl::OplogEntry::kOpTypeFieldName],
+ repl::OplogEntry::kOpTypeFieldName,
+ BSONType::String);
+ string op = input[repl::OplogEntry::kOpTypeFieldName].getString();
+ Value ts = input[repl::OplogEntry::kTimestampFieldName];
+ Value ns = input[repl::OplogEntry::kNamespaceFieldName];
+ checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String);
+ Value uuid = input[repl::OplogEntry::kUuidFieldName];
+ std::vector<FieldPath> documentKeyFields;
+
+ // Deal with CRUD operations and commands.
+ auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
+
+ // Ignore commands in the oplog when looking up the document key fields since a command implies
+ // that the change stream is about to be invalidated (e.g. collection drop).
+ if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) {
+ checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData);
+ // We need to retrieve the document key fields if our cache does not have an entry for this
+ // UUID or if the cache entry is not definitively final, indicating that the collection was
+ // unsharded when the entry was last populated.
+ auto it = _documentKeyCache.find(uuid.getUuid());
+ if (it == _documentKeyCache.end() || !it->second.isFinal) {
+ auto docKeyFields = pExpCtx->mongoProcessInterface->collectDocumentKeyFields(
+ pExpCtx->opCtx, uuid.getUuid());
+ if (it == _documentKeyCache.end() || docKeyFields.second) {
+ _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields);
+ }
+ }
+
+ documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields;
+ }
+ NamespaceString nss(ns.getString());
+ Value id = input.getNestedField("o._id");
+ // Non-replace updates have the _id in field "o2".
+ StringData operationType;
+ Value fullDocument;
+ Value updateDescription;
+ Value documentKey;
+
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert: {
+ operationType = DocumentSourceChangeStream::kInsertOpType;
+ fullDocument = input[repl::OplogEntry::kObjectFieldName];
+ documentKey = Value(document_path_support::extractDocumentKeyFromDoc(
+ fullDocument.getDocument(), documentKeyFields));
+ break;
+ }
+ case repl::OpTypeEnum::kDelete: {
+ operationType = DocumentSourceChangeStream::kDeleteOpType;
+ documentKey = input[repl::OplogEntry::kObjectFieldName];
+ break;
+ }
+ case repl::OpTypeEnum::kUpdate: {
+ if (id.missing()) {
+ operationType = DocumentSourceChangeStream::kUpdateOpType;
+ checkValueType(input[repl::OplogEntry::kObjectFieldName],
+ repl::OplogEntry::kObjectFieldName,
+ BSONType::Object);
+ Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
+ Value updatedFields = opObject["$set"];
+ Value removedFields = opObject["$unset"];
+
+ // Extract the field names of $unset document.
+ vector<Value> removedFieldsVector;
+ if (removedFields.getType() == BSONType::Object) {
+ auto iter = removedFields.getDocument().fieldIterator();
+ while (iter.more()) {
+ removedFieldsVector.push_back(Value(iter.next().first));
+ }
+ }
+ updateDescription = Value(Document{
+ {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields},
+ {"removedFields", removedFieldsVector}});
+ } else {
+ operationType = DocumentSourceChangeStream::kReplaceOpType;
+ fullDocument = input[repl::OplogEntry::kObjectFieldName];
+ }
+ documentKey = input[repl::OplogEntry::kObject2FieldName];
+ break;
+ }
+ case repl::OpTypeEnum::kCommand: {
+ if (!input.getNestedField("o.applyOps").missing()) {
+ // We should never see an applyOps inside of an applyOps that made it past the
+ // filter. This prevents more than one level of recursion.
+ invariant(!_txnContext);
+
+ initializeTransactionContext(input);
+
+ // Now call applyTransformation on the first relevant entry in the applyOps.
+ boost::optional<Document> nextDoc = extractNextApplyOpsEntry();
+ invariant(nextDoc);
+
+ return applyTransformation(*nextDoc);
+ }
+ // Any command that makes it through our filter is an invalidating command such as a
+ // drop.
+ operationType = DocumentSourceChangeStream::kInvalidateOpType;
+ // Make sure the result doesn't have a document key.
+ documentKey = Value();
+ break;
+ }
+ case repl::OpTypeEnum::kNoop: {
+ operationType = DocumentSourceChangeStream::kNewShardDetectedOpType;
+ // Generate a fake document Id for NewShardDetected operation so that we can resume
+ // after this operation.
+ documentKey = Value(Document{{DocumentSourceChangeStream::kIdField,
+ input[repl::OplogEntry::kObject2FieldName]}});
+ break;
+ }
+ default: { MONGO_UNREACHABLE; }
+ }
+
+ // UUID should always be present except for invalidate entries. It will not be under
+ // FCV 3.4, so we should close the stream as invalid.
+ if (operationType != DocumentSourceChangeStream::kInvalidateOpType && uuid.missing()) {
+ warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get "
+ "downgraded after opening the stream?";
+ operationType = DocumentSourceChangeStream::kInvalidateOpType;
+ fullDocument = Value();
+ updateDescription = Value();
+ documentKey = Value();
+ }
+
+ // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will
+ // not appear in the output.
+ ResumeTokenData resumeTokenData;
+ if (_txnContext) {
+ // We're in the middle of unwinding an 'applyOps'.
+
+ // TODO: SERVER-34314
+ // For now we return an empty resumeToken.
+ } else {
+ resumeTokenData.clusterTime = ts.getTimestamp();
+ resumeTokenData.documentKey = documentKey;
+ if (!uuid.missing())
+ resumeTokenData.uuid = uuid.getUuid();
+ }
+
+ if (_txnContext) {
+ doc.addField(DocumentSourceChangeStream::kTxnNumberField,
+ Value(static_cast<long long>(_txnContext->txnNumber)));
+ doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid));
+ }
+
+ doc.addField(DocumentSourceChangeStream::kIdField,
+ Value(ResumeToken(resumeTokenData).toDocument()));
+ doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType));
+ doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime));
+
+ // If we're in a sharded environment, we'll need to merge the results by their sort key, so add
+ // that as metadata.
+ if (pExpCtx->needsMerge) {
+ doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
+ }
+
+ // "invalidate" and "newShardDetected" entries have fewer fields.
+ if (operationType == DocumentSourceChangeStream::kInvalidateOpType ||
+ operationType == DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ return doc.freeze();
+ }
+
+ doc.addField(DocumentSourceChangeStream::kFullDocumentField, fullDocument);
+ doc.addField(DocumentSourceChangeStream::kNamespaceField,
+ Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
+ doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey);
+
+ // Note that 'updateDescription' might be the 'missing' value, in which case it will not be
+ // serialized.
+ doc.addField("updateDescription", updateDescription);
+ return doc.freeze();
+}
+
+Value DocumentSourceChangeStreamTransform::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ Document changeStreamOptions(_changeStreamSpec);
+ // If we're on a mongos and no other start time is specified, we want to start at the current
+ // cluster time on the mongos. This ensures all shards use the same start time.
+ if (pExpCtx->inMongos &&
+ changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName].missing() &&
+ changeStreamOptions
+ [DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName]
+ .missing() &&
+ changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName]
+ .missing()) {
+ MutableDocument newChangeStreamOptions(changeStreamOptions);
+
+ // Use the current cluster time plus 1 tick since the oplog query will include all
+ // operations/commands equal to or greater than the 'startAtClusterTime' timestamp. In
+ // particular, avoid including the last operation that went through mongos in an attempt to
+ // match the behavior of a replica set more closely.
+ auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime();
+ clusterTime.addTicks(1);
+ newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName]
+ [ResumeTokenClusterTime::kTimestampFieldName] =
+ Value(clusterTime.asTimestamp());
+ changeStreamOptions = newChangeStreamOptions.freeze();
+ }
+ return Value(Document{{getSourceName(), changeStreamOptions}});
+}
+
+DocumentSource::GetDepsReturn DocumentSourceChangeStreamTransform::getDependencies(
+ DepsTracker* deps) const {
+ deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString());
+ return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL;
+}
+
+DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifiedPaths() const {
+ // All paths are modified.
+ return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}};
+}
+
+DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ // If we're unwinding an 'applyOps' from a transaction, check if there are any documents we have
+ // stored that can be returned.
+ if (_txnContext) {
+ boost::optional<Document> next = extractNextApplyOpsEntry();
+ if (next) {
+ return applyTransformation(*next);
+ }
+ }
+
+ // Get the next input document.
+ auto input = pSource->getNext();
+ if (!input.isAdvanced()) {
+ return input;
+ }
+
+ // Apply the transform and return the document with added fields.
+ return applyTransformation(input.releaseDocument());
+}
+
+bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) {
+ if (!isOpTypeRelevant(d)) {
+ return false;
+ }
+
+ Value nsField = d["ns"];
+ invariant(!nsField.missing());
+
+ if (_nsRegex) {
+        // Match all namespaces that start with the db name, followed by ".", where the
+        // collection name does not begin with '$' or 'system.'.
+ return _nsRegex->PartialMatch(nsField.getString());
+ }
+
+ return nsField.getString() == pExpCtx->ns.ns();
+}
+
+boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() {
+
+ while (_txnContext && _txnContext->pos < _txnContext->arr.size()) {
+ Document d = _txnContext->arr[_txnContext->pos++].getDocument();
+ if (isDocumentRelevant(d)) {
+ return d;
+ }
+ }
+
+ _txnContext = boost::none;
+
+ return boost::none;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
new file mode 100644
index 00000000000..61e6bfb25eb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -0,0 +1,129 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/field_path.h"
+
+namespace mongo {
+
+class DocumentSourceChangeStreamTransform : public DocumentSource {
+public:
+ DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ BSONObj changeStreamSpec,
+ bool isIndependentOfAnyCollection);
+ Document applyTransformation(const Document& input);
+ DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ DocumentSource::GetModPathsReturn getModifiedPaths() const final;
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const;
+ DocumentSource::StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+ DocumentSource::GetNextResult getNext();
+ const char* getSourceName() const {
+ return DocumentSourceChangeStream::kStageName.rawData();
+ }
+
+private:
+ struct DocumentKeyCacheEntry {
+ DocumentKeyCacheEntry() = default;
+
+ DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn)
+ : documentKeyFields(documentKeyFieldsIn.first), isFinal(documentKeyFieldsIn.second){};
+ // Fields of the document key, in order, including "_id" and the shard key if the
+ // collection is sharded. Empty until the first oplog entry with a uuid is encountered.
+ // Needed for transforming 'insert' oplog entries.
+ std::vector<FieldPath> documentKeyFields;
+
+ // Set to true if the document key fields for this entry are definitively known and will
+ // not change. This implies that either the collection has become sharded or has been
+ // dropped.
+ bool isFinal;
+ };
+
+ /**
+ * Represents the DocumentSource's state if it's currently reading from an 'applyOps' entry
+ * which was created as part of a transaction.
+ */
+ struct TransactionContext {
+ MONGO_DISALLOW_COPYING(TransactionContext);
+
+ // The array of oplog entries from an 'applyOps' representing the transaction. Only kept
+ // around so that the underlying memory of 'arr' isn't freed.
+ Value opArray;
+
+        // Array representation of the 'opArray' field. Stored like this to avoid re-typechecking
+        // on each call to getNext(), or copying the entire array.
+ const std::vector<Value>& arr;
+
+ // Our current place in the 'opArray'.
+ size_t pos;
+
+ // Fields that were taken from the 'applyOps' oplog entry.
+ Document lsid;
+ TxnNumber txnNumber;
+
+ TransactionContext(const Value& applyOpsVal, const Document& lsidDoc, TxnNumber n)
+ : opArray(applyOpsVal), arr(opArray.getArray()), pos(0), lsid(lsidDoc), txnNumber(n) {}
+ };
+
+ void initializeTransactionContext(const Document& input);
+
+ /**
+     * Gets the next relevant applyOps entry that should be returned. If there is none, returns
+     * boost::none.
+ */
+ boost::optional<Document> extractNextApplyOpsEntry();
+
+ /**
+ * Helper for extractNextApplyOpsEntry(). Checks the namespace of the given document to see
+ * if it should be returned in the change stream.
+ */
+ bool isDocumentRelevant(const Document& d);
+
+ BSONObj _changeStreamSpec;
+
+ // Map of collection UUID to document key fields.
+ std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache;
+
+ // Regex for matching the "ns" field in applyOps sub-entries. Only used when we have a
+ // change stream on the entire DB. When watching just a single collection, this field is
+ // boost::none, and an exact string equality check is used instead.
+ boost::optional<pcrecpp::RE> _nsRegex;
+
+    // The current 'applyOps' entry being unwound, if any.
+ boost::optional<TransactionContext> _txnContext;
+
+ // Set to true if this transformation stage can be run on the collectionless namespace.
+ bool _isIndependentOfAnyCollection;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 5d0e850d3fd..477972ffdde 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -57,7 +57,6 @@ public:
kInclusionProjection,
kComputedProjection,
kReplaceRoot,
- kChangeStreamTransformation,
};
virtual ~TransformerInterface() = default;
virtual Document applyTransformation(const Document& input) = 0;
@@ -104,23 +103,14 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
GetModPathsReturn getModifiedPaths() const final;
-
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- StageConstraints constraints(
- StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kNoDiskUse,
- (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
- ? FacetRequirement::kNotAllowed
- : FacetRequirement::kAllowed),
- (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
- ? TransactionRequirement::kNotAllowed
- : TransactionRequirement::kAllowed),
- (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
- ? ChangeStreamRequirement::kChangeStreamStage
- : ChangeStreamRequirement::kWhitelist));
-
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
+ ChangeStreamRequirement::kWhitelist);
constraints.canSwapWithMatch = true;
constraints.canSwapWithLimit = true;
// This transformation could be part of a 'collectionless' change stream on an entire