summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2022-05-19 13:50:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-19 14:48:27 +0000
commit0419b6d14796177749c1921fb787a8c1d0e1faa3 (patch)
treedaff9855d2aae5cd8dceb5011b3fe1fa135448e1 /src/mongo
parent5b63a432d152cafcd2e8479a28754c3d51d63baf (diff)
downloadmongo-0419b6d14796177749c1921fb787a8c1d0e1faa3.tar.gz
SERVER-66028 Include tenantId when constructing namespaces for change streams agg stages
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/namespace_string.h2
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp55
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform_test.cpp304
-rw-r--r--src/mongo/db/pipeline/change_stream_test_helpers.h2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp2
8 files changed, 344 insertions, 25 deletions
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index e29f501a0df..38606ae3d4f 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -241,7 +241,7 @@ public:
"namespaces cannot have embedded null characters",
_ns.find('\0') == std::string::npos);
- auto db = _dotIndex == std::string::npos ? ns : ns.substr(0, _dotIndex);
+ auto db = _dotIndex == std::string::npos ? ns : ns.substr(0, ns.find('.'));
_dbName = DatabaseName(tenantId, db);
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 8b0e48ef632..ddde981fbb9 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -434,6 +434,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ops/write_ops_parsers',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
+ '$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/update/update_driver',
'$BUILD_DIR/mongo/s/query/router_exec_stage',
'change_stream_preimage',
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 63d0aae84b6..64874656a89 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
namespace mongo {
@@ -61,7 +62,10 @@ repl::OpTypeEnum getOplogOpType(const Document& oplog) {
}
Value makeChangeStreamNsField(const NamespaceString& nss) {
- return Value(Document{{"db", nss.db()}, {"coll", nss.coll()}});
+ // For certain types, such as dropDatabase, the collection name may be empty and should be
+ // omitted. We never report the NamespaceString's tenantId in change stream events.
+ return Value(Document{{"db", nss.dbName().db()},
+ {"coll", (nss.coll().empty() ? Value() : Value(nss.coll()))}});
}
void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocument* doc) {
@@ -73,6 +77,16 @@ void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocum
const bool isSingleElementKey = true;
doc->metadata().setSortKey(resumeToken, isSingleElementKey);
}
+
+NamespaceString createNamespaceStringFromOplogEntry(Value tid, StringData ns) {
+ if (gFeatureFlagRequireTenantID.isEnabled(serverGlobalParams.featureCompatibility)) {
+ auto tenantId = tid.missing() ? boost::none : boost::optional<TenantId>{tid.getOid()};
+ return NamespaceString(tenantId, ns);
+ }
+
+ return NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(ns);
+}
+
} // namespace
ChangeStreamEventTransformation::ChangeStreamEventTransformation(
@@ -146,11 +160,12 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Extract the fields we need.
Value ts = input[repl::OplogEntry::kTimestampFieldName];
Value ns = input[repl::OplogEntry::kNssFieldName];
+ Value tenantId = input[repl::OplogEntry::kTidFieldName];
checkValueType(ns, repl::OplogEntry::kNssFieldName, BSONType::String);
Value uuid = input[repl::OplogEntry::kUuidFieldName];
auto opType = getOplogOpType(input);
- NamespaceString nss(ns.getString());
+ NamespaceString nss = createNamespaceStringFromOplogEntry(tenantId, ns.getStringData());
Value id = input.getNestedField("o._id");
// Non-replace updates have the _id in field "o2".
StringData operationType;
@@ -251,15 +266,16 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
operationType = DocumentSourceChangeStream::kDropCollectionOpType;
// The "o.drop" field will contain the actual collection name.
- nss = NamespaceString(nss.db(), nssField.getString());
+ nss = NamespaceString(nss.dbName(), nssField.getStringData());
} else if (auto nssField = oField.getField("renameCollection"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kRenameCollectionOpType;
// The "o.renameCollection" field contains the namespace of the original collection.
- nss = NamespaceString(nssField.getString());
+ nss = createNamespaceStringFromOplogEntry(tenantId, nssField.getStringData());
// The "to" field contains the target namespace for the rename.
- const auto renameTargetNss = NamespaceString(oField["to"].getString());
+ const auto renameTargetNss =
+ createNamespaceStringFromOplogEntry(tenantId, oField["to"].getStringData());
const auto renameTarget = makeChangeStreamNsField(renameTargetNss);
// The 'to' field predates the 'operationDescription' field which was added in 5.3.
@@ -277,32 +293,32 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Extract the database name from the namespace field and leave the collection name
// empty.
- nss = NamespaceString(nss.db());
+ nss = NamespaceString(nss.tenantId(), nss.dbName().db());
} else if (auto nssField = oField.getField("create"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kCreateOpType;
- nss = NamespaceString(nss.db(), nssField.getString());
+ nss = NamespaceString(nss.dbName(), nssField.getStringData());
operationDescription = Value(copyDocExceptFields(oField, {"create"_sd}));
} else if (auto nssField = oField.getField("createIndexes"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kCreateIndexesOpType;
- nss = NamespaceString(nss.db(), nssField.getString());
+ nss = NamespaceString(nss.dbName(), nssField.getStringData());
// Wrap the index spec in an "indexes" array for consistency with commitIndexBuild.
auto indexSpec = Value(copyDocExceptFields(oField, {"createIndexes"_sd}));
operationDescription = Value(Document{{"indexes", std::vector<Value>{indexSpec}}});
} else if (auto nssField = oField.getField("commitIndexBuild"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kCreateIndexesOpType;
- nss = NamespaceString(nss.db(), nssField.getString());
+ nss = NamespaceString(nss.dbName(), nssField.getStringData());
operationDescription = Value(Document{{"indexes", oField.getField("indexes")}});
} else if (auto nssField = oField.getField("dropIndexes"); !nssField.missing()) {
const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
operationType = DocumentSourceChangeStream::kDropIndexesOpType;
- nss = NamespaceString(nss.db(), nssField.getString());
+ nss = NamespaceString(nss.dbName(), nssField.getStringData());
// Wrap the index spec in an "indexes" array for consistency with createIndexes
// and commitIndexBuild.
auto indexSpec = Value(copyDocExceptFields(o2Field, {"dropIndexes"_sd}));
operationDescription = Value(Document{{"indexes", std::vector<Value>{indexSpec}}});
} else if (auto nssField = oField.getField("collMod"); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kModifyOpType;
- nss = NamespaceString(nss.db(), nssField.getString());
+ nss = NamespaceString(nss.dbName(), nssField.getStringData());
operationDescription = Value(copyDocExceptFields(oField, {"collMod"_sd}));
const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
@@ -462,10 +478,9 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
doc.addField(DocumentSourceChangeStream::kPreImageIdField, Value(preImageId.toBSON()));
}
}
- doc.addField(DocumentSourceChangeStream::kNamespaceField,
- operationType == DocumentSourceChangeStream::kDropDatabaseOpType
- ? Value(Document{{"db", nss.db()}})
- : makeChangeStreamNsField(nss));
+
+ // Add the 'ns' field to the change stream document, based on the final value of 'nss'.
+ doc.addField(DocumentSourceChangeStream::kNamespaceField, makeChangeStreamNsField(nss));
// The event may have a documentKey OR an operationDescription, but not both. We already
// validated this while creating the resume token.
@@ -506,6 +521,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation(
const Document& input) const {
Value ts = input[repl::OplogEntry::kTimestampFieldName];
auto opType = getOplogOpType(input);
+ Value tenantId = input[repl::OplogEntry::kTidFieldName];
StringData operationType;
Value operationDescription;
@@ -551,7 +567,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation(
input[repl::OplogEntry::kWallClockTimeFieldName]);
// The 'o._id' is the full namespace string of the view.
- const auto nss = NamespaceString(oField["_id"].getString());
+ const auto nss = createNamespaceStringFromOplogEntry(tenantId, oField["_id"].getStringData());
doc.addField(DocumentSourceChangeStream::kNamespaceField, makeChangeStreamNsField(nss));
doc.addField(DocumentSourceChangeStream::kOperationDescriptionField, operationDescription);
@@ -570,7 +586,12 @@ ChangeStreamEventTransformer::ChangeStreamEventTransformer(
ChangeStreamEventTransformation* ChangeStreamEventTransformer::getBuilder(
const Document& oplog) const {
- auto nss = NamespaceString(oplog[repl::OplogEntry::kNssFieldName].getStringData());
+ // 'nss' is only used here determine which type of transformation to use. This is not dependent
+ // on the tenantId, so it is safe to ignore the tenantId in the oplog entry. It is useful to
+ // avoid extracting the tenantId because we must make this determination for every change stream
+ // event, and the check should therefore be as optimized as possible.
+ auto nss = NamespaceString(boost::none, oplog[repl::OplogEntry::kNssFieldName].getStringData());
+
if (!_isSingleCollStream && nss.isSystemDotViews()) {
return _viewNsEventBuilder.get();
}
diff --git a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp
index a5bdb3b0b73..def777627cd 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/exec/document_value/value_comparator.h"
#include "mongo/db/matcher/schema/expression_internal_schema_object_match.h"
+#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/change_stream_event_transform.h"
#include "mongo/db/pipeline/change_stream_rewrite_helpers.h"
@@ -42,6 +43,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/uuid.h"
@@ -97,8 +99,8 @@ TEST(ChangeStreamEventTransformTest, TestDefaultUpdateTransform) {
}
TEST(ChangeStreamEventTransformTest, TestCreateViewTransform) {
- const NamespaceString systemViewNss("viewDB.system.views");
- const NamespaceString viewNss("viewDB.view.name");
+ const NamespaceString systemViewNss(boost::none, "viewDB.system.views");
+ const NamespaceString viewNss(boost::none, "viewDB.view.name");
const auto viewPipeline =
Value(fromjson("[{$match: {field: 'value'}}, {$project: {field: 1}}]"));
const auto opDescription = Document{{"viewOn", "baseColl"_sd}, {"pipeline", viewPipeline}};
@@ -130,8 +132,8 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewTransform) {
}
TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) {
- const NamespaceString systemViewNss("viewDB.system.views");
- const NamespaceString viewNss("viewDB.view.name");
+ const NamespaceString systemViewNss(boost::none, "viewDB.system.views");
+ const NamespaceString viewNss(boost::none, "viewDB.view.name");
const auto viewPipeline =
Value(fromjson("[{$match: {field: 'value'}}, {$project: {field: 1}}]"));
const auto document = BSON("_id" << viewNss.toString() << "viewOn"
@@ -162,5 +164,299 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) {
ASSERT_DOCUMENT_EQ(applyTransformation(oplogEntry), expectedDoc);
}
+TEST(ChangeStreamEventTransformTest, TestUpdateTransformWithTenantId) {
+ // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be
+ // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the
+ // change event.
+ gMultitenancySupport = true;
+
+ const auto documentKey = Document{{"x", 1}, {"y", 1}};
+ const auto tenantId = TenantId(OID::gen());
+ NamespaceString nssWithTenant(tenantId, "unittests.serverless_change_stream");
+
+ auto updateField =
+ makeOplogEntry(repl::OpTypeEnum::kUpdate, // op type
+ nssWithTenant, // namespace
+ BSON("$v" << 2 << "diff" << BSON("u" << BSON("y" << 2))), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ documentKey.toBson() // o2
+ );
+
+ Document expectedNamespace =
+ Document{{"db", nssWithTenant.dbName().db()}, {"coll", nssWithTenant.coll()}};
+
+ auto changeStreamDoc = applyTransformation(updateField, nssWithTenant);
+ auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+
+ // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field
+ // in the oplog entry. It should still not be a part of the db name in the change event.
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+
+ // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation
+ // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid
+ // including the tenantId as the db prefix in the OplogEntry's "ns" field. Until SERVER-66019 is
+ // complete, the tenantId will be included in both the "tid" field and "ns" fields in serialized
+ // oplog entries, because serializing NamespaceString currently will include the tenantId.
+ auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op"
+ << "u"
+ << "ns"
+ << "unittests.serverless_change_stream"
+ << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid()
+ << "o" << BSON("$v" << 2 << "diff" << BSON("u" << BSON("y" << 2)))
+ << "o2" << documentKey.toBson());
+
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAtOperationTime(kDefaultTs);
+ ChangeStreamEventTransformer transformer(
+ make_intrusive<ExpressionContextForTest>(nssWithTenant), spec);
+
+ changeStreamDoc = transformer.applyTransformation(Document(oplogEntry));
+ outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+}
+
+TEST(ChangeStreamEventTransformTest, TestRenameTransformWithTenantId) {
+ // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be
+ // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the
+ // change event.
+ gMultitenancySupport = true;
+
+ const auto tenantId = TenantId(OID::gen());
+ NamespaceString renameFrom(tenantId, "unittests.serverless_change_stream");
+ NamespaceString renameTo(tenantId, "unittests.rename_coll");
+
+ auto renameField = makeOplogEntry(
+ repl::OpTypeEnum::kCommand, // op type
+ renameFrom.getCommandNS(), // namespace
+ BSON("renameCollection" << renameFrom.toString() << "to" << renameTo.toString()), // o
+ testUuid() // uuid
+ );
+
+ Document expectedDoc{{DocumentSourceChangeStream::kNamespaceField,
+ Document{{"db", renameFrom.dbName().db()}, {"coll", renameFrom.coll()}}},
+ {DocumentSourceChangeStream::kRenameTargetNssField,
+ Document{{"db", renameTo.dbName().db()}, {"coll", renameTo.coll()}}},
+ {DocumentSourceChangeStream::kOperationDescriptionField,
+ Document{BSON("to" << BSON("db" << renameTo.dbName().db() << "coll"
+ << renameTo.coll()))}}};
+
+ auto changeStreamDoc = applyTransformation(renameField, renameFrom);
+ auto renameDoc = Document{
+ {DocumentSourceChangeStream::kNamespaceField,
+ changeStreamDoc.getField(DocumentSourceChangeStream::kNamespaceField)},
+ {DocumentSourceChangeStream::kRenameTargetNssField,
+ changeStreamDoc.getField(DocumentSourceChangeStream::kRenameTargetNssField)},
+ {DocumentSourceChangeStream::kOperationDescriptionField,
+ changeStreamDoc.getField(DocumentSourceChangeStream::kOperationDescriptionField)}};
+
+ ASSERT_DOCUMENT_EQ(renameDoc, expectedDoc);
+
+ // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field
+ // in the oplog entry. It should still not be a part of the db name in the change event.
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+
+ // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation
+ // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid
+ // including the tenantId as the db prefix in the OplogEntry's "ns", "renameCollection", and
+ // "to" fields. Until SERVER-66019 is complete, the tenantId will be included in both the "tid"
+ // field and these 3 fields in serialized oplog entries, because serializing NamespaceString
+ // currently will include the tenantId.
+ auto oplogEntry =
+ BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op"
+ << "c"
+ << "ns"
+ << "unittests.$cmd"
+ << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid() << "o"
+ << BSON("renameCollection"
+ << "unittests.serverless_change_stream"
+ << "to"
+ << "unittests.rename_coll"));
+
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAtOperationTime(kDefaultTs);
+ spec.setShowExpandedEvents(true);
+ ChangeStreamEventTransformer transformer(make_intrusive<ExpressionContextForTest>(renameFrom),
+ spec);
+
+ changeStreamDoc = transformer.applyTransformation(Document(oplogEntry));
+ renameDoc = Document{
+ {DocumentSourceChangeStream::kNamespaceField,
+ changeStreamDoc.getField(DocumentSourceChangeStream::kNamespaceField)},
+ {DocumentSourceChangeStream::kRenameTargetNssField,
+ changeStreamDoc.getField(DocumentSourceChangeStream::kRenameTargetNssField)},
+ {DocumentSourceChangeStream::kOperationDescriptionField,
+ changeStreamDoc.getField(DocumentSourceChangeStream::kOperationDescriptionField)}};
+
+ ASSERT_DOCUMENT_EQ(renameDoc, expectedDoc);
+}
+
+TEST(ChangeStreamEventTransformTest, TestDropDatabaseTransformWithTenantId) {
+ // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be
+ // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the
+ // change event.
+ gMultitenancySupport = true;
+
+ const auto tenantId = TenantId(OID::gen());
+ NamespaceString dbToDrop(tenantId, "unittests");
+
+ auto dropDbField = makeOplogEntry(repl::OpTypeEnum::kCommand, // op type
+ dbToDrop.getCommandNS(), // namespace
+ BSON("dropDatabase" << 1), // o
+ testUuid() // uuid
+ );
+
+ Document expectedNamespace = Document{{"db", dbToDrop.dbName().db()}};
+
+ auto changeStreamDoc = applyTransformation(dropDbField, dbToDrop);
+ auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+
+ // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field
+ // in the oplog entry. It should still not be a part of the db name in the change event.
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+
+ // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation
+ // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid
+ // including the tenantId as the db prefix in the OplogEntry's "ns" field Until SERVER-66019 is
+ // complete, the tenantId will be included in both the "tid" and "ns" fields in serialized oplog
+ // entries, because serializing NamespaceString currently will include the tenantId.
+ auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op"
+ << "c"
+ << "ns"
+ << "unittests.$cmd"
+ << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid()
+ << "o" << BSON("dropDatabase" << 1));
+
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAtOperationTime(kDefaultTs);
+ ChangeStreamEventTransformer transformer(make_intrusive<ExpressionContextForTest>(dbToDrop),
+ spec);
+
+ changeStreamDoc = transformer.applyTransformation(Document(oplogEntry));
+ outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+}
+
+TEST(ChangeStreamEventTransformTest, TestCreateTransformWithTenantId) {
+ // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be
+ // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the
+ // change event.
+ gMultitenancySupport = true;
+
+ const auto tenantId = TenantId(OID::gen());
+ NamespaceString nssWithTenant(tenantId, "unittests.serverless_change_stream");
+
+ auto createField = makeOplogEntry(repl::OpTypeEnum::kCommand, // op type
+ nssWithTenant.getCommandNS(), // namespace
+ BSON("create" << nssWithTenant.coll()), // o
+ testUuid() // uuid
+ );
+
+ Document expectedNamespace =
+ Document{{"db", nssWithTenant.dbName().db()}, {"coll", nssWithTenant.coll()}};
+
+ auto changeStreamDoc = applyTransformation(createField, nssWithTenant);
+ auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+
+ // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field
+ // in the oplog entry. It should still not be a part of the db name in the change event.
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+
+ // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation
+ // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid
+ // including the tenantId as the db prefix in the OplogEntry's "ns" field Until SERVER-66019 is
+ // complete, the tenantId will be included in both the "tid" and "ns" fields in serialized oplog
+ // entries, because serializing NamespaceString currently will include the tenantId.
+ auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op"
+ << "c"
+ << "ns"
+ << "unittests.$cmd"
+ << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid()
+ << "o" << BSON("create" << nssWithTenant.coll()));
+
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAtOperationTime(kDefaultTs);
+ spec.setShowExpandedEvents(true);
+ ChangeStreamEventTransformer transformer(
+ make_intrusive<ExpressionContextForTest>(nssWithTenant), spec);
+
+ changeStreamDoc = transformer.applyTransformation(Document(oplogEntry));
+ outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+}
+
+TEST(ChangeStreamEventTransformTest, TestCreateViewTransformWithTenantId) {
+ // Turn on multitenancySupport, but not featureFlagRequireTenantId. We expect the tenantId to be
+ // part of the 'ns' field in the oplog entry, but it should not be a part of the db name in the
+ // change event.
+ gMultitenancySupport = true;
+
+ const auto tenantId = TenantId(OID::gen());
+
+ const NamespaceString systemViewNss(tenantId, "viewDB.system.views");
+ const NamespaceString viewNss(tenantId, "viewDB.view.name");
+ const auto viewPipeline =
+ Value(fromjson("[{$match: {field: 'value'}}, {$project: {field: 1}}]"));
+ const auto opDescription = Document{{"viewOn", "baseColl"_sd}, {"pipeline", viewPipeline}};
+ auto createView = makeOplogEntry(repl::OpTypeEnum::kInsert, // op type
+ systemViewNss, // namespace
+ BSON("_id" << viewNss.toString() << "viewOn"
+ << "baseColl"
+ << "pipeline" << viewPipeline), // o
+ testUuid()); // uuid
+
+ Document expectedNamespace = Document{{"db", viewNss.dbName().db()}, {"coll", viewNss.coll()}};
+
+ auto changeStreamDoc = applyTransformation(
+ createView, NamespaceString::makeCollectionlessAggregateNSS(viewNss.dbName()));
+ auto outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+
+
+ // Now set featureFlagRequireTenantId, so we expect the tenantId to be in a separate "tid" field
+ // in the oplog entry. It should still not be a part of the db name in the change event.
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+
+ // TODO SERVER-66019 Construct OplogEntry using makeOplogEntry and use the applyTransformation
+ // helper defined above. We manually construct the OplogEntry as a BSON object below to avoid
+ // including the tenantId as the db prefix in the OplogEntry's "ns" and "o._id" fields. Until
+ // SERVER-66019 is complete, the tenantId will be included in both the "tid" field and these 2
+ // fields in serialized oplog entries, because serializing NamespaceString currently will
+ // include the tenantId.
+ auto oplogEntry = BSON("ts" << Timestamp(0, 0) << "t" << 0LL << "op"
+ << "i"
+ << "ns"
+ << "viewDB.system.views"
+ << "tid" << tenantId << "wall" << Date_t() << "ui" << testUuid()
+ << "o"
+ << BSON("_id"
+ << "viewDB.view.name"
+ << "viewOn"
+ << "baseColl"
+ << "pipeline" << viewPipeline));
+
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAtOperationTime(kDefaultTs);
+ ChangeStreamEventTransformer transformer(
+ make_intrusive<ExpressionContextForTest>(
+ NamespaceString::makeCollectionlessAggregateNSS(viewNss.dbName())),
+ spec);
+
+ changeStreamDoc = transformer.applyTransformation(Document(oplogEntry));
+ outputNs = changeStreamDoc[DocumentSourceChangeStream::kNamespaceField].getDocument();
+
+ ASSERT_DOCUMENT_EQ(outputNs, expectedNamespace);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.h b/src/mongo/db/pipeline/change_stream_test_helpers.h
index b2492dbe5dd..9e472042a25 100644
--- a/src/mongo/db/pipeline/change_stream_test_helpers.h
+++ b/src/mongo/db/pipeline/change_stream_test_helpers.h
@@ -42,7 +42,7 @@
namespace mongo::change_stream_test_helper {
static const Timestamp kDefaultTs(100, 1);
static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
-static const NamespaceString nss("unittests.change_stream");
+static const NamespaceString nss(boost::none, "unittests.change_stream");
static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}");
static const BSONObj kShowExpandedEventsSpec =
fromjson("{$changeStream: {showExpandedEvents: true}}");
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
index 051922556a5..ab3f95abca1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
@@ -120,7 +120,7 @@ NamespaceString DocumentSourceChangeStreamAddPostImage::assertValidNamespace(
.getDocument();
auto dbName = assertFieldHasType(namespaceObject, "db"_sd, BSONType::String);
auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String);
- NamespaceString nss(dbName.getString(), collectionName.getString());
+ NamespaceString nss(pExpCtx->ns.tenantId(), dbName.getString(), collectionName.getString());
// Change streams on an entire database only need to verify that the database names match. If
// the database is 'admin', then this is a cluster-wide $changeStream and we are permitted to
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
index 33cf79cc36f..8aa0d00deb9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
@@ -375,5 +375,6 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) {
ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index cdf631dfc63..8a48df33cbc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -386,7 +386,7 @@ public:
std::unique_ptr<Pipeline, PipelineDeleter> buildTestPipeline(
const std::vector<BSONObj>& rawPipeline) {
auto expCtx = getExpCtx();
- expCtx->ns = NamespaceString("a.collection");
+ expCtx->ns = NamespaceString(boost::none, "a.collection");
expCtx->inMongos = true;
auto pipeline = Pipeline::parse(rawPipeline, expCtx);