author    Arun Banala <arun.banala@mongodb.com>  2022-02-08 13:44:49 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-02-22 12:52:38 +0000
commit    af85e60d19d1a975a7babf5300c6d336e474e40a (patch)
tree      77947d7687a43d749fee2393f78c683fd050135a /src/mongo/db/pipeline
parent    b6485b14e3749d1c6bc506fa5a5a3fd9b169af74 (diff)
SERVER-62801 Add change stream event for create operation
Diffstat (limited to 'src/mongo/db/pipeline')
 src/mongo/db/pipeline/change_stream_filter_helpers.cpp                     |  38
 src/mongo/db/pipeline/change_stream_filter_helpers.h                       |   3
 src/mongo/db/pipeline/change_stream_helpers_legacy.cpp                     |  12
 src/mongo/db/pipeline/change_stream_helpers_legacy.h                       |  17
 src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp                    |   9
 src/mongo/db/pipeline/document_source_change_stream.h                      |  16
 src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp |  12
 src/mongo/db/pipeline/document_source_change_stream_test.cpp               | 348
 src/mongo/db/pipeline/document_source_change_stream_transform.cpp          |  98
 src/mongo/db/pipeline/document_source_change_stream_transform.h            |   5
 src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp |  74
 src/mongo/db/pipeline/document_source_check_resume_token_test.cpp          |  12
 src/mongo/db/pipeline/resume_token.cpp                                     |  41
 src/mongo/db/pipeline/resume_token.h                                       |  19
 src/mongo/db/pipeline/resume_token_test.cpp                                |  42
15 files changed, 505 insertions(+), 241 deletions(-)
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 8ab33cce834..dcab512efb5 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/bson/bson_helper.h"
#include "mongo/db/matcher/expression_always_boolean.h"
#include "mongo/db/matcher/expression_parser.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_rewrite_helpers.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -104,11 +105,13 @@ std::unique_ptr<MatchExpression> buildOperationFilter(
auto renameFromEvent = BSON("o.renameCollection" << BSONRegEx(nsRegex));
auto renameToEvent =
BSON("o.renameCollection" << BSON("$exists" << true) << "o.to" << BSONRegEx(nsRegex));
+ const auto createEvent = BSON("o.create" << BSONRegEx(collRegex));
auto orCmdEvents = std::make_unique<OrMatchExpression>();
orCmdEvents->add(MatchExpressionParser::parseAndNormalize(dropEvent, expCtx));
orCmdEvents->add(MatchExpressionParser::parseAndNormalize(renameFromEvent, expCtx));
orCmdEvents->add(MatchExpressionParser::parseAndNormalize(renameToEvent, expCtx));
+ orCmdEvents->add(MatchExpressionParser::parseAndNormalize(createEvent, expCtx));
// Omit dropDatabase on single-collection streams. While the stream will be invalidated before
// it sees this event, the user will incorrectly see it if they startAfter the invalidate.
@@ -182,12 +185,20 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
// 'prevOpTime' link to another 'applyOps' command, indicating a multi-entry transaction.
BSONArrayBuilder orBuilder(applyOpsBuilder.subarrayStart("$or"));
{
- {
- BSONObjBuilder nsMatchBuilder(orBuilder.subobjStart());
- nsMatchBuilder.append(
- "o.applyOps.ns"_sd,
- BSONRegEx(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)));
- }
+ // Regexes for full-namespace, collection, and command-namespace matching.
+ auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns);
+ auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns);
+ auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx->ns);
+
+ // Match relevant CRUD events on the monitored namespaces.
+ orBuilder.append(BSON("o.applyOps.ns" << BSONRegEx(nsRegex)));
+
+ // Match relevant command events on the monitored namespaces.
+ orBuilder.append(BSON(
+ "o.applyOps" << BSON(
+ "$elemMatch" << BSON("ns" << BSONRegEx(cmdNsRegex)
+ << OR(BSON("o.create" << BSONRegEx(collRegex)))))));
+
// The default repl::OpTime is the value used to indicate a null "prevOpTime" link.
orBuilder.append(BSON(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName
<< BSON("$ne" << repl::OpTime().toBSON())));
@@ -243,18 +254,9 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
}
BSONObj getMatchFilterForClassicOperationTypes() {
- return BSON(DocumentSourceChangeStream::kOperationTypeField << BSON(
- "$in" << BSON_ARRAY(DocumentSourceChangeStream::kUpdateOpType
- << DocumentSourceChangeStream::kDeleteOpType
- << DocumentSourceChangeStream::kReplaceOpType
- << DocumentSourceChangeStream::kInsertOpType
- << DocumentSourceChangeStream::kDropCollectionOpType
- << DocumentSourceChangeStream::kRenameCollectionOpType
- << DocumentSourceChangeStream::kDropDatabaseOpType
- << DocumentSourceChangeStream::kInvalidateOpType
- << DocumentSourceChangeStream::kReshardBeginOpType
- << DocumentSourceChangeStream::kReshardDoneCatchUpOpType
- << DocumentSourceChangeStream::kNewShardDetectedOpType)));
+ return BSON(DocumentSourceChangeStream::kOperationTypeField
+ << BSON("$in" << change_stream_legacy::kClassicOperationTypes));
}
+
} // namespace change_stream_filter
} // namespace mongo
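
Context: the oplog entry that the new 'createEvent' predicate targets is a command entry whose 'o' field names the created collection (the same shape the tests below feed in). A minimal sketch of how the predicate composes in isolation, reusing the helpers from this file and assuming an initialized 'expCtx' for a stream on "test.coll":

    // Build the create-event predicate on its own (sketch only).
    const auto collRegex =
        DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns);
    const auto createEvent = BSON("o.create" << BSONRegEx(collRegex));
    auto createMatch = MatchExpressionParser::parseAndNormalize(createEvent, expCtx);

    // Matches oplog entries shaped like (illustrative values):
    //   { op: "c", ns: "test.$cmd", ui: <UUID>,
    //     o: { create: "coll", idIndex: { v: 2, key: { _id: 1 }, name: "_id_" } } }
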
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.h b/src/mongo/db/pipeline/change_stream_filter_helpers.h
index 3ef643a9543..1c57c0787ae 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.h
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.h
@@ -94,8 +94,7 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch);
/**
- * Returns the match filter for the classic changestream operationTypes i.e. the operations that
- * are NOT guarded behind the 'showExpandedEvents' flag.
+ * Returns the match filter for the classic changestream operations.
*/
BSONObj getMatchFilterForClassicOperationTypes();
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index ea1df1ae1a1..d207ef22714 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -131,10 +131,16 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
const ResumeTokenData& tokenData) {
- if (!tokenData.documentKey.missing() && tokenData.uuid) {
- std::vector<FieldPath> docKeyFields;
- auto docKey = tokenData.documentKey.getDocument();
+ if (!tokenData.eventIdentifier.missing() && tokenData.uuid) {
+ auto docKey = tokenData.eventIdentifier.getDocument();
+
+ // Newly added events store their operationType and operationDescription as the
+ // eventIdentifier, not a documentKey.
+ if (docKey["_id"].missing()) {
+ return {};
+ }
+ std::vector<FieldPath> docKeyFields;
auto iter = docKey.fieldIterator();
while (iter.more()) {
auto fieldPair = iter.next();
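
The "_id" check above is what separates the two eventIdentifier shapes a resume token can now carry; a sketch of both (field values illustrative):

    // CRUD events: eventIdentifier is the documentKey and always carries _id
    // (plus any shard-key fields), e.g.
    //   { _id: ObjectId("..."), shardKey: "..." }
    //
    // Expanded events such as 'create': eventIdentifier is derived from the
    // operation type and description instead, e.g.
    //   { operationType: "create", operationDescription: { idIndex: { ... } } }
    //
    // Only the first shape can seed the legacy documentKey cache, hence the
    // early return when "_id" is missing.
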
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index 0ac51bab7ac..2c30cef0a91 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -55,4 +55,21 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
const ResumeTokenData& data);
+/**
+ * Represents the change stream operation types that are NOT guarded behind the 'showExpandedEvents'
+ * flag.
+ */
+static const std::set<StringData> kClassicOperationTypes =
+ std::set<StringData>{DocumentSourceChangeStream::kUpdateOpType,
+ DocumentSourceChangeStream::kDeleteOpType,
+ DocumentSourceChangeStream::kReplaceOpType,
+ DocumentSourceChangeStream::kInsertOpType,
+ DocumentSourceChangeStream::kDropCollectionOpType,
+ DocumentSourceChangeStream::kRenameCollectionOpType,
+ DocumentSourceChangeStream::kDropDatabaseOpType,
+ DocumentSourceChangeStream::kInvalidateOpType,
+ DocumentSourceChangeStream::kReshardBeginOpType,
+ DocumentSourceChangeStream::kReshardDoneCatchUpOpType,
+ DocumentSourceChangeStream::kNewShardDetectedOpType};
+
} // namespace mongo::change_stream_legacy
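
With this set in place, the filter returned by getMatchFilterForClassicOperationTypes() (see change_stream_filter_helpers.cpp above) expands to roughly the following, with string values taken from the constants in document_source_change_stream.h:

    // { operationType: { $in: [ "update", "delete", "replace", "insert",
    //                           "drop", "rename", "dropDatabase", "invalidate",
    //                           "reshardBegin", "reshardDoneCatchUp",
    //                           "kNewShardDetected" ] } }
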
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index d080be71902..75ee3ccf1c4 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -108,6 +108,7 @@ std::unique_ptr<MatchExpression> matchRewriteOperationType(
{"update", {{"op", "u"_sd}, {"o._id"_sd, kExistsFalse}}},
{"replace", {{"op", "u"_sd}, {"o._id"_sd, kExistsTrue}}},
{"drop", {{"op", "c"_sd}, {"o.drop"_sd, kExistsTrue}}},
+ {"create", {{"op", "c"_sd}, {"o.create"_sd, kExistsTrue}}},
{"rename", {{"op", "c"_sd}, {"o.renameCollection"_sd, kExistsTrue}}},
{"dropDatabase", {{"op", "c"_sd}, {"o.dropDatabase"_sd, kExistsTrue}}}};
@@ -207,6 +208,7 @@ boost::intrusive_ptr<Expression> exprRewriteOperationType(
fromjson("{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: 'dropDatabase'}"));
opCases.push_back(
fromjson("{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: 'rename'}"));
+ opCases.push_back(fromjson("{case: {$ne: ['$o.create', '$$REMOVE']}, then: 'create'}"));
// The default case, if nothing matches.
auto defaultCase = ExpressionConstant::create(expCtx.get(), Value())->serialize(false);
@@ -1005,6 +1007,12 @@ std::unique_ptr<MatchExpression> matchRewriteNs(
tassert(5554104, "Unexpected rewrite failure", dropNsRewrite);
cmdCases->add(std::move(dropNsRewrite));
+ // The 'create' event is rewritten to the cmdNs in 'ns' and the collection name in 'o.create'.
+ auto createNsRewrite = matchRewriteGenericNamespace(
+ expCtx, predicate, "ns"_sd, true /* nsFieldIsCmdNs */, "o.create"_sd);
+ tassert(6280101, "Unexpected rewrite failure", createNsRewrite);
+ cmdCases->add(std::move(createNsRewrite));
+
// The 'dropDatabase' event is rewritten to the cmdNs in 'ns'. It does not have a collection
// field.
auto dropDbNsRewrite =
@@ -1094,6 +1102,7 @@ boost::intrusive_ptr<Expression> exprRewriteNs(
collCases.push_back(fromjson(str::stream()
<< "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: "
<< getCollFromNSField("o.renameCollection") << "}"));
+ collCases.push_back(fromjson("{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}"));
// The default case, if nothing matches.
auto defaultCase = ExpressionConstant::create(expCtx.get(), Value())->serialize(false);
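
Taken together, the two additions above let the rewrite engine fold a user predicate on operationType: 'create' down to the oplog. A sketch of the round trip, using the same entry points the tests below exercise (an initialized 'expCtx' is assumed):

    auto spec = fromjson("{operationType: 'create'}");
    auto statusWithMatch = MatchExpressionParser::parse(spec, expCtx);
    invariant(statusWithMatch.isOK());

    // Rewrite the user's filter so it can run directly against oplog entries.
    auto rewritten = change_stream_rewrite::rewriteFilterForFields(
        expCtx, statusWithMatch.getValue().get(), {"operationType"});

    // rewritten->serialize() now yields:
    //   {$and: [{op: {$eq: 'c'}}, {'o.create': {$exists: true}}]}
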
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 5db609a4627..a3dfc14d0dc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -133,6 +133,10 @@ public:
// after the transformation.
static constexpr StringData kDocumentKeyField = "documentKey"_sd;
+ // The name of the field where the operation description of the non-CRUD operations will be
+ // located. This is complementary to the 'documentKey' for CRUD operations.
+ static constexpr StringData kOperationDescriptionField = "operationDescription"_sd;
+
// The name of the field where the pre-image document will be found, if requested and available.
static constexpr StringData kFullDocumentBeforeChangeField = "fullDocumentBeforeChange"_sd;
@@ -199,10 +203,11 @@ public:
// UUID of a collection corresponding to the event (if applicable).
static constexpr StringData kCollectionUuidField = "collectionUUID"_sd;
- // Object with additional description of operation.
- static constexpr StringData kOperationDescriptionField = "operationDescription"_sd;
-
+ //
// The different types of operations we can use for the operation type.
+ //
+
+ // The classic change events.
static constexpr StringData kUpdateOpType = "update"_sd;
static constexpr StringData kDeleteOpType = "delete"_sd;
static constexpr StringData kReplaceOpType = "replace"_sd;
@@ -211,11 +216,16 @@ public:
static constexpr StringData kRenameCollectionOpType = "rename"_sd;
static constexpr StringData kDropDatabaseOpType = "dropDatabase"_sd;
static constexpr StringData kInvalidateOpType = "invalidate"_sd;
+
+ // The internal change events that are not exposed to the users.
static constexpr StringData kReshardBeginOpType = "reshardBegin"_sd;
static constexpr StringData kReshardDoneCatchUpOpType = "reshardDoneCatchUp"_sd;
// Internal op type to signal mongos to open cursors on new shards.
static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
+ // These events are guarded behind the 'showExpandedEvents' flag.
+ static constexpr StringData kCreateOpType = "create"_sd;
+
static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd;
static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index abac48c6ae8..cc3787a44ba 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -109,16 +109,16 @@ DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
}
// If all the fields match exactly, then we have found the token.
- if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
- tokenDataFromClient.documentKey)) {
+ if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.eventIdentifier ==
+ tokenDataFromClient.eventIdentifier)) {
return ResumeStatus::kFoundToken;
}
- // At this point, we know that the tokens differ only by documentKey. The status we return will
- // depend on whether the stream token is logically before or after the client token. If the
+ // At this point, we know that the tokens differ only by eventIdentifier. The status we return
+ // will depend on whether the stream token is logically before or after the client token. If the
// latter, then we will never see the resume token and the stream cannot be resumed.
- return ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
- tokenDataFromClient.documentKey)
+ return ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.eventIdentifier >
+ tokenDataFromClient.eventIdentifier)
? ResumeStatus::kSurpassedToken
: ResumeStatus::kCheckNextDoc;
}
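
For readers tracing the resume logic: this comparison only runs once the earlier resume-token fields have already compared equal, so the outcomes reduce to (sketch):

    // stream.eventIdentifier == client.eventIdentifier  -> kFoundToken
    // stream.eventIdentifier  > client.eventIdentifier  -> kSurpassedToken
    //                                                       (token will never appear)
    // stream.eventIdentifier  < client.eventIdentifier  -> kCheckNextDoc
    //                                                       (keep scanning forward)
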
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 3914a0c72da..b844b7a615a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
+#include <boost/algorithm/string/join.hpp>
#include <boost/intrusive_ptr.hpp>
#include <memory>
#include <vector>
@@ -304,7 +305,7 @@ public:
size_t txnOpIndex = 0) {
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
- tokenData.documentKey = docKey;
+ tokenData.eventIdentifier = docKey;
tokenData.fromInvalidate = fromInvalidate;
tokenData.txnOpIndex = txnOpIndex;
if (!uuid.missing())
@@ -316,7 +317,8 @@ public:
* Helper for running an applyOps through the pipeline, and getting all of the results.
*/
std::vector<Document> getApplyOpsResults(const Document& applyOpsDoc,
- const LogicalSessionFromClient& lsid) {
+ const LogicalSessionFromClient& lsid,
+ BSONObj spec = kDefaultSpec) {
BSONObj applyOpsObj = applyOpsDoc.toBson();
// Create an oplog entry and then glue on an lsid and txnNumber
@@ -332,7 +334,7 @@ public:
BSONObj oplogEntry = builder.done();
// Create the stages and check that the documents produced matched those in the applyOps.
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec);
+ vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, spec);
auto transform = stages[3].get();
invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
@@ -1216,6 +1218,7 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) {
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
+
Document expectedInvalidate{
{DSChangeStream::kIdField,
makeResumeToken(
@@ -1228,6 +1231,33 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) {
checkTransformation(dropColl, expectedDrop, kShowExpandedEventsSpec, expectedInvalidate);
}
+TEST_F(ChangeStreamStageTest, TransformCreate) {
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+
+ OplogEntry create =
+ createCommand(BSON("create" << nss.coll() << "idIndex"
+ << BSON("v" << 2 << "key" << BSON("id" << 1)) << "name"
+ << "_id_"),
+ testUuid());
+
+ const auto expectedOpDescription = fromjson("{idIndex: {v: 2, key: {id: 1}}, name: '_id_'}");
+ Document expectedCreate{
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ ResumeTokenData::makeEventIdentifierFromOpDescription(
+ DSChangeStream::kCreateOpType, Value(expectedOpDescription)))},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kCreateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kCollectionUuidField, testUuid()},
+ {DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kOperationDescriptionField, Value(expectedOpDescription)},
+ };
+
+ checkTransformation(create, expectedCreate, kShowExpandedEventsSpec);
+}
+
TEST_F(ChangeStreamStageTest, TransformRename) {
NamespaceString otherColl("test.bar");
OplogEntry rename =
@@ -1265,17 +1295,17 @@ TEST_F(ChangeStreamStageTest, TransformRenameShowExpandedEvents) {
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {DSChangeStream::kOperationDescriptionField,
- D{
- {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {"dropTarget", dropTarget},
- }},
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kOperationDescriptionField,
+ D{
+ {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
+ {"dropTarget", dropTarget},
+ }},
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
@@ -2250,6 +2280,60 @@ TEST_F(ChangeStreamStageTest, TransformApplyOps) {
// The third document is skipped.
}
+TEST_F(ChangeStreamStageTest, TransformApplyOpsWithCreateOperation) {
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+
+ // Doesn't use the checkTransformation() pattern that other tests use since we expect multiple
+ // documents to be returned from one applyOps.
+
+ Document idIndexDef = Document{{"v", 2}, {"key", D{{"_id", 1}}}};
+ Document applyOpsDoc{
+ {"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", "c"_sd},
+ {"ns", nss.db() + ".$cmd"},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"create", nss.coll()}, {"idIndex", idIndexDef}}}},
+ {"ts", Timestamp(0, 1)}},
+ Document{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}},
+ Document{
+ {"op", "c"_sd},
+ {"ns", nss.db() + ".$cmd"},
+ {"ui", UUID::gen()},
+ // Operation on another collection which should be skipped.
+ {"o", Value{Document{{"create", "otherCollection"_sd}, {"idIndex", idIndexDef}}}}},
+ }}},
+ };
+ LogicalSessionFromClient lsid = testLsid();
+ vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid, kShowExpandedEventsSpec);
+
+ // The create operation on the other collection should be skipped.
+ ASSERT_EQ(results.size(), 2u);
+
+ // Check that the first document is correct.
+ auto nextDoc = results[0];
+ ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL);
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kCreateOpType);
+ ASSERT_VALUE_EQ(nextDoc[DSChangeStream::kOperationDescriptionField],
+ Value(Document{{"idIndex", idIndexDef}}));
+ ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+
+ // Check the second document.
+ nextDoc = results[1];
+ ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL);
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["x"].getString(), "hallo");
+ ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+
+ // The third document is skipped.
+}
+
TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
const Timestamp ts(3, 45);
const long long term = 4;
@@ -2310,10 +2394,9 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
checkTransformation(rename, expectedRename);
}
-TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) {
- auto collSpec =
- D{{"create", "foo"_sd},
- {"idIndex", D{{"v", 2}, {"key", D{{"_id", 1}}}, {"name", "_id_"_sd}, {"ns", nss.ns()}}}};
+TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollectionWhenShowExpandedEventsOff) {
+ auto collSpec = D{{"create", "foo"_sd},
+ {"idIndex", D{{"v", 2}, {"key", D{{"_id", 1}}}, {"name", "_id_"_sd}}}};
OplogEntry createColl = createCommand(collSpec.toBson(), testUuid());
checkTransformation(createColl, boost::none);
}
@@ -4371,7 +4454,7 @@ public:
}
std::string getNsCollRegexMatchExpr(const std::string& field, const std::string& regex) {
- if (field == "$o.drop") {
+ if (field == "$o.drop" || field == "$o.create") {
return str::stream()
<< "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field
<< "']}, {$const: 'string'}]}, '" << field
@@ -4749,6 +4832,20 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeDrop) {
fromjson("{$and: [{op: {$eq: 'c'}}, {'o.drop': {$exists: true}}]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeCreate) {
+ auto spec = fromjson("{operationType: 'create'}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: [{op: {$eq: 'c'}}, {'o.create': {$exists: true}}]}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeRename) {
auto spec = fromjson("{operationType: 'rename'}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4816,6 +4913,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationTypeSubField
TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) {
auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("drop"
+ << "create"
<< "insert")));
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx());
ASSERT_OK(statusWithMatchExpression.getStatus());
@@ -4826,7 +4924,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) {
auto rewrittenPredicate = rewrittenMatchExpression->serialize();
ASSERT_BSONOBJ_EQ(rewrittenPredicate,
- BSON(OR(fromjson("{$and: [{op: {$eq: 'c'}}, {'o.drop': {$exists: true}}]}"),
+ BSON(OR(fromjson("{$and: [{op: {$eq: 'c'}}, {'o.create': {$exists: true}}]}"),
+ fromjson("{$and: [{op: {$eq: 'c'}}, {'o.drop': {$exists: true}}]}"),
fromjson("{op: {$eq: 'i'}}"))));
}
@@ -4963,7 +5062,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteExprWithOperationType) {
case: {$ne: ['$o.dropDatabase', '$$REMOVE']},
then: {$const: 'dropDatabase'}
},
- {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}}
+ {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}},
+ {case: {$ne: ['$o.create', '$$REMOVE']}, then: {$const: 'create'}}
],
default: '$$REMOVE'
}
@@ -5434,6 +5534,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteFullNamespaceObject) {
BSON(OR(BSON("o.renameCollection" << BSON("$eq" << ns)),
BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)),
BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())))),
+ BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)),
+ BSON("o.create" << BSON("$eq" << expCtx->ns.coll())))),
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5453,8 +5555,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithSwappedField) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5476,8 +5579,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyDbField) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1}"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5497,8 +5601,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyCollectionField
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5518,8 +5623,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidDbField) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5539,8 +5645,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidCollField) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5560,8 +5667,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithExtraField) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5587,7 +5695,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithStringDbFieldPath) {
BSON("ns" << BSON("$regex" << regexNs)))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)),
- BSON("ns" << BSON("$eq" << cmdNs)),
+ BSON("ns" << BSON("$eq" << cmdNs)), // drop.
+ BSON("ns" << BSON("$eq" << cmdNs)), // create.
BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5613,6 +5722,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithCollectionFieldPath) {
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)),
BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())),
+ BSON("o.create" << BSON("$eq" << expCtx->ns.coll())),
BSON(AND(fromjson("{$alwaysFalse: 1}"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5635,7 +5745,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexDbFieldPath) {
fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^unit.*$)")),
- fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), // drop.
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), // create.
BSON(AND(fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5659,6 +5770,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexCollectionFieldPath)
BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
R"(^pipeline.*$)")),
fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^pipeline.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.create", R"(^pipeline.*$)")),
BSON(AND(fromjson("{ $alwaysFalse: 1 }"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5699,8 +5811,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraDbFieldPath) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop
+ fromjson("{$alwaysFalse: 1 }"), // rename
+ fromjson("{$alwaysFalse: 1 }"), // create
BSON(AND(fromjson("{$alwaysFalse: 1 }"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5721,8 +5834,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraCollectionFieldPath)
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop
+ fromjson("{$alwaysFalse: 1 }"), // rename
+ fromjson("{$alwaysFalse: 1 }"), // create
BSON(AND(fromjson("{$alwaysFalse: 1 }"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5743,8 +5857,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInvalidFieldPath) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop
+ fromjson("{$alwaysFalse: 1 }"), // rename
+ fromjson("{$alwaysFalse: 1 }"), // create
BSON(AND(fromjson("{$alwaysFalse: 1 }"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5776,7 +5891,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDb) {
fromjson("{op: {$eq: 'c'}}"),
BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)),
BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)))),
- BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)),
+ BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), // drop.
+ BSON("ns" << BSON("$eq" << secondCmdNs)))),
+ BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), // create.
BSON("ns" << BSON("$eq" << secondCmdNs)))),
BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)),
BSON("ns" << BSON("$eq" << secondCmdNs)))),
@@ -5811,7 +5928,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDb) {
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)))),
- BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), // drop.
+ BSON("ns" << BSON("$eq" << firstCmdNs)))),
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), // create.
BSON("ns" << BSON("$eq" << firstCmdNs)))),
BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
BSON("ns" << BSON("$eq" << firstCmdNs)))),
@@ -5847,6 +5966,10 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollection)
<< "news")),
BSON("o.drop" << BSON("$eq"
<< "test")))),
+ BSON(OR(BSON("o.create" << BSON("$eq"
+ << "news")),
+ BSON("o.create" << BSON("$eq"
+ << "test")))),
BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
fromjson("{$alwaysFalse: 1}"))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
@@ -5882,6 +6005,10 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnCollection
<< "news")),
BSON("o.drop" << BSON("$eq"
<< "test")))),
+ BSON(OR(BSON("o.create" << BSON("$eq"
+ << "news")),
+ BSON("o.create" << BSON("$eq"
+ << "test")))),
BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
fromjson("{$alwaysFalse: 1}"))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
@@ -5909,7 +6036,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnDb) {
BSON(OR(
BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")),
fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
- BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // drop.
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // create.
fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
@@ -5938,7 +6067,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnDb) {
BSON(OR(
BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")),
fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
- BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // drop.
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // create.
fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
@@ -5969,6 +6100,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnCollec
fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")),
fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.create", R"(^news$)")))),
BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -5997,6 +6130,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnColle
fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")),
fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.create", R"(^news$)")))),
BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
}
@@ -6028,7 +6163,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDbWithRegex
BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))),
BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
- fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // drop.
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // create.
BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
@@ -6062,7 +6199,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDbWithRege
BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))),
BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
- fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // drop.
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // create.
BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
@@ -6085,20 +6224,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollectionW
auto rewrittenPredicate = rewrittenMatchExpression->serialize();
ASSERT_BSONOBJ_EQ(
rewrittenPredicate,
- BSON(OR(
- BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
- BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
- fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))),
- BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
- fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
- R"(^test.*$)")))),
- BSON(OR(BSON("o.drop" << BSON("$eq"
- << "news")),
- fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))),
- BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
- fromjson("{$alwaysFalse: 1}"))),
- fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
+ R"(^test.*$)")))),
+ BSON(OR(BSON("o.drop" << BSON("$eq"
+ << "news")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))),
+ BSON(OR(BSON("o.create" << BSON("$eq"
+ << "news")),
+ fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
TEST_F(ChangeStreamRewriteTest,
@@ -6123,16 +6265,20 @@ TEST_F(ChangeStreamRewriteTest,
BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))),
- BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
- fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
- R"(^test.*$)")))),
- BSON(OR(BSON("o.drop" << BSON("$eq"
- << "news")),
- fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))),
- BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
- fromjson("{$alwaysFalse: 1}"))),
- fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
+ R"(^test.*$)")))),
+ BSON(OR(BSON("o.drop" << BSON("$eq"
+ << "news")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))),
+ BSON(OR(BSON("o.create" << BSON("$eq"
+ << "news")),
+ fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")))),
+ BSON(AND(
+ BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
}
TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInExpressionOnInvalidDb) {
@@ -6194,8 +6340,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyInExpression) {
rewrittenPredicate,
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1 }"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
}
@@ -6216,8 +6363,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyNinExpression) {
BSON(NOR(
BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
BSON(AND(fromjson("{op: {$eq: 'c'}}"),
- BSON(OR(fromjson("{$alwaysFalse: 1 }"),
- fromjson("{$alwaysFalse: 1 }"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"), // rename.
+ fromjson("{$alwaysFalse: 1 }"), // drop.
+ fromjson("{$alwaysFalse: 1 }"), // create.
BSON(AND(fromjson("{$alwaysFalse: 1 }"),
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
}
@@ -6233,17 +6381,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObject) {
expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
ASSERT(rewrittenMatchExpression);
- auto case1 =
+ auto caseCRUD =
"{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: "
"{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
- "-1}]}}";
- auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
- auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
- auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
- auto case5 =
+ "-1}]}}"s;
+ auto caseNotCmd = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
+ auto caseDrop = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
+ auto caseDropDatabase = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
+ auto caseRenameCollection =
"{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: "
"['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
"-1}]}}";
+ auto caseCreate = "{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}";
+
+ const auto cases = boost::algorithm::join(
+ std::vector<std::string>{
+ caseCRUD, caseNotCmd, caseDrop, caseDropDatabase, caseRenameCollection, caseCreate},
+ ",");
auto expectedExpr = fromjson(
"{"
@@ -6260,7 +6414,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObject) {
" coll: {"
" $switch: {"
" branches: ["s +
- " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 +
+ cases +
" ], default: '$$REMOVE'}}}}}, "
" {db: {$const: 'unittests' }, coll: {$const: 'pipeline_test'}}]}"
"}");
@@ -6279,17 +6433,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObjectWithOnlyDb) {
expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
ASSERT(rewrittenMatchExpression);
- auto case1 =
+ auto caseCRUD =
"{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: "
"{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
- "-1}]}}";
- auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
- auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
- auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
- auto case5 =
+ "-1}]}}"s;
+ auto caseNotCmd = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
+ auto caseDrop = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
+ auto caseDropDatabase = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
+ auto caseRenameCollection =
"{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: "
"['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
"-1}]}}";
+ auto caseCreate = "{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}";
+
+ const auto cases = boost::algorithm::join(
+ std::vector<std::string>{
+ caseCRUD, caseNotCmd, caseDrop, caseDropDatabase, caseRenameCollection, caseCreate},
+ ",");
auto expectedExpr = fromjson(
"{"
@@ -6306,7 +6466,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObjectWithOnlyDb) {
" coll: {"
" $switch: {"
" branches: ["s +
- " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 +
+ cases +
" ], default: '$$REMOVE'}}}}}, "
" {db: {$const: 'unittests' }}]}"
"}");
@@ -6353,17 +6513,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnCollFieldPath) {
expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
ASSERT(rewrittenMatchExpression);
- auto case1 =
+ auto caseCRUD =
"{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: "
"{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
- "-1}]}}";
- auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
- auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
- auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
- auto case5 =
+ "-1}]}}"s;
+ auto caseNotCmd = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
+ auto caseDrop = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
+ auto caseDropDatabase = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
+ auto caseRenameCollection =
"{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: "
"['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
"-1}]}}";
+ auto caseCreate = "{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}";
+
+ const auto cases = boost::algorithm::join(
+ std::vector<std::string>{
+ caseCRUD, caseNotCmd, caseDrop, caseDropDatabase, caseRenameCollection, caseCreate},
+ ",");
auto expectedExpr = fromjson(
"{"
@@ -6378,9 +6544,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnCollFieldPath) {
" in: {"
" $switch: {"
" branches: ["s +
- " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 +
+ cases +
" ], default: '$$REMOVE'}}}}, "
- " {$const: 'pipeline_test'}]}"
+ " {$const: 'pipeline_test'}]}"
"}");
auto rewrittenPredicate = rewrittenMatchExpression->serialize();
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 7f90cfb5074..1f2fc8184cb 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
+#include "mongo/db/pipeline/change_stream_filter_helpers.h"
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/pipeline/document_path_support.h"
@@ -64,6 +65,34 @@ using std::vector;
namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
+
+Document copyDocExceptFields(const Document& source, const std::set<StringData>& fieldNames) {
+ MutableDocument doc(source);
+ for (auto fieldName : fieldNames) {
+ doc.remove(fieldName);
+ }
+ return doc.freeze();
+}
+
+ResumeTokenData makeResumeToken(Value ts,
+ Value txnOpIndex,
+ Value uuid,
+ StringData operationType,
+ Value documentKey,
+ Value opDescription) {
+ ResumeTokenData resumeTokenData;
+ resumeTokenData.clusterTime = ts.getTimestamp();
+ if (!uuid.missing()) {
+ resumeTokenData.uuid = uuid.getUuid();
+ }
+ if (!txnOpIndex.missing()) {
+ resumeTokenData.txnOpIndex = txnOpIndex.getLong();
+ }
+ resumeTokenData.eventIdentifier =
+ ResumeToken::makeEventIdentifier(operationType, documentKey, opDescription);
+
+ return resumeTokenData;
+}
} // namespace
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform,
@@ -134,26 +163,6 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints(
return constraints;
}
-ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
- Value uuid,
- Value documentKey,
- Value txnOpIndex) {
- ResumeTokenData resumeTokenData;
-
- resumeTokenData.clusterTime = ts.getTimestamp();
- resumeTokenData.documentKey = documentKey;
-
- if (!uuid.missing()) {
- resumeTokenData.uuid = uuid.getUuid();
- }
-
- if (!txnOpIndex.missing()) {
- resumeTokenData.txnOpIndex = txnOpIndex.getLong();
- }
-
- return resumeTokenData;
-}
-
Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
// If we're executing a change stream pipeline that was forwarded from mongos, then we expect it
// to "need merge"---we expect to be executing the shards part of a split pipeline. It is never
@@ -193,6 +202,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
Value fullDocument;
Value updateDescription;
Value documentKey;
+ Value operationDescription;
switch (opType) {
case repl::OpTypeEnum::kInsert: {
@@ -289,20 +299,20 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
break;
}
case repl::OpTypeEnum::kCommand: {
- if (!input.getNestedField("o.drop").missing()) {
+ const auto oField = input[repl::OplogEntry::kObjectFieldName].getDocument();
+ if (auto nssField = oField.getField("drop"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kDropCollectionOpType;
// The "o.drop" field will contain the actual collection name.
- nss = NamespaceString(nss.db(), input.getNestedField("o.drop").getString());
- } else if (!input.getNestedField("o.renameCollection").missing()) {
+ nss = NamespaceString(nss.db(), nssField.getString());
+ } else if (auto nssField = oField.getField("renameCollection"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kRenameCollectionOpType;
// The "o.renameCollection" field contains the namespace of the original collection.
- nss = NamespaceString(input.getNestedField("o.renameCollection").getString());
+ nss = NamespaceString(nssField.getString());
- // The "o.to" field contains the target namespace for the rename.
- const auto renameTargetNss =
- NamespaceString(input.getNestedField("o.to").getString());
+ // The "to" field contains the target namespace for the rename.
+ const auto renameTargetNss = NamespaceString(oField["to"].getString());
const Value renameTarget(Document{
{"db", renameTargetNss.db()},
{"coll", renameTargetNss.coll()},
@@ -315,27 +325,22 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// If 'showExpandedEvents' is set, include full details of the rename in
// 'operationDescription'.
if (_changeStreamSpec.getShowExpandedEvents()) {
- MutableDocument operationDescription;
- operationDescription.addField(DocumentSourceChangeStream::kRenameTargetNssField,
- renameTarget);
-
- // If present, 'dropTarget' is the UUID of the collection that previously owned
- // the target namespace and was dropped during the rename operation.
- const auto dropTarget = input.getNestedField("o.dropTarget");
- if (!dropTarget.missing()) {
- checkValueType(dropTarget, "o.dropTarget", BSONType::BinData);
- operationDescription.addField("dropTarget", dropTarget);
- }
-
- doc.addField(DocumentSourceChangeStream::kOperationDescriptionField,
- operationDescription.freezeToValue());
+ MutableDocument opDescBuilder(
+ copyDocExceptFields(oField, {"renameCollection"_sd, "stayTemp"_sd}));
+ opDescBuilder.setField(DocumentSourceChangeStream::kRenameTargetNssField,
+ renameTarget);
+ operationDescription = opDescBuilder.freezeToValue();
}
- } else if (!input.getNestedField("o.dropDatabase").missing()) {
+ } else if (!oField.getField("dropDatabase").missing()) {
operationType = DocumentSourceChangeStream::kDropDatabaseOpType;
// Extract the database name from the namespace field and leave the collection name
// empty.
nss = NamespaceString(nss.db());
+ } else if (auto nssField = oField.getField("create"); !nssField.missing()) {
+ operationType = DocumentSourceChangeStream::kCreateOpType;
+ nss = NamespaceString(nss.db(), nssField.getString());
+ operationDescription = Value(copyDocExceptFields(oField, {"create"_sd}));
} else {
// All other commands will invalidate the stream.
operationType = DocumentSourceChangeStream::kInvalidateOpType;
@@ -402,7 +407,8 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
}
// Generate the resume token. Note that only 'ts' is always guaranteed to be present.
- auto resumeTokenData = getResumeToken(ts, uuid, documentKey, txnOpIndex);
+ auto resumeTokenData =
+ makeResumeToken(ts, txnOpIndex, uuid, operationType, documentKey, operationDescription);
auto resumeToken = ResumeToken(resumeTokenData).toDocument();
doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken));
@@ -463,7 +469,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
operationType == DocumentSourceChangeStream::kDropDatabaseOpType
? Value(Document{{"db", nss.db()}})
: Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
+
+ // The event may have a documentKey OR an operationDescription, but not both. We already
+ // validated this while creating the resume token.
doc.addField(DocumentSourceChangeStream::kDocumentKeyField, std::move(documentKey));
+ if (_changeStreamSpec.getShowExpandedEvents()) {
+ doc.addField(DocumentSourceChangeStream::kOperationDescriptionField, operationDescription);
+ }
// Note that the update description field might be the 'missing' value, in which case it will
// not be serialized.
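
A sketch of how the two event families flow through the new makeResumeToken() helper above; argument values are illustrative, and 'idIndexSpec' is a stand-in for the create command's index spec:

    // CRUD event (e.g. an insert): documentKey populated, no description.
    auto crudToken = makeResumeToken(ts, txnOpIndex, uuid,
                                     DocumentSourceChangeStream::kInsertOpType,
                                     Value(Document{{"_id", 123}}),
                                     Value() /* operationDescription */);

    // Expanded event (e.g. a create): no documentKey, description populated.
    auto createToken = makeResumeToken(ts, txnOpIndex, uuid,
                                       DocumentSourceChangeStream::kCreateOpType,
                                       Value() /* documentKey */,
                                       Value(Document{{"idIndex", idIndexSpec}}));

    // ResumeToken::makeEventIdentifier() keeps the documentKey for the former
    // and an {operationType, operationDescription} pair for the latter, which
    // is why a single event carries documentKey OR operationDescription but
    // never both.
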
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index d9dcede8598..67bcdcdeecb 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -73,11 +73,6 @@ private:
DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
DocumentSourceChangeStreamSpec spec);
- /**
- * Helper used for determining what resume token to return.
- */
- ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey, Value txnOpIndex);
-
DocumentSourceChangeStreamSpec _changeStreamSpec;
// Records the documentKey fields from the client's resume token, if present.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index 5f91cab0d6a..40603a0b1d7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -45,12 +45,43 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamUnwindTransaction,
DocumentSourceChangeStreamUnwindTransaction::createFromBson,
true);
+namespace change_stream_filter {
+/**
+ * Build a filter, similar to the optimized oplog filter, designed to reject individual transaction
+ * entries that we know would eventually get rejected by the 'userMatch' filter if they continued
+ * through the rest of the pipeline. We must also adjust the filter slightly for user rewrites, as
+ * events within a transaction do not have certain fields that are common to other oplog entries.
+ *
+ * NB: The new filter may contain references to strings in the BSONObj that 'userMatch' originated
+ * from. Callers that keep the new filter long-term should serialize and re-parse it to guard
+ * against the possibility of stale string references.
+ */
+std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch) {
+ // The transaction unwind filter is the same as the operation filter applied to the oplog. This
+ // includes a namespace filter, which ensures that it will discard all documents that would be
+ // filtered out by the default 'ns' filter this stage gets initialized with.
+ auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr));
+
+ // Attempt to rewrite the user's filter and combine it with the standard operation filter. We do
+ // this separately because we need to exclude certain fields from the user's filters. Unwound
+ // transaction events do not have these fields until we populate them from the commitTransaction
+ // event. We already applied these predicates during the oplog scan, so we know that they match.
+ static const std::set<std::string> excludedFields = {"clusterTime", "lsid", "txnNumber"};
+ if (auto rewrittenMatch =
+ change_stream_rewrite::rewriteFilterForFields(expCtx, userMatch, {}, excludedFields)) {
+ unwindFilter->add(std::move(rewrittenMatch));
+ }
+ return MatchExpression::optimize(std::move(unwindFilter));
+}
+} // namespace change_stream_filter
+
boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction>
DocumentSourceChangeStreamUnwindTransaction::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- auto filter =
- BSON("ns" << BSONRegEx(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)));
- return new DocumentSourceChangeStreamUnwindTransaction(std::move(filter), expCtx);
+ std::unique_ptr<MatchExpression> matchExpr =
+ change_stream_filter::buildUnwindTransactionFilter(expCtx, nullptr);
+ return new DocumentSourceChangeStreamUnwindTransaction(matchExpr->serialize(), expCtx);
}
boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction>
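To make the relocated helper concrete, a hedged sketch of the filter it builds; the serialized form shown is an assumption, though the AND-of-filters structure follows from the code above:

    // With userMatch == nullptr, the unwind filter is the operation filter alone.
    // With a user match, rewritable predicates are ANDed in, minus any on
    // clusterTime/lsid/txnNumber, which unwound entries do not yet carry:
    //   {$and: [<operation filter>, <rewritten user predicates>]}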
@@ -394,37 +425,6 @@ void DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::
}
}
-namespace change_stream_filter {
-/**
- * Build a filter, similar to the optimized oplog filter, designed to reject individual transaction
- * entries that we know would eventually get rejected by the 'userMatch' filter if they continued
- * through the rest of the pipeline. We must also adjust the filter slightly for user rewrites, as
- * events within a transaction do not have certain fields that are common to other oplog entries.
- *
- * NB: The new filter may contain references to strings in the BSONObj that 'userMatch' originated
- * from. Callers that keep the new filter long-term should serialize and re-parse it to guard
- * against the possibility of stale string references.
- */
-std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch) {
- // The transaction unwind filter is the same as the operation filter applied to the oplog. This
- // includes a namespace filter, which ensures that it will discard all documents that would be
- // filtered out by the default 'ns' filter this stage gets initialized with.
- auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr));
-
- // Attempt to rewrite the user's filter and combine it with the standard operation filter. We do
- // this separately because we need to exclude certain fields from the user's filters. Unwound
- // transaction events do not have these fields until we populate them from the commitTransaction
- // event. We already applied these predicates during the oplog scan, so we know that they match.
- static const std::set<std::string> excludedFields = {"clusterTime", "lsid", "txnNumber"};
- if (auto rewrittenMatch =
- change_stream_rewrite::rewriteFilterForFields(expCtx, userMatch, {}, excludedFields)) {
- unwindFilter->add(std::move(rewrittenMatch));
- }
- return MatchExpression::optimize(std::move(unwindFilter));
-}
-} // namespace change_stream_filter
-
Pipeline::SourceContainer::iterator DocumentSourceChangeStreamUnwindTransaction::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
tassert(5687205, "Iterator mismatch during optimization", *itr == this);
@@ -441,12 +441,6 @@ Pipeline::SourceContainer::iterator DocumentSourceChangeStreamUnwindTransaction:
return nextChangeStreamStageItr;
}
- // We never expect to apply these optimizations more than once. The filter should be in its
- // default state.
- tassert(5902200,
- "Multiple optimizations attempted",
- _filter.nFields() == 1 && _filter.firstElementFieldNameStringData() == "ns");
-
// Seek to the stage that immediately follows the change streams stages.
itr = std::find_if_not(itr, container->end(), [](const auto& stage) {
return stage->constraints().isChangeStreamStage();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 7c025a92766..afa2f70a422 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -535,7 +535,7 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen
TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject) {
// Verify that a resume token whose documentKey is not a valid object will neither succeed nor
- // cause an invariant when we perform the relaxed documentKey._id check when running in a
+ // cause an invariant when we perform the relaxed eventIdentifier._id check when running in a
// sharded context.
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
@@ -551,8 +551,8 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject)
TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) {
// Verify that a resume token whose documentKey omits the _id field will neither succeed nor
- // cause an invariant when we perform the relaxed documentKey._id, even when compared against an
- // artificial stream token whose _id is also missing.
+ // cause an invariant when we perform the relaxed eventIdentifier._id check, even when compared
+ // against an artificial stream token whose _id is also missing.
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
@@ -607,7 +607,7 @@ TEST_F(CheckResumeTokenTest,
ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, resumeTimestamp);
- ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey);
+ ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.eventIdentifier.getDocument(), expectedDocKey);
}
TEST_F(CheckResumeTokenTest,
@@ -674,7 +674,7 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn
ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, firstEventAfter);
- ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey);
+ ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.eventIdentifier.getDocument(), expectedDocKey);
}
TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInvalidate) {
@@ -750,7 +750,7 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, resumeTimestamp);
- ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey);
+ ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.eventIdentifier.getDocument(), expectedDocKey);
}
/**
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index 5d3a5e78d4c..cc612288fa9 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -37,6 +37,7 @@
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/util/hex.h"
@@ -61,7 +62,7 @@ bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime && version == other.version &&
tokenType == other.tokenType && txnOpIndex == other.txnOpIndex &&
fromInvalidate == other.fromInvalidate && uuid == other.uuid &&
- (Value::compare(this->documentKey, other.documentKey, nullptr) == 0);
+ (Value::compare(this->eventIdentifier, other.eventIdentifier, nullptr) == 0);
}
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
@@ -75,7 +76,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate);
}
out << ", uuid: " << tokenData.uuid;
- out << ", documentKey: " << tokenData.documentKey << "}";
+ out << ", eventIdentifier: " << tokenData.eventIdentifier << "}";
return out;
}
@@ -96,7 +97,7 @@ ResumeToken::ResumeToken(const Document& resumeDoc) {
}
// We encode the resume token as a KeyString with the sequence:
-// clusterTime, version, txnOpIndex, fromInvalidate, uuid, documentKey Only the clusterTime,
+// clusterTime, version, txnOpIndex, fromInvalidate, uuid, eventIdentifier. Only the clusterTime,
// version, txnOpIndex, and fromInvalidate are required.
ResumeToken::ResumeToken(const ResumeTokenData& data) {
BSONObjBuilder builder;
@@ -110,13 +111,13 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) {
builder.appendBool("", data.fromInvalidate);
}
uassert(50788,
- "Unexpected resume token with a documentKey but no UUID",
- data.uuid || data.documentKey.missing());
+ "Unexpected resume token with a eventIdentifier but no UUID",
+ data.uuid || data.eventIdentifier.missing());
if (data.uuid) {
data.uuid->appendToBuilder(&builder, "");
}
- data.documentKey.addToBsonObj(&builder, "");
+ data.eventIdentifier.addToBsonObj(&builder, "");
auto keyObj = builder.obj();
KeyString::Builder encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj()));
_hexKeyString = hexblob::encode(encodedToken.getBuffer(), encodedToken.getSize());
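Since the fields are appended in this fixed order before KeyString encoding, the encoded tokens compare field-by-field: clusterTime first, then version, txnOpIndex, fromInvalidate, uuid, and finally the eventIdentifier. A minimal sketch mirroring the assertions in StringEncodingSortsCorrectly below:

    // Two tokens that differ only in eventIdentifier order by that final field:
    //   {ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}
    //     sorts before
    //   {ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 1}})}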
@@ -130,9 +131,10 @@ bool ResumeToken::operator==(const ResumeToken& other) const {
// '_hexKeyString' is enough to determine equality. The type bits are used to unambiguously
// re-construct the original data, but we do not expect any two resume tokens to have the same
// data and different type bits, since that would imply they have (1) the same timestamp and (2)
- // the same documentKey (possibly different types). This should not be possible because
- // documents with the same documentKey should be on the same shard and therefore should have
- // different timestamps.
+ // the same eventIdentifier fields and values, but with different types. Change events with the
+ // same eventIdentifier either (1) occur on the same shard in the case of CRUD events, which
+ // implies that they must have different timestamps; or (2) refer to the same logical event on
+ // different shards, in the case of non-CRUD events.
return _hexKeyString == other._hexKeyString;
}
@@ -208,15 +210,15 @@ ResumeTokenData ResumeToken::getData() const {
result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean());
}
- // The UUID and documentKey are not required.
+ // The UUID and eventIdentifier are not required.
if (!i.more()) {
return result;
}
- // The UUID comes first, then the documentKey.
+ // The UUID comes first, then the eventIdentifier.
result.uuid = uassertStatusOK(UUID::parse(i.next()));
if (i.more()) {
- result.documentKey = Value(i.next());
+ result.eventIdentifier = Value(i.next());
}
uassert(40646, "invalid oversized resume token", !i.more());
@@ -239,4 +241,19 @@ bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) {
return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime, tokenData.uuid);
}
+Value ResumeToken::makeEventIdentifier(StringData opType, Value documentKey, Value opDescription) {
+ tassert(6280100,
+ "both documentKey and operationDescription cannot be present for an event",
+ documentKey.missing() || opDescription.missing());
+
+ // For classic change events, the eventIdentifier is always the documentKey, even if missing.
+ if (change_stream_legacy::kClassicOperationTypes.count(opType)) {
+ return documentKey;
+ }
+
+ // For an expanded event, the eventIdentifier is its operation type and description.
+ return Value(
+ Document{{"operationType"_sd, opType}, {"operationDescription"_sd, opDescription}});
+}
+
} // namespace mongo
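A hedged usage sketch of the new helper; it assumes "insert" is among kClassicOperationTypes, and the 'create' shape matches the round-trip test added below:

    // CRUD event: the identifier is the documentKey itself.
    auto crudId = ResumeToken::makeEventIdentifier(
        "insert"_sd, Value(Document{{"_id"_sd, 1}}), Value() /* no opDescription */);
    // Expanded event: the identifier wraps the operation type and description, i.e.
    //   {operationType: "create", operationDescription: {}}
    auto createId = ResumeToken::makeEventIdentifier(
        "create"_sd, Value() /* no documentKey */, Value(Document{}));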
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index b5a7bddb218..99c018ab20a 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -62,18 +62,23 @@ struct ResumeTokenData {
int versionIn,
size_t txnOpIndexIn,
const boost::optional<UUID>& uuidIn,
- Value documentKeyIn)
+ Value eventIdentifierIn)
: clusterTime(clusterTimeIn),
version(versionIn),
txnOpIndex(txnOpIndexIn),
uuid(uuidIn),
- documentKey(std::move(documentKeyIn)){};
+ eventIdentifier(std::move(eventIdentifierIn)){};
bool operator==(const ResumeTokenData& other) const;
bool operator!=(const ResumeTokenData& other) const {
return !(*this == other);
}
+ static Value makeEventIdentifierFromOpDescription(StringData opType, Value opDescription) {
+ return Value(
+ Document{{"operationType"_sd, opType}, {"operationDescription"_sd, opDescription}});
+ }
+
Timestamp clusterTime;
int version = 1;
TokenType tokenType = TokenType::kEventToken;
@@ -87,7 +92,10 @@ struct ResumeTokenData {
// notification itself.
FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate;
boost::optional<UUID> uuid;
- Value documentKey;
+
+ // The eventIdentifier can either be a document key for CRUD operations, or more
+ // descriptive operation details for non-CRUD operations.
+ Value eventIdentifier;
};
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
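For reference, a hedged sketch of building token data for a non-CRUD event via the inline helper above; the values mirror the NonDocumentKeyResumeTokenRoundTripsThroughHexEncoding test:

    ResumeTokenData data(Timestamp(1001, 3),
                         1 /* version; matches the field default above */,
                         0 /* txnOpIndex */,
                         UUID::gen(),
                         ResumeTokenData::makeEventIdentifierFromOpDescription(
                             "create"_sd, Value(Document{})));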
@@ -121,6 +129,11 @@ public:
static ResumeToken parse(const Document& document);
/**
+ * Generates an appropriate event identifier for the given operationType.
+ */
+ static Value makeEventIdentifier(StringData opType, Value documentKey, Value opDescription);
+
+ /**
* Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey.
*/
static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime);
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index 88be9cdc940..7e893cf2899 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -47,9 +47,10 @@ namespace {
TEST(ResumeToken, EncodesFullTokenFromData) {
Timestamp ts(1000, 2);
UUID testUuid = UUID::gen();
- Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+ Document eventIdentifier{{"_id"_sd, "stuff"_sd},
+ {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(documentKey));
+ ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(eventIdentifier));
ResumeToken token(resumeTokenDataIn);
ResumeTokenData tokenData = token.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
@@ -68,9 +69,10 @@ TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) {
TEST(ResumeToken, ShouldRoundTripThroughHexEncoding) {
Timestamp ts(1000, 2);
UUID testUuid = UUID::gen();
- Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+ Document eventIdentifier{{"_id"_sd, "stuff"_sd},
+ {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(documentKey));
+ ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(eventIdentifier));
// Test serialization/parsing through Document.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument());
@@ -100,6 +102,27 @@ TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) {
ASSERT_EQ(resumeTokenDataIn, tokenData);
}
+TEST(ResumeToken, NonDocumentKeyResumeTokenRoundTripsThroughHexEncoding) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+ resumeTokenDataIn.uuid = UUID::gen();
+ resumeTokenDataIn.eventIdentifier = Value(BSON("operationType"
+ << "create"
+ << "operationDescription" << BSONObj()));
+
+ // Test serialization/parsing through Document.
+ auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+ // Test serialization/parsing through BSON.
+ rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
+ tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
TEST(ResumeToken, TestMissingTypebitsOptimization) {
Timestamp ts(1000, 1);
UUID testUuid = UUID::gen();
@@ -118,8 +141,8 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) {
auto rtNoTypeBitsData = ResumeToken::parse(noTypeBitsDoc).getData();
ASSERT_EQ(hasTypeBitsData, rtHasTypeBitsData);
ASSERT_EQ(noTypeBitsData, rtNoTypeBitsData);
- ASSERT_EQ(BSONType::NumberDouble, rtHasTypeBitsData.documentKey["_id"].getType());
- ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.documentKey["_id"].getType());
+ ASSERT_EQ(BSONType::NumberDouble, rtHasTypeBitsData.eventIdentifier["_id"].getType());
+ ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.eventIdentifier["_id"].getType());
}
TEST(ResumeToken, FailsToParseForInvalidTokenFormats) {
@@ -277,7 +300,7 @@ TEST(ResumeToken, InvalidTxnOpIndex) {
TEST(ResumeToken, StringEncodingSortsCorrectly) {
// Make sure that the string encoding of the resume tokens will compare in the correct order,
- // namely timestamp, version, txnOpIndex, uuid, then documentKey.
+ // namely timestamp, version, txnOpIndex, uuid, then eventIdentifier.
Timestamp ts2_2(2, 2);
Timestamp ts10_4(10, 4);
Timestamp ts10_5(10, 5);
@@ -309,7 +332,8 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) {
assertLt({ts2_2, 0, 0, boost::none, Value()}, {ts2_2, 1, 0, boost::none, Value()});
assertLt({ts10_4, 5, 0, boost::none, Value()}, {ts10_4, 10, 0, boost::none, Value()});
- // Test that the Timestamp is more important than the version, txnOpIndex, UUID and documentKey.
+ // Test that the Timestamp is more important than the version, txnOpIndex, UUID and
+ // eventIdentifier.
assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
{ts10_5, 0, 0, lower_uuid, Value(Document{{"_id", 0}})});
assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
@@ -347,7 +371,7 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) {
assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 1}})},
{ts10_4, 0, 0, higher_uuid, Value(Document{{"_id", 2}})});
- // Test that when the Timestamp, version, txnOpIndex, and UUID are the same, the documentKey
+ // Test that when the Timestamp, version, txnOpIndex, and UUID are the same, the eventIdentifier
// breaks the tie.
assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})},
{ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 1}})});