diff options
author | jannaerin <golden.janna@gmail.com> | 2022-05-19 13:50:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-19 14:48:27 +0000 |
commit | 0419b6d14796177749c1921fb787a8c1d0e1faa3 (patch) | |
tree | daff9855d2aae5cd8dceb5011b3fe1fa135448e1 /src/mongo | |
parent | 5b63a432d152cafcd2e8479a28754c3d51d63baf (diff) | |
download | mongo-0419b6d14796177749c1921fb787a8c1d0e1faa3.tar.gz |
SERVER-66028 Include tenantId when constructing namespaces for change streams agg stages
Diffstat (limited to 'src/mongo')
8 files changed, 344 insertions, 25 deletions
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index e29f501a0df..38606ae3d4f 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -241,7 +241,7 @@ public: "namespaces cannot have embedded null characters", _ns.find('\0') == std::string::npos); - auto db = _dotIndex == std::string::npos ? ns : ns.substr(0, _dotIndex); + auto db = _dotIndex == std::string::npos ? ns : ns.substr(0, ns.find('.')); _dbName = DatabaseName(tenantId, db); } diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 8b0e48ef632..ddde981fbb9 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -434,6 +434,7 @@ env.Library( '$BUILD_DIR/mongo/db/ops/write_ops_parsers', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/update/update_driver', '$BUILD_DIR/mongo/s/query/router_exec_stage', 'change_stream_preimage', diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 63d0aae84b6..64874656a89 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/update/update_oplog_entry_serialization.h" namespace mongo { @@ -61,7 +62,10 @@ repl::OpTypeEnum getOplogOpType(const Document& oplog) { } Value makeChangeStreamNsField(const NamespaceString& nss) { - return Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}); + // For certain types, such as dropDatabase, the collection name may be empty and should be + // omitted. We never report the NamespaceString's tenantId in change stream events. + return Value(Document{{"db", nss.dbName().db()}, + {"coll", (nss.coll().empty() ? Value() : Value(nss.coll()))}}); } void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocument* doc) { @@ -73,6 +77,16 @@ void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocum const bool isSingleElementKey = true; doc->metadata().setSortKey(resumeToken, isSingleElementKey); } + +NamespaceString createNamespaceStringFromOplogEntry(Value tid, StringData ns) { + if (gFeatureFlagRequireTenantID.isEnabled(serverGlobalParams.featureCompatibility)) { + auto tenantId = tid.missing() ? boost::none : boost::optional<TenantId>{tid.getOid()}; + return NamespaceString(tenantId, ns); + } + + return NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(ns); +} + } // namespace ChangeStreamEventTransformation::ChangeStreamEventTransformation( @@ -146,11 +160,12 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // Extract the fields we need. Value ts = input[repl::OplogEntry::kTimestampFieldName]; Value ns = input[repl::OplogEntry::kNssFieldName]; + Value tenantId = input[repl::OplogEntry::kTidFieldName]; checkValueType(ns, repl::OplogEntry::kNssFieldName, BSONType::String); Value uuid = input[repl::OplogEntry::kUuidFieldName]; auto opType = getOplogOpType(input); - NamespaceString nss(ns.getString()); + NamespaceString nss = createNamespaceStringFromOplogEntry(tenantId, ns.getStringData()); Value id = input.getNestedField("o._id"); // Non-replace updates have the _id in field "o2". StringData operationType; @@ -251,15 +266,16 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum operationType = DocumentSourceChangeStream::kDropCollectionOpType; // The "o.drop" field will contain the actual collection name. - nss = NamespaceString(nss.db(), nssField.getString()); + nss = NamespaceString(nss.dbName(), nssField.getStringData()); } else if (auto nssField = oField.getField("renameCollection"); !nssField.missing()) { operationType = DocumentSourceChangeStream::kRenameCollectionOpType; // The "o.renameCollection" field contains the namespace of the original collection. - nss = NamespaceString(nssField.getString()); + nss = createNamespaceStringFromOplogEntry(tenantId, nssField.getStringData()); // The "to" field contains the target namespace for the rename. - const auto renameTargetNss = NamespaceString(oField["to"].getString()); + const auto renameTargetNss = + createNamespaceStringFromOplogEntry(tenantId, oField["to"].getStringData()); const auto renameTarget = makeChangeStreamNsField(renameTargetNss); // The 'to' field predates the 'operationDescription' field which was added in 5.3. @@ -277,32 +293,32 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // Extract the database name from the namespace field and leave the collection name // empty. - nss = NamespaceString(nss.db()); + nss = NamespaceString(nss.tenantId(), nss.dbName().db()); } else if (auto nssField = oField.getField("create"); !nssField.missing()) { operationType = DocumentSourceChangeStream::kCreateOpType; - nss = NamespaceString(nss.db(), nssField.getString()); + nss = NamespaceString(nss.dbName(), nssField.getStringData()); operationDescription = Value(copyDocExceptFields(oField, {"create"_sd})); } else if (auto nssField = oField.getField("createIndexes"); !nssField.missing()) { operationType = DocumentSourceChangeStream::kCreateIndexesOpType; - nss = NamespaceString(nss.db(), nssField.getString()); + nss = NamespaceString(nss.dbName(), nssField.getStringData()); // Wrap the index spec in an "indexes" array for consistency with commitIndexBuild. auto indexSpec = Value(copyDocExceptFields(oField, {"createIndexes"_sd})); operationDescription = Value(Document{{"indexes", std::vector<Value>{indexSpec}}}); } else if (auto nssField = oField.getField("commitIndexBuild"); !nssField.missing()) { operationType = DocumentSourceChangeStream::kCreateIndexesOpType; - nss = NamespaceString(nss.db(), nssField.getString()); + nss = NamespaceString(nss.dbName(), nssField.getStringData()); operationDescription = Value(Document{{"indexes", oField.getField("indexes")}}); } else if (auto nssField = oField.getField("dropIndexes"); !nssField.missing()) { const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); operationType = DocumentSourceChangeStream::kDropIndexesOpType; - nss = NamespaceString(nss.db(), nssField.getString()); + nss = NamespaceString(nss.dbName(), nssField.getStringData()); // Wrap the index spec in an "indexes" array for consistency with createIndexes // and commitIndexBuild. auto indexSpec = Value(copyDocExceptFields(o2Field, {"dropIndexes"_sd})); operationDescription = Value(Document{{"indexes", std::vector<Value>{indexSpec}}}); } else if (auto nssField = oField.getField("collMod"); !nssField.missing()) { operationType = DocumentSourceChangeStream::kModifyOpType; - nss = NamespaceString(nss.db(), nssField.getString()); + nss = NamespaceString(nss.dbName(), nssField.getStringData()); operationDescription = Value(copyDocExceptFields(oField, {"collMod"_sd})); const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); @@ -462,10 +478,9 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum doc.addField(DocumentSourceChangeStream::kPreImageIdField, Value(preImageId.toBSON())); } } - doc.addField(DocumentSourceChangeStream::kNamespaceField, - operationType == DocumentSourceChangeStream::kDropDatabaseOpType - ? Value(Document{{"db", nss.db()}}) - : makeChangeStreamNsField(nss)); + + // Add the 'ns' field to the change stream document, based on the final value of 'nss'. + doc.addField(DocumentSourceChangeStream::kNamespaceField, makeChangeStreamNsField(nss)); // The event may have a documentKey OR an operationDescription, but not both. We already // validated this while creating the resume token. @@ -506,6 +521,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation( const Document& input) const { Value ts = input[repl::OplogEntry::kTimestampFieldName]; auto opType = getOplogOpType(input); + Value tenantId = input[repl::OplogEntry::kTidFieldName]; StringData operationType; Value operationDescription; @@ -551,7 +567,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation( input[repl::OplogEntry::kWallClockTimeFieldName]); // The 'o._id' is the full namespace string of the view. - const auto nss = NamespaceString(oField["_id"].getString()); + const auto nss = createNamespaceStringFromOplogEntry(tenantId, oField["_id"].getStringData()); doc.addField(DocumentSourceChangeStream::kNamespaceField, makeChangeStreamNsField(nss)); doc.addField(DocumentSourceChangeStream::kOperationDescriptionField, operationDescription); @@ -570,7 +586,12 @@ ChangeStreamEventTransformer::ChangeStreamEventTransformer( ChangeStreamEventTransformation* ChangeStreamEventTransformer::getBuilder( const Document& oplog) const { - auto nss = NamespaceString(oplog[repl::OplogEntry::kNssFieldName].getStringData()); + // 'nss' is only used here determine which type of transformation to use. This is not dependent + // on the tenantId, so it is safe to ignore the tenantId in the oplog entry. It is useful to + // avoid extracting the tenantId because we must make this determination for every change stream + // event, and the check should therefore be as optimized as possible. + auto nss = NamespaceString(boost::none, oplog[repl::OplogEntry::kNssFieldName].getStringData()); + if (!_isSingleCollStream && nss.isSystemDotViews()) { return _viewNsEventBuilder.get(); } diff --git a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp index a5bdb3b0b73..def777627cd 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/exec/document_value/value.h" #include "mongo/db/exec/document_value/value_comparator.h" #include "mongo/db/matcher/schema/expression_internal_schema_object_match.h" +#include "mongo/db/multitenancy_gen.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/change_stream_event_transform.h" #include "mongo/db/pipeline/change_stream_rewrite_helpers.h" @@ -42,6 +43,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/unittest/unittest.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/uuid.h" @@ -97,8 +99,8 @@ TEST(ChangeStreamEventTransformTest, TestDefaultUpdateTransform) { } TEST(ChangeStreamEventTransformTest, TestCreateViewTransform) { - const NamespaceString systemViewNss("viewDB.system.views"); - const NamespaceString viewNss("viewDB.view.name"); + const NamespaceString systemViewNss(boost::none, "viewDB.system.views"); + const NamespaceString viewNss(boost::none, "viewDB.view.name"); const auto viewPipeline = Value(fromjson("[{$match: {field: 'value'}}, {$project: {field: 1}}]")); const auto opDescription = Document{{"viewOn", "baseColl"_sd}, {"pipeline", viewPipeline}}; @@ -130,8 +132,8 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewTransform) { } TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) { - const NamespaceString systemViewNss("viewDB.system.views"); - const NamespaceString viewNss("viewDB.view.name"); + const NamespaceString systemViewNss(boost::none, "viewDB.system.views"); + const NamespaceString viewNss(boost::none, "viewDB.view.name"); const auto viewPipeline = Value(fromjson("[{$match: {field: 'value'}}, {$project: {field: 1}}]")); const auto document = BSON("_id" << viewNss.toString() << "viewOn" @@ -162,5 +164,299 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) { ASSERT_DOCUMENT_EQ(applyTransformation(oplogEntry), expectedDoc); } +TEST(ChangeStreamEventTransformTest, TestUpdateTransformWithTenantId) { + // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be + // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the + // change event. + gMultitenancySupport = true; + + const auto documentKey = Document{{"x", 1}, {"y", 1}}; + const auto tenantId = TenantId(OID::gen()); + NamespaceString nssWithTenant(tenantId, "unittests.serverless_change_stream"); + + auto updateField = + makeOplogEntry(repl::OpTypeEnum::kUpdate, // op type + nssWithTenant, // namespace + BSON("$v" << 2 << "diff" << BSON("u" << BSON("y" << 2))), // o + testUuid(), // uuid + boost::none, // fromMigrate + documentKey.toBson() // o2 + ); + + Document expectedNamespace = + Document{{"db", nssWithTenant.dbName().db()}, {"coll", nssWithTenant.coll()}}; + + auto changeStreamDoc = applyTransformation(updateField, nssWithTenant); + auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); + + // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field + // in the oplog entry. It should still not be a part of the db name in the change event. + RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true); + + // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation + // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid + // including the tenantId as the db prefix in the OplogEntry's "ns" field. Until SERVER-66019 is + // complete, the tenantId will be included in both the "tid" field and "ns" fields in serialized + // oplog entries, because serializing NamespaceString currently will include the tenantId. + auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op" + << "u" + << "ns" + << "unittests.serverless_change_stream" + << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid() + << "o" << BSON("$v" << 2 << "diff" << BSON("u" << BSON("y" << 2))) + << "o2" << documentKey.toBson()); + + DocumentSourceChangeStreamSpec spec; + spec.setStartAtOperationTime(kDefaultTs); + ChangeStreamEventTransformer transformer( + make_intrusive<ExpressionContextForTest>(nssWithTenant), spec); + + changeStreamDoc = transformer.applyTransformation(Document(oplogEntry)); + outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); +} + +TEST(ChangeStreamEventTransformTest, TestRenameTransformWithTenantId) { + // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be + // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the + // change event. + gMultitenancySupport = true; + + const auto tenantId = TenantId(OID::gen()); + NamespaceString renameFrom(tenantId, "unittests.serverless_change_stream"); + NamespaceString renameTo(tenantId, "unittests.rename_coll"); + + auto renameField = makeOplogEntry( + repl::OpTypeEnum::kCommand, // op type + renameFrom.getCommandNS(), // namespace + BSON("renameCollection" << renameFrom.toString() << "to" << renameTo.toString()), // o + testUuid() // uuid + ); + + Document expectedDoc{{DocumentSourceChangeStream::kNamespaceField, + Document{{"db", renameFrom.dbName().db()}, {"coll", renameFrom.coll()}}}, + {DocumentSourceChangeStream::kRenameTargetNssField, + Document{{"db", renameTo.dbName().db()}, {"coll", renameTo.coll()}}}, + {DocumentSourceChangeStream::kOperationDescriptionField, + Document{BSON("to" << BSON("db" << renameTo.dbName().db() << "coll" + << renameTo.coll()))}}}; + + auto changeStreamDoc = applyTransformation(renameField, renameFrom); + auto renameDoc = Document{ + {DocumentSourceChangeStream::kNamespaceField, + changeStreamDoc.getField(DocumentSourceChangeStream::kNamespaceField)}, + {DocumentSourceChangeStream::kRenameTargetNssField, + changeStreamDoc.getField(DocumentSourceChangeStream::kRenameTargetNssField)}, + {DocumentSourceChangeStream::kOperationDescriptionField, + changeStreamDoc.getField(DocumentSourceChangeStream::kOperationDescriptionField)}}; + + ASSERT_DOCUMENT_EQ(renameDoc, expectedDoc); + + // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field + // in the oplog entry. It should still not be a part of the db name in the change event. + RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true); + + // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation + // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid + // including the tenantId as the db prefix in the OplogEntry's "ns", "renameCollection", and + // "to" fields. Until SERVER-66019 is complete, the tenantId will be included in both the "tid" + // field and these 3 fields in serialized oplog entries, because serializing NamespaceString + // currently will include the tenantId. + auto oplogEntry = + BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op" + << "c" + << "ns" + << "unittests.$cmd" + << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid() << "o" + << BSON("renameCollection" + << "unittests.serverless_change_stream" + << "to" + << "unittests.rename_coll")); + + DocumentSourceChangeStreamSpec spec; + spec.setStartAtOperationTime(kDefaultTs); + spec.setShowExpandedEvents(true); + ChangeStreamEventTransformer transformer(make_intrusive<ExpressionContextForTest>(renameFrom), + spec); + + changeStreamDoc = transformer.applyTransformation(Document(oplogEntry)); + renameDoc = Document{ + {DocumentSourceChangeStream::kNamespaceField, + changeStreamDoc.getField(DocumentSourceChangeStream::kNamespaceField)}, + {DocumentSourceChangeStream::kRenameTargetNssField, + changeStreamDoc.getField(DocumentSourceChangeStream::kRenameTargetNssField)}, + {DocumentSourceChangeStream::kOperationDescriptionField, + changeStreamDoc.getField(DocumentSourceChangeStream::kOperationDescriptionField)}}; + + ASSERT_DOCUMENT_EQ(renameDoc, expectedDoc); +} + +TEST(ChangeStreamEventTransformTest, TestDropDatabaseTransformWithTenantId) { + // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be + // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the + // change event. + gMultitenancySupport = true; + + const auto tenantId = TenantId(OID::gen()); + NamespaceString dbToDrop(tenantId, "unittests"); + + auto dropDbField = makeOplogEntry(repl::OpTypeEnum::kCommand, // op type + dbToDrop.getCommandNS(), // namespace + BSON("dropDatabase" << 1), // o + testUuid() // uuid + ); + + Document expectedNamespace = Document{{"db", dbToDrop.dbName().db()}}; + + auto changeStreamDoc = applyTransformation(dropDbField, dbToDrop); + auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); + + // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field + // in the oplog entry. It should still not be a part of the db name in the change event. + RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true); + + // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation + // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid + // including the tenantId as the db prefix in the OplogEntry's "ns" field Until SERVER-66019 is + // complete, the tenantId will be included in both the "tid" and "ns" fields in serialized oplog + // entries, because serializing NamespaceString currently will include the tenantId. + auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op" + << "c" + << "ns" + << "unittests.$cmd" + << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid() + << "o" << BSON("dropDatabase" << 1)); + + DocumentSourceChangeStreamSpec spec; + spec.setStartAtOperationTime(kDefaultTs); + ChangeStreamEventTransformer transformer(make_intrusive<ExpressionContextForTest>(dbToDrop), + spec); + + changeStreamDoc = transformer.applyTransformation(Document(oplogEntry)); + outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); +} + +TEST(ChangeStreamEventTransformTest, TestCreateTransformWithTenantId) { + // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be + // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the + // change event. + gMultitenancySupport = true; + + const auto tenantId = TenantId(OID::gen()); + NamespaceString nssWithTenant(tenantId, "unittests.serverless_change_stream"); + + auto createField = makeOplogEntry(repl::OpTypeEnum::kCommand, // op type + nssWithTenant.getCommandNS(), // namespace + BSON("create" << nssWithTenant.coll()), // o + testUuid() // uuid + ); + + Document expectedNamespace = + Document{{"db", nssWithTenant.dbName().db()}, {"coll", nssWithTenant.coll()}}; + + auto changeStreamDoc = applyTransformation(createField, nssWithTenant); + auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); + + // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field + // in the oplog entry. It should still not be a part of the db name in the change event. + RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true); + + // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation + // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid + // including the tenantId as the db prefix in the OplogEntry's "ns" field Until SERVER-66019 is + // complete, the tenantId will be included in both the "tid" and "ns" fields in serialized oplog + // entries, because serializing NamespaceString currently will include the tenantId. + auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op" + << "c" + << "ns" + << "unittests.$cmd" + << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid() + << "o" << BSON("create" << nssWithTenant.coll())); + + DocumentSourceChangeStreamSpec spec; + spec.setStartAtOperationTime(kDefaultTs); + spec.setShowExpandedEvents(true); + ChangeStreamEventTransformer transformer( + make_intrusive<ExpressionContextForTest>(nssWithTenant), spec); + + changeStreamDoc = transformer.applyTransformation(Document(oplogEntry)); + outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); +} + +TEST(ChangeStreamEventTransformTest, TestCreateViewTransformWithTenantId) { + // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be + // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the + // change event. + gMultitenancySupport = true; + + const auto tenantId = TenantId(OID::gen()); + + const NamespaceString systemViewNss(tenantId, "viewDB.system.views"); + const NamespaceString viewNss(tenantId, "viewDB.view.name"); + const auto viewPipeline = + Value(fromjson("[{$match: {field: 'value'}}, {$project: {field: 1}}]")); + const auto opDescription = Document{{"viewOn", "baseColl"_sd}, {"pipeline", viewPipeline}}; + auto createView = makeOplogEntry(repl::OpTypeEnum::kInsert, // op type + systemViewNss, // namespace + BSON("_id" << viewNss.toString() << "viewOn" + << "baseColl" + << "pipeline" << viewPipeline), // o + testUuid()); // uuid + + Document expectedNamespace = Document{{"db", viewNss.dbName().db()}, {"coll", viewNss.coll()}}; + + auto changeStreamDoc = applyTransformation( + createView, NamespaceString::makeCollectionlessAggregateNSS(viewNss.dbName())); + auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); + + + // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field + // in the oplog entry. It should still not be a part of the db name in the change event. + RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true); + + // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation + // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid + // including the tenantId as the db prefix in the OplogEntry's "ns" and "o._id" fields. Until + // SERVER-66019 is complete, the tenantId will be included in both the "tid" field and these 2 + // fields in serialized oplog entries, because serializing NamespaceString currently will + // include the tenantId. + auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op" + << "i" + << "ns" + << "viewDB.system.views" + << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid() + << "o" + << BSON("_id" + << "viewDB.view.name" + << "viewOn" + << "baseColl" + << "pipeline" << viewPipeline)); + + DocumentSourceChangeStreamSpec spec; + spec.setStartAtOperationTime(kDefaultTs); + ChangeStreamEventTransformer transformer( + make_intrusive<ExpressionContextForTest>( + NamespaceString::makeCollectionlessAggregateNSS(viewNss.dbName())), + spec); + + changeStreamDoc = transformer.applyTransformation(Document(oplogEntry)); + outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument(); + + ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.h b/src/mongo/db/pipeline/change_stream_test_helpers.h index b2492dbe5dd..9e472042a25 100644 --- a/src/mongo/db/pipeline/change_stream_test_helpers.h +++ b/src/mongo/db/pipeline/change_stream_test_helpers.h @@ -42,7 +42,7 @@ namespace mongo::change_stream_test_helper { static const Timestamp kDefaultTs(100, 1); static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); -static const NamespaceString nss("unittests.change_stream"); +static const NamespaceString nss(boost::none, "unittests.change_stream"); static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}"); static const BSONObj kShowExpandedEventsSpec = fromjson("{$changeStream: {showExpandedEvents: true}}"); diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp index 051922556a5..ab3f95abca1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp @@ -120,7 +120,7 @@ NamespaceString DocumentSourceChangeStreamAddPostImage::assertValidNamespace( .getDocument(); auto dbName = assertFieldHasType(namespaceObject, "db"_sd, BSONType::String); auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String); - NamespaceString nss(dbName.getString(), collectionName.getString()); + NamespaceString nss(pExpCtx->ns.tenantId(), dbName.getString(), collectionName.getString()); // Change streams on an entire database only need to verify that the database names match. If // the database is 'admin', then this is a cluster-wide $changeStream and we are permitted to diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp index 33cf79cc36f..8aa0d00deb9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp @@ -375,5 +375,6 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) { ASSERT_TRUE(lookupChangeStage->getNext().isEOF()); ASSERT_TRUE(lookupChangeStage->getNext().isEOF()); } + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index cdf631dfc63..8a48df33cbc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -386,7 +386,7 @@ public: std::unique_ptr<Pipeline, PipelineDeleter> buildTestPipeline( const std::vector<BSONObj>& rawPipeline) { auto expCtx = getExpCtx(); - expCtx->ns = NamespaceString("a.collection"); + expCtx->ns = NamespaceString(boost::none, "a.collection"); expCtx->inMongos = true; auto pipeline = Pipeline::parse(rawPipeline, expCtx); |