summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-09-15 10:27:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 11:29:18 +0000
commite6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch)
tree27410d5d07867ef6be3026cb69a9a9821e03e254 /src
parent0797ff28efcd7cb954b88658425b7b38c980b605 (diff)
downloadmongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz
SERVER-66641 Introduce multi-tenancy for change collections.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript22
-rw-r--r--src/mongo/db/catalog/drop_collection.cpp4
-rw-r--r--src/mongo/db/change_collection_expired_change_remover_test.cpp18
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp29
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.h3
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp113
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h21
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.cpp26
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.h7
-rw-r--r--src/mongo/db/change_stream_serverless_helpers.cpp101
-rw-r--r--src/mongo/db/change_stream_serverless_helpers.h71
-rw-r--r--src/mongo/db/change_streams_cluster_parameter.idl4
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/change_stream_state_command.cpp46
-rw-r--r--src/mongo/db/commands/dbcommands.cpp4
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp18
-rw-r--r--src/mongo/db/commands/set_cluster_parameter_command.cpp4
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp8
-rw-r--r--src/mongo/db/mongod_main.cpp6
-rw-r--r--src/mongo/db/namespace_string.cpp12
-rw-r--r--src/mongo/db/namespace_string.h8
-rw-r--r--src/mongo/db/op_observer/op_observer_impl_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp4
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp2
-rw-r--r--src/mongo/db/query/query_knobs.idl9
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/oplog.cpp3
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp6
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp3
-rw-r--r--src/mongo/db/set_change_stream_state_coordinator.cpp50
-rw-r--r--src/mongo/db/stats/SConscript1
-rw-r--r--src/mongo/db/stats/change_collection_server_status.cpp3
33 files changed, 427 insertions, 197 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 57c4feedb00..c0514e4dd44 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -511,7 +511,9 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_stream_state',
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/repl/primary_only_service',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
@@ -524,6 +526,21 @@ env.Library(
)
env.Library(
+ target='change_stream_serverless_helpers',
+ source=[
+ 'change_stream_serverless_helpers.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/collection',
+ '$BUILD_DIR/mongo/db/catalog/collection_catalog',
+ '$BUILD_DIR/mongo/db/query/query_knobs',
+ '$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/server_options',
+ '$BUILD_DIR/mongo/idl/feature_flag',
+ ],
+)
+
+env.Library(
target='change_stream_change_collection_manager',
source=[
'change_stream_change_collection_manager.cpp',
@@ -532,9 +549,10 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/clustered_collection_options',
'$BUILD_DIR/mongo/db/catalog/collection_crud',
- '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbhelpers',
+ '$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/service_context',
],
)
@@ -546,6 +564,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/util/periodic_runner',
@@ -2515,6 +2534,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
'$BUILD_DIR/mongo/db/change_collection_expired_change_remover',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
'$BUILD_DIR/mongo/db/commands/create_command',
'$BUILD_DIR/mongo/db/mongohasher',
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index 80ff680df43..5e9089fce0c 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -233,7 +233,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx,
if (dropIfUUIDNotMatching && collectionUUID == *dropIfUUIDNotMatching) {
return Status::OK();
}
- const NamespaceStringOrUUID dbAndUUID{coll->ns().db().toString(), coll->uuid()};
+ const NamespaceStringOrUUID dbAndUUID{coll->ns().dbName(), coll->uuid()};
const int numIndexes = coll->getIndexCatalog()->numIndexesTotal(opCtx);
while (true) {
@@ -254,7 +254,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx,
<< collectionUUID << ") is being dropped");
// Take an exclusive lock to finish the collection drop.
- optionalAutoDb.emplace(opCtx, startingNss.db(), MODE_IX);
+ optionalAutoDb.emplace(opCtx, startingNss.dbName(), MODE_IX);
collLock.emplace(opCtx, dbAndUUID, MODE_X);
// Abandon the snapshot as the index catalog will compare the in-memory state to the
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
index e3ee3c411f2..8b8777e0d23 100644
--- a/src/mongo/db/change_collection_expired_change_remover_test.cpp
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/plan_executor.h"
@@ -54,7 +55,8 @@ namespace mongo {
class ChangeCollectionExpiredChangeRemoverTest : public CatalogTestFixture {
protected:
ChangeCollectionExpiredChangeRemoverTest()
- : CatalogTestFixture(Options{}.useMockClock(true)), _tenantId(OID::gen()) {
+ : CatalogTestFixture(Options{}.useMockClock(true)),
+ _tenantId(change_stream_serverless_helpers::getTenantIdForTesting()) {
ChangeStreamChangeCollectionManager::create(getServiceContext());
}
@@ -67,7 +69,7 @@ protected:
}
void insertDocumentToChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId,
+ const TenantId& tenantId,
const BSONObj& obj) {
const auto wallTime = now();
Timestamp timestamp{wallTime};
@@ -78,6 +80,7 @@ protected:
oplogEntry.setNss(NamespaceString::makeChangeCollectionNSS(tenantId));
oplogEntry.setObject(obj);
oplogEntry.setWallClockTime(wallTime);
+
auto oplogEntryBson = oplogEntry.toBSON();
RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()};
@@ -112,8 +115,7 @@ protected:
return entries;
}
- void dropAndRecreateChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId) {
+ void dropAndRecreateChangeCollection(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.dropChangeCollection(opCtx, tenantId);
changeCollectionManager.createChangeCollection(opCtx, tenantId);
@@ -136,11 +138,13 @@ protected:
opCtx, &*changeCollection, maxRecordIdBound);
}
- const boost::optional<TenantId> _tenantId;
+ const TenantId _tenantId;
+ boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager;
+
RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams",
true};
-
- boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager;
+ RAIIServerParameterControllerForTest queryKnobController{
+ "internalChangeStreamUseTenantIdForTesting", true};
};
// Tests that the last expired focument retrieved is the expected one.
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index 80a816945be..d84fb21b672 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -56,25 +57,31 @@ MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments);
namespace {
-// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
-// available.
-std::vector<boost::optional<TenantId>> getAllTenants() {
- return {boost::none};
+change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext* opCtx) {
+ auto tenantIds = change_stream_serverless_helpers::getConfigDbTenants(opCtx);
+ if (auto testTenantId = change_stream_serverless_helpers::resolveTenantId(boost::none)) {
+ tenantIds.insert(*testTenantId);
+ }
+
+ return tenantIds;
}
-boost::optional<int64_t> getExpireAfterSeconds(boost::optional<TenantId> tid) {
+boost::optional<int64_t> getExpireAfterSeconds(const TenantId& tenantId) {
auto* clusterParameters = ServerParameterSet::getClusterParameterSet();
auto* changeStreamsParam =
clusterParameters->get<ClusterParameterWithStorage<ChangeStreamsClusterParameterStorage>>(
"changeStreams");
- return changeStreamsParam->getValue(tid).getExpireAfterSeconds();
+
+ // TODO SERVER-69511 Pass 'tenantId' instead of 'boost::none'. Move this function to
+ // 'change_stream_serverless_helpers'.
+ return changeStreamsParam->getValue(boost::none).getExpireAfterSeconds();
}
void removeExpiredDocuments(Client* client) {
// TODO SERVER-66717 Remove this logic from this method. Due to the delay in the feature flag
// activation it was placed here. The remover job should ultimately be initialized at the mongod
// startup when launched in serverless mode.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}
@@ -98,7 +105,7 @@ void removeExpiredDocuments(Client* client) {
long long maxStartWallTime = 0;
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get());
- for (const auto& tenantId : getAllTenants()) {
+ for (const auto& tenantId : getConfigDbTenants(opCtx.get())) {
auto expiredAfterSeconds = getExpireAfterSeconds(tenantId);
invariant(expiredAfterSeconds);
@@ -169,13 +176,13 @@ void removeExpiredDocuments(Client* client) {
/**
* Defines a periodic background job to remove expired documents from change collections.
- * The job will run every 'changeCollectionRemoverJobSleepSeconds', as defined in the cluster
- * parameter.
+ * The job will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds', as defined in
+ * the cluster parameter.
*/
class ChangeCollectionExpiredDocumentsRemover {
public:
ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
- const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()};
+ const auto period = Seconds{gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds.load()};
_jobAnchor = serviceContext->getPeriodicRunner()->makeJob(
{"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period});
_jobAnchor.start();
diff --git a/src/mongo/db/change_collection_expired_documents_remover.h b/src/mongo/db/change_collection_expired_documents_remover.h
index bf9e36ae1f4..3ce5fc1ef94 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.h
+++ b/src/mongo/db/change_collection_expired_documents_remover.h
@@ -35,7 +35,8 @@ namespace mongo {
/**
* Starts a periodic background job to remove expired documents from change collections. The job
- * will run every 'changeCollectionRemoverJobSleepSeconds' as defined in the cluster parameter.
+ * will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds' as defined in the cluster
+ * parameter.
*/
void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext);
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index ca2735db03c..eee89bf9e07 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -38,7 +38,7 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
-#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
@@ -46,29 +46,16 @@
#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
namespace mongo {
-
-// Sharded clusters do not support serverless mode at present, but this failpoint allows us to
-// nonetheless test the behaviour of change collections in a sharded environment.
-MONGO_FAIL_POINT_DEFINE(forceEnableChangeCollectionsMode);
-
namespace {
const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
/**
- * Returns the list of all tenant ids in the replica set.
- * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
- * available.
- */
-std::vector<boost::optional<TenantId>> getAllTenants() {
- return {boost::none};
-}
-
-/**
* Creates a Document object from the supplied oplog entry, performs necessary modifications to it
* and then returns it as a BSON object.
*/
@@ -88,13 +75,15 @@ class ChangeCollectionsWriter {
public:
explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode)
: _accessMode{accessMode} {}
+
/**
* Adds the insert statement for the provided tenant that will be written to the change
* collection when the 'write()' method is called.
*/
- void add(const TenantId& tenantId, InsertStatement insertStatement) {
- if (_shouldAddEntry(insertStatement)) {
- _tenantStatementsMap[tenantId].push_back(std::move(insertStatement));
+ void add(InsertStatement insertStatement) {
+ if (auto tenantId = _extractTenantId(insertStatement);
+ tenantId && _shouldAddEntry(insertStatement)) {
+ _tenantStatementsMap[*tenantId].push_back(std::move(insertStatement));
}
}
@@ -104,14 +93,12 @@ public:
*/
Status write(OperationContext* opCtx, OpDebug* opDebug) {
for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) {
- AutoGetChangeCollection tenantChangeCollection(
- opCtx, _accessMode, boost::none /* tenantId */);
+ AutoGetChangeCollection tenantChangeCollection(opCtx, _accessMode, tenantId);
// The change collection does not exist for a particular tenant because either the
// change collection is not enabled or is in the process of enablement. Ignore this
// insert for now.
- // TODO: SERVER-65950 move this check before inserting to the map
- // 'tenantToInsertStatements'.
+ // TODO SERVER-67170 Move this check before inserting to the map.
if (!tenantChangeCollection) {
continue;
}
@@ -127,9 +114,9 @@ public:
false /* fromMigrate */);
if (!status.isOK()) {
return Status(status.code(),
- str::stream()
- << "Write to change collection: " << tenantChangeCollection->ns()
- << "failed, reason: " << status.reason());
+ str::stream() << "Write to change collection: "
+ << tenantChangeCollection->ns().toStringWithTenantId()
+ << "failed, reason: " << status.reason());
}
}
@@ -137,12 +124,31 @@ public:
}
private:
+ boost::optional<TenantId> _extractTenantId(const InsertStatement& insertStatement) {
+ // Parse the oplog entry to fetch the tenant id from 'tid' field. The oplog entry will not
+ // written to the change collection if 'tid' field is missing.
+ auto& oplogDoc = insertStatement.doc;
+ if (auto tidFieldElem = oplogDoc.getField(repl::OplogEntry::kTidFieldName)) {
+ return TenantId{Value(tidFieldElem).getOid()};
+ }
+
+ if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) {
+ return change_stream_serverless_helpers::getTenantIdForTesting();
+ }
+
+ return boost::none;
+ }
+
bool _shouldAddEntry(const InsertStatement& insertStatement) {
auto& oplogDoc = insertStatement.doc;
- // TODO SERVER-65950 retreive tenant from the oplog.
// TODO SERVER-67170 avoid inspecting the oplog BSON object.
if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) {
+ // Avoid writing entry with empty 'ns' field, for eg. 'periodic noop' entry.
+ if (nssFieldElem.String().empty()) {
+ return false;
+ }
+
if (nssFieldElem.String() == "config.$cmd"_sd) {
if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
// The oplog entry might be a drop command on the change collection. Check if
@@ -225,40 +231,8 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) {
getChangeCollectionManager(service).emplace(service);
}
-bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() {
- // A change collection must not be enabled on the config server.
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- return false;
- }
-
- // If the force fail point is enabled then declare the change collection mode as active.
- if (MONGO_unlikely(forceEnableChangeCollectionsMode.shouldFail())) {
- return true;
- }
-
- // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag.
- return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
- serverGlobalParams.featureCompatibility);
-}
-
-bool ChangeStreamChangeCollectionManager::hasChangeCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
- auto catalog = CollectionCatalog::get(opCtx);
- return static_cast<bool>(catalog->lookupCollectionByNamespace(
- opCtx, NamespaceString::makeChangeCollectionNSS(tenantId)));
-}
-
-bool ChangeStreamChangeCollectionManager::isChangeStreamEnabled(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
- // A change stream in the serverless is declared as enabled if both the change collection and
- // the pre-images collection exist for the provided tenant.
- return isChangeCollectionsModeActive() && hasChangeCollection(opCtx, tenantId) &&
- ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(opCtx, tenantId);
-}
-
-void ChangeStreamChangeCollectionManager::createChangeCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+void ChangeStreamChangeCollectionManager::createChangeCollection(OperationContext* opCtx,
+ const TenantId& tenantId) {
// Make the change collection clustered by '_id'. The '_id' field will have the same value as
// the 'ts' field of the oplog.
CollectionOptions changeCollectionOptions;
@@ -268,13 +242,13 @@ void ChangeStreamChangeCollectionManager::createChangeCollection(
const auto status = createCollection(opCtx, changeCollNss, changeCollectionOptions, BSONObj());
uassert(status.code(),
- str::stream() << "Failed to create change collection: " << changeCollNss
- << causedBy(status.reason()),
+ str::stream() << "Failed to create change collection: "
+ << changeCollNss.toStringWithTenantId() << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceExists);
}
void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId) {
+ const TenantId& tenantId) {
DropReply dropReply;
const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId);
@@ -284,8 +258,8 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext*
&dropReply,
DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
uassert(status.code(),
- str::stream() << "Failed to drop change collection: " << changeCollNss
- << causedBy(status.reason()),
+ str::stream() << "Failed to drop change collection: "
+ << changeCollNss.toStringWithTenantId() << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
}
@@ -310,9 +284,7 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
// tenant.
auto changeCollDoc = createChangeCollectionEntryFromOplog(record.data.toBson());
- // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id.
changeCollectionsWriter.add(
- TenantId::kSystemTenantId,
InsertStatement{std::move(changeCollDoc), ts, repl::OpTime::kUninitializedTerm});
}
@@ -352,11 +324,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc);
- // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id.
- changeCollectionsWriter.add(TenantId::kSystemTenantId,
- InsertStatement{std::move(changeCollDoc),
- oplogSlot.getTimestamp(),
- oplogSlot.getTerm()});
+ changeCollectionsWriter.add(InsertStatement{
+ std::move(changeCollDoc), oplogSlot.getTimestamp(), oplogSlot.getTerm()});
}
// Write documents to change collections.
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 49ff64d635b..82d9fc01590 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -113,34 +113,19 @@ public:
static ChangeStreamChangeCollectionManager& get(OperationContext* opCtx);
/**
- * Returns true if the server is configured such that change collections can be used to record
- * oplog entries; ie, we are running in a Serverless context. Returns false otherwise.
- */
- static bool isChangeCollectionsModeActive();
-
- /**
- * Returns true if the change collection is present for the specified tenant, false otherwise.
- */
- bool hasChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
-
- /**
* Returns true if the change stream is enabled for the provided tenant, false otherwise.
*/
bool isChangeStreamEnabled(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
/**
* Creates a change collection for the specified tenant, if it doesn't exist.
- *
- * TODO: SERVER-65950 make tenantId field mandatory.
*/
- void createChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+ void createChangeCollection(OperationContext* opCtx, const TenantId& tenantId);
/**
* Deletes the change collection for the specified tenant, if it already exist.
- *
- * TODO: SERVER-65950 make tenantId field mandatory.
*/
- void dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+ void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId);
/**
* Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog
@@ -152,8 +137,6 @@ public:
*
* Failure in insertion to any change collection will result in a fatal exception and will bring
* down the node.
- *
- * TODO: SERVER-65950 make tenantId field mandatory.
*/
void insertDocumentsToChangeCollection(OperationContext* opCtx,
const std::vector<Record>& oplogRecords,
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
index 3b82e7a7c46..25a12d7636a 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -35,7 +35,9 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_options_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/concurrency/lock_manager_defs.h"
#include "mongo/db/concurrency/locker.h"
@@ -47,7 +49,6 @@
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/fail_point.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@@ -118,7 +119,8 @@ void ChangeStreamPreImagesCollectionManager::createPreImagesCollection(
opCtx, preImagesCollectionNamespace, preImagesCollectionOptions, BSONObj());
uassert(status.code(),
str::stream() << "Failed to create the pre-images collection: "
- << preImagesCollectionNamespace.coll() << causedBy(status.reason()),
+ << preImagesCollectionNamespace.toStringWithTenantId()
+ << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceExists);
}
@@ -133,7 +135,8 @@ void ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(
DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
uassert(status.code(),
str::stream() << "Failed to drop the pre-images collection: "
- << preImagesCollectionNamespace.coll() << causedBy(status.reason()),
+ << preImagesCollectionNamespace.toStringWithTenantId()
+ << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
}
@@ -148,11 +151,13 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
<< preImage.getId().getApplyOpsIndex(),
preImage.getId().getApplyOpsIndex() >= 0);
+ // TODO SERVER-66642 Consider using internal test-tenant id if applicable.
+ const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
+
// This lock acquisition can block on a stronger lock held by another operation modifying
// the pre-images collection. There are no known cases where an operation holding an
// exclusive lock on the pre-images collection also waits for oplog visibility.
AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
- const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
AutoGetCollection preImagesCollectionRaii(
opCtx, preImagesCollectionNamespace, LockMode::MODE_IX);
auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection();
@@ -173,13 +178,6 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
uassertStatusOK(insertionStatus);
}
-bool ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) {
- auto catalog = CollectionCatalog::get(opCtx);
- return static_cast<bool>(catalog->lookupCollectionByNamespace(
- opCtx, NamespaceString::makePreImageCollectionNSS(tenantId)));
-}
-
namespace {
RecordId toRecordId(ChangeStreamPreImageId id) {
return record_id_helpers::keyForElem(
@@ -408,8 +406,9 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
// Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
// doesn't exist.
+ // TODO SERVER-66642 Account for multitenancy.
AutoGetCollection autoColl(
- opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
+ opCtx.get(), NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX);
const auto& preImagesColl = autoColl.getCollection();
if (!preImagesColl) {
return;
@@ -436,11 +435,12 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
change_stream_pre_image_helpers::getPreImageExpirationTime(
opCtx.get(), currentTimeForTimeBasedExpiration));
+ // TODO SERVER-66642 Account for multitenancy.
for (const auto& collectionRange : expiredPreImages) {
writeConflictRetry(
opCtx.get(),
"ChangeStreamExpiredPreImagesRemover",
- NamespaceString::kChangeStreamPreImagesNamespace.ns(),
+ NamespaceString::makePreImageCollectionNSS(boost::none).ns(),
[&] {
auto params = std::make_unique<DeleteStageParams>();
params->isMulti = true;
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h
index dede0e38c96..75efb28c22d 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.h
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.h
@@ -89,13 +89,6 @@ public:
const ChangeStreamPreImage& preImage);
/**
- * Returns true if the pre-images collection exists, false otherwise. If 'tenantId' is provided
- * then the pre-images collection associated with that tenant will be checked for existence,
- * otherwise the default pre-images collection will be checked.
- */
- static bool hasPreImagesCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
-
- /**
* Scans the system pre-images collection and deletes the expired pre-images from it.
*/
static void performExpiredChangeStreamPreImagesRemovalPass(Client* client);
diff --git a/src/mongo/db/change_stream_serverless_helpers.cpp b/src/mongo/db/change_stream_serverless_helpers.cpp
new file mode 100644
index 00000000000..0577894e397
--- /dev/null
+++ b/src/mongo/db/change_stream_serverless_helpers.cpp
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/db/change_stream_serverless_helpers.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/server_feature_flags_gen.h"
+#include "mongo/db/server_options.h"
+
+namespace mongo {
+namespace change_stream_serverless_helpers {
+
+bool isChangeCollectionsModeActive() {
+ // A change collection must not be enabled on the config server.
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return false;
+ }
+
+ // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag.
+ return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
+ serverGlobalParams.featureCompatibility);
+}
+
+bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId) {
+ auto catalog = CollectionCatalog::get(opCtx);
+
+ // A change stream in the serverless is declared as enabled if both the change collection and
+ // the pre-images collection exist for the provided tenant.
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ return isChangeCollectionsModeActive() &&
+ static_cast<bool>(catalog->lookupCollectionByNamespace(
+ opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))) &&
+ static_cast<bool>(catalog->lookupCollectionByNamespace(
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none)));
+}
+
+const TenantId& getTenantIdForTesting() {
+ static const TenantId kTestTenantId(
+ OID("00000000" /* timestamp */
+ "0000000000" /* process id */
+ "000000" /* counter */));
+
+ return kTestTenantId;
+}
+
+boost::optional<TenantId> resolveTenantId(boost::optional<TenantId> tenantId) {
+ if (tenantId) {
+ return tenantId;
+ } else if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) {
+ return getTenantIdForTesting();
+ }
+
+ return tenantId;
+}
+
+TenantSet getConfigDbTenants(OperationContext* opCtx) {
+ TenantSet tenantIds;
+
+ auto dbNames = CollectionCatalog::get(opCtx)->getAllDbNames();
+ for (auto&& dbName : dbNames) {
+ if (dbName.db() == NamespaceString::kConfigDb && dbName.tenantId()) {
+ tenantIds.insert(*dbName.tenantId());
+ }
+ }
+
+ return tenantIds;
+}
+
+} // namespace change_stream_serverless_helpers
+} // namespace mongo
diff --git a/src/mongo/db/change_stream_serverless_helpers.h b/src/mongo/db/change_stream_serverless_helpers.h
new file mode 100644
index 00000000000..bdeb04f3ff7
--- /dev/null
+++ b/src/mongo/db/change_stream_serverless_helpers.h
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional/optional.hpp>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/tenant_id.h"
+
+namespace mongo {
+namespace change_stream_serverless_helpers {
+
+using TenantSet = stdx::unordered_set<TenantId, TenantId::Hasher>;
+
+/**
+ * Returns true if the server is configured such that change collections can be used to record
+ * oplog entries; ie, we are running in a Serverless context. Returns false otherwise.
+ */
+bool isChangeCollectionsModeActive();
+
+/**
+ * Returns true if the change stream is enabled for the provided tenant, false otherwise.
+ */
+bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId);
+
+/**
+ * Returns an internal tenant id that will be used for testing purposes. This tenant id will not
+ * conflict with any other tenant id.
+ */
+const TenantId& getTenantIdForTesting();
+
+/**
+ * If the provided 'tenantId' is missing and 'internalChangeStreamUseTenantIdForTesting' is true,
+ * returns a special 'TenantId' for testing purposes. Otherwise, returns the provided 'tenantId'.
+ */
+boost::optional<TenantId> resolveTenantId(boost::optional<TenantId> tenantId);
+
+/**
+ * Returns the list of the tenants associated with a 'config' database.
+ */
+TenantSet getConfigDbTenants(OperationContext* opCtx);
+
+} // namespace change_stream_serverless_helpers
+} // namespace mongo
diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl
index 466e1c0345a..899018d04df 100644
--- a/src/mongo/db/change_streams_cluster_parameter.idl
+++ b/src/mongo/db/change_streams_cluster_parameter.idl
@@ -58,11 +58,11 @@ server_parameters:
cpp_varname: gChangeStreamsClusterParameter
validator:
callback: validateChangeStreamsClusterParameter
- changeCollectionRemoverJobSleepSeconds:
+ changeCollectionExpiredDocumentsRemoverJobSleepSeconds:
description: "Specifies the number of seconds for which the periodic change collection remover job will sleep between each cycle."
set_at: [ startup ]
cpp_vartype: AtomicWord<int>
- cpp_varname: "gChangeCollectionRemoverJobSleepSeconds"
+ cpp_varname: "gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds"
validator:
gte: 1
default: 10
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index c26e7dfc7c0..77573fa709c 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -354,6 +354,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/command_can_run_here',
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
diff --git a/src/mongo/db/commands/change_stream_state_command.cpp b/src/mongo/db/commands/change_stream_state_command.cpp
index de8d98083ea..44d186c03a1 100644
--- a/src/mongo/db/commands/change_stream_state_command.cpp
+++ b/src/mongo/db/commands/change_stream_state_command.cpp
@@ -30,8 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/change_stream_change_collection_manager.h"
-#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/change_stream_state_gen.h"
#include "mongo/db/commands.h"
#include "mongo/db/set_change_stream_state_coordinator.h"
@@ -40,7 +39,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
namespace mongo {
-
namespace {
/**
@@ -67,6 +65,10 @@ public:
" enabled: enable or disable the change stream";
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@@ -74,18 +76,19 @@ public:
void typedRun(OperationContext* opCtx) {
uassert(ErrorCodes::CommandNotSupported,
str::stream() << SetChangeStreamStateCommandRequest::kCommandName
- << " is only supported in the serverless",
- ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive());
+ << " command is only supported in serverless",
+ change_stream_serverless_helpers::isChangeCollectionsModeActive());
- // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be
- // present. Remove 'getDollarTenant()' and fetch tenant from dbName().
- const std::string tenantId = request().getDollarTenant()
- ? request().getDollarTenant()->toString()
- : TenantId::kSystemTenantId.toString();
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId());
+ uassert(ErrorCodes::BadValue,
+ str::stream() << SetChangeStreamStateCommandRequest::kCommandName
+ << " command must be provided with a tenant id",
+ tenantId);
// Prepare the payload for the 'SetChangeStreamStateCoordinator'.
SetChangeStreamStateCoordinatorId coordinatorId;
- coordinatorId.setTenantId({TenantId{OID(tenantId)}});
+ coordinatorId.setTenantId(tenantId);
SetChangeStreamStateCoordinatorDocument coordinatorDoc{
coordinatorId, request().getChangeStreamStateParameters().toBSON()};
@@ -134,6 +137,10 @@ public:
" {getChangeStreamState: 1}";
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@@ -141,17 +148,20 @@ public:
auto typedRun(OperationContext* opCtx) {
uassert(ErrorCodes::CommandNotSupported,
str::stream() << GetChangeStreamStateCommandRequest::kCommandName
- << " is only supported in the serverless",
- ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive());
+ << " command is only supported in serverless",
+ change_stream_serverless_helpers::isChangeCollectionsModeActive());
+
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId());
+ uassert(ErrorCodes::BadValue,
+ str::stream() << GetChangeStreamStateCommandRequest::kCommandName
+ << " command must be provided with a tenant id",
+ tenantId);
- // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be
- // present.
- boost::optional<TenantId> tenantId = boost::none;
// Set the change stream enablement state in the 'reply' object.
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
GetChangeStreamStateCommandRequest::Reply reply{
- changeCollectionManager.isChangeStreamEnabled(opCtx, tenantId)};
+ change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)};
return reply;
}
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index eb3c3567077..29c85819230 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -188,6 +188,10 @@ public:
bool collectsResourceConsumptionMetrics() const final {
return true;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBaseGen {
public:
using InvocationBaseGen::InvocationBaseGen;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 4fe8a736ad7..b7bd221ec61 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
@@ -757,13 +758,20 @@ Status runAggregate(OperationContext* opCtx,
nss = NamespaceString::kRsOplogNamespace;
// In case of serverless the change stream will be opened on the change collection.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(origNss.tenantId());
+
+ uassert(ErrorCodes::BadValue,
+ "Change streams cannot be used without tenant id",
+ tenantId);
+
uassert(ErrorCodes::ChangeStreamNotEnabled,
"Change streams must be enabled before being used.",
- changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId()));
+ change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId));
+
- nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId());
+ nss = NamespaceString::makeChangeCollectionNSS(tenantId);
}
// Assert that a change stream on the config server is always opened on the oplog.
@@ -785,7 +793,7 @@ Status runAggregate(OperationContext* opCtx,
LOGV2_INFO(6689200,
"Opening change stream on the namespace: {nss}",
"Opening change stream",
- "nss"_attr = nss.toString());
+ "nss"_attr = nss.toStringWithTenantId());
}
// Upgrade and wait for read concern if necessary.
diff --git a/src/mongo/db/commands/set_cluster_parameter_command.cpp b/src/mongo/db/commands/set_cluster_parameter_command.cpp
index 08ae1b2835e..8c49717baad 100644
--- a/src/mongo/db/commands/set_cluster_parameter_command.cpp
+++ b/src/mongo/db/commands/set_cluster_parameter_command.cpp
@@ -66,6 +66,10 @@ public:
return "Set cluster parameter on replica set or node";
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index ce879e0ee2a..9240c572a54 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -87,6 +87,10 @@ public:
return true;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBaseGen {
public:
using InvocationBaseGen::InvocationBaseGen;
@@ -200,6 +204,10 @@ public:
return true;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBaseGen {
public:
using InvocationBaseGen::InvocationBaseGen;
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 8cbe5909154..f1a5d2a4871 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -349,7 +349,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) {
}
}
- // TODO SERVER-65950 create 'SetChangeStreamStateCoordinatorService' only in the serverless.
+ // TODO SERVER-66717 create 'SetChangeStreamStateCoordinatorService' only in the serverless.
services.push_back(std::make_unique<SetChangeStreamStateCoordinatorService>(serviceContext));
for (auto& service : services) {
@@ -790,6 +790,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
repl::ReplicationCoordinator::modeNone;
if (!isStandalone) {
startChangeStreamExpiredPreImagesRemover(serviceContext);
+ // TODO SERVER-66717 Start 'startChangeCollectionExpiredDocumentsRemover' only in the
+ // serverless.
startChangeCollectionExpiredDocumentsRemover(serviceContext);
}
@@ -1561,7 +1563,7 @@ int mongod_main(int argc, char* argv[]) {
ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD);
ChangeStreamOptionsManager::create(service);
- // TODO SERVER-65950 create 'ChangeStreamChangeCollectionManager' only in the serverless.
+ // TODO SERVER-66717 Create 'ChangeStreamChangeCollectionManager' only in the serverless.
ChangeStreamChangeCollectionManager::create(service);
#if defined(_WIN32)
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 5cfd942a8f3..a3a28901993 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -111,8 +111,6 @@ const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString::
"system.replset");
const NamespaceString NamespaceString::kLastVoteNamespace(NamespaceString::kLocalDb,
"replset.election");
-const NamespaceString NamespaceString::kChangeStreamPreImagesNamespace(NamespaceString::kConfigDb,
- "system.preimages");
const NamespaceString NamespaceString::kIndexBuildEntryNamespace(NamespaceString::kConfigDb,
"system.indexBuilds");
const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString::kConfigDb,
@@ -338,8 +336,7 @@ NamespaceString NamespaceString::makeCollectionlessAggregateNSS(const DatabaseNa
NamespaceString NamespaceString::makeChangeCollectionNSS(
const boost::optional<TenantId>& tenantId) {
- // TODO: SERVER-65950 create namespace for a particular tenant.
- return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName};
+ return NamespaceString{tenantId, kConfigDb, kChangeCollectionName};
}
NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) {
@@ -350,8 +347,7 @@ NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) {
NamespaceString NamespaceString::makePreImageCollectionNSS(
const boost::optional<TenantId>& tenantId) {
- return tenantId ? NamespaceString(tenantId, kConfigDb, "system.preimages")
- : kChangeStreamPreImagesNamespace;
+ return NamespaceString{tenantId, kConfigDb, kPreImagesCollectionName};
}
std::string NamespaceString::getSisterNS(StringData local) const {
@@ -469,11 +465,11 @@ bool NamespaceString::isTimeseriesBucketsCollection() const {
}
bool NamespaceString::isChangeStreamPreImagesCollection() const {
- return ns() == kChangeStreamPreImagesNamespace.ns();
+ return _dbName.db() == kConfigDb && coll() == kPreImagesCollectionName;
}
bool NamespaceString::isChangeCollection() const {
- return db() == kConfigDb && coll() == kChangeCollectionName;
+ return _dbName.db() == kConfigDb && coll() == kChangeCollectionName;
}
bool NamespaceString::isConfigImagesCollection() const {
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 79da7a0f8ae..da20bca25d2 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -82,7 +82,10 @@ public:
// Name for the system.js collection
static constexpr StringData kSystemDotJavascriptCollectionName = "system.js"_sd;
- // Name for the change stream change collection.
+ // Name of the pre-images collection.
+ static constexpr StringData kPreImagesCollectionName = "system.preimages"_sd;
+
+ // Name of the change stream change collection.
static constexpr StringData kChangeCollectionName = "system.change_collection"_sd;
// Names of privilege document collections
@@ -171,9 +174,6 @@ public:
// Namespace for storing the last replica set election vote.
static const NamespaceString kLastVoteNamespace;
- // Namespace for change stream pre-images collection.
- static const NamespaceString kChangeStreamPreImagesNamespace;
-
// Namespace for index build entries.
static const NamespaceString kIndexBuildEntryNamespace;
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp
index 6e120317ca8..bb54d4bc07a 100644
--- a/src/mongo/db/op_observer/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp
@@ -225,7 +225,7 @@ protected:
reset(opCtx, NamespaceString::kRsOplogNamespace);
reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace);
reset(opCtx, NamespaceString::kConfigImagesNamespace);
- reset(opCtx, NamespaceString::kChangeStreamPreImagesNamespace);
+ reset(opCtx, NamespaceString::makePreImageCollectionNSS(boost::none));
}
// Assert that the oplog has the expected number of entries, and return them
@@ -288,7 +288,7 @@ protected:
bool didWriteDeletedDocToPreImagesCollection(OperationContext* opCtx,
const ChangeStreamPreImageId preImageId) {
AutoGetCollection preImagesCollection(
- opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS);
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS);
const auto preImage = Helpers::findOneForTesting(
opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON()), false);
return !preImage.isEmpty();
@@ -323,7 +323,7 @@ protected:
const ChangeStreamPreImageId& preImageId,
BSONObj* container) {
AutoGetCollection preImagesCollection(
- opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS);
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS);
*container = Helpers::findOneForTesting(opCtx,
preImagesCollection.getCollection(),
BSON("_id" << preImageId.toBSON()))
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
index 583e3198a42..78d7e9f19cf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
@@ -120,9 +120,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPreImage::doGetNext()
boost::optional<Document> DocumentSourceChangeStreamAddPreImage::lookupPreImage(
boost::intrusive_ptr<ExpressionContext> pExpCtx, const Document& preImageId) {
// Look up the pre-image document on the local node by id.
+ // TODO SERVER-66642 Consider using internal test-tenant id if applicable.
+ const auto tenantId = pExpCtx->ns.tenantId();
auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocumentLocally(
pExpCtx,
- NamespaceString::kChangeStreamPreImagesNamespace,
+ NamespaceString::makePreImageCollectionNSS(tenantId),
Document{{ChangeStreamPreImage::kIdFieldName, preImageId}});
// Return boost::none to signify that we failed to find the pre-image.
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 9444a694c81..68d03d0c96a 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1544,7 +1544,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
return (ns.isLocal() || ns.isConfigDotCacheDotChunks() ||
ns.isReshardingLocalOplogBufferCollection() ||
ns == NamespaceString::kConfigImagesNamespace ||
- ns == NamespaceString::kChangeStreamPreImagesNamespace);
+ ns.isChangeStreamPreImagesCollection());
};
if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl
index efbe81f8501..e36a60c4df3 100644
--- a/src/mongo/db/query/query_knobs.idl
+++ b/src/mongo/db/query/query_knobs.idl
@@ -906,6 +906,15 @@ server_parameters:
default:
expr: false
+ # TODO SERVER-68341 Remove this query knob after tenancy is supported in the sharded cluster.
+ internalChangeStreamUseTenantIdForTesting:
+ description: "If true, then change streams will operate upon an internal tenant id for testing
+ purposes if the actual tenant is not provided."
+ set_at: [ startup ]
+ cpp_varname: "internalChangeStreamUseTenantIdForTesting"
+ cpp_vartype: AtomicWord<bool>
+ default: false
+
# Note for adding additional query knobs:
#
# When adding a new query knob, you should consider whether or not you need to add an 'on_update'
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 7185e7e77d6..d91977bac6c 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -75,6 +75,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
@@ -634,6 +635,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/collection_crud',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/commands/mongod_fsync',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
@@ -1502,6 +1504,7 @@ env.Library(
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/cloner',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/curop',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b17e5f108ec..5c4ec5d5dab 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/client.h"
#include "mongo/db/coll_mod_gen.h"
#include "mongo/db/commands.h"
@@ -390,7 +391,7 @@ void _logOpsInner(OperationContext* opCtx,
}
// Insert the oplog records to the respective tenants change collections.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
opCtx, *records, timestamps);
}
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 032e9109f14..51eebc457bc 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/db_raii.h"
@@ -150,7 +151,7 @@ Status _insertDocumentsToOplogAndChangeCollections(
// Write the corresponding oplog entries to tenants respective change
// collections in the serverless.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
auto status =
ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
opCtx,
@@ -416,8 +417,7 @@ void scheduleWritesToOplogAndChangeCollection(OperationContext* opCtx,
bool skipWritesToOplog) {
// Skip performing any writes during the startup recovery when running in the non-serverless
// environment.
- if (skipWritesToOplog &&
- !ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (skipWritesToOplog && !change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index d644d5ad734..842cca1bc65 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -481,10 +481,10 @@ TEST_F(OplogApplierImplTest, applyOplogEntryToRecordChangeStreamPreImages) {
WriteUnitOfWork wuow{_opCtx.get()};
ChangeStreamPreImageId preImageId{*(options.uuid), op.getOpTime().getTimestamp(), 0};
BSONObj preImageDocumentKey = BSON("_id" << preImageId.toBSON());
- auto preImageLoadResult =
- getStorageInterface()->deleteById(_opCtx.get(),
- NamespaceString::kChangeStreamPreImagesNamespace,
- preImageDocumentKey.firstElement());
+ auto preImageLoadResult = getStorageInterface()->deleteById(
+ _opCtx.get(),
+ NamespaceString::makePreImageCollectionNSS(boost::none),
+ preImageDocumentKey.firstElement());
repl::getNextOpTime(_opCtx.get());
wuow.commit();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 8da1bcc1f5c..71bcbe60296 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/catalog/local_oplog_info.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/rwc_defaults_commands_gen.h"
@@ -541,7 +542,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
});
// Create the pre-images collection if it doesn't exist yet in the non-serverless environment.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
ChangeStreamPreImagesCollectionManager::createPreImagesCollection(
opCtx, boost::none /* tenantId */);
}
diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp
index a0ee72fac51..9191769214c 100644
--- a/src/mongo/db/set_change_stream_state_coordinator.cpp
+++ b/src/mongo/db/set_change_stream_state_coordinator.cpp
@@ -34,6 +34,8 @@
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/change_stream_state_gen.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/logv2/log.h"
@@ -77,15 +79,15 @@ public:
const auto setChangeStreamParameter = ChangeStreamStateParameters::parse(
IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand());
- invariant(_stateDoc.getId().getTenantId());
-
- // TODO SERVER-65950 replace 'tenantId' with the provided tenant id.
- auto tenantId = boost::none;
+ // A tenant's change collection and the pre-images collection are always associated with a
+ // tenant id.
+ const auto tenantId = _stateDoc.getId().getTenantId();
+ tassert(6664100, "Tenant id is missing", tenantId);
if (setChangeStreamParameter.getEnabled()) {
- _enableChangeStream(opCtx, tenantId);
+ _enableChangeStream(opCtx, *tenantId);
} else {
- _disableChangeStream(opCtx, tenantId);
+ _disableChangeStream(opCtx, *tenantId);
}
}
@@ -94,11 +96,38 @@ private:
* Enables the change stream in the serverless by creating the change collection and the
* pre-image collection.
*/
- void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.createChangeCollection(opCtx, tenantId);
- ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId);
+ // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for
+ // the change stream to advance. As such artifically create any oplog entry such that it
+ // will be captured by the change collection. With SERVER-66643, the pre-images collection
+ // 'create' oplog entry will be auto captured by the change collection and hence writing
+ // this entry will not be required. Also remove the necessary header and linked library
+ // after removing this code.
+ [&]() {
+ writeConflictRetry(
+ opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ Lock::GlobalLock lock(opCtx, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ opCtx,
+ NamespaceString::makeChangeCollectionNSS(tenantId),
+ boost::none,
+ BSON("msg"
+ << "enable change stream"),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ });
+ }();
+
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none);
// Wait until the create requests are majority committed.
waitForMajority(opCtx);
@@ -108,11 +137,12 @@ private:
* Disables the change stream in the serverless by dropping the change collection and the
* pre-image collection.
*/
- void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.dropChangeCollection(opCtx, tenantId);
- ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId);
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none);
// Wait until the drop requests are majority committed.
waitForMajority(opCtx);
diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript
index 01242257d02..a4378a89caf 100644
--- a/src/mongo/db/stats/SConscript
+++ b/src/mongo/db/stats/SConscript
@@ -90,6 +90,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/commands/server_status_core',
'$BUILD_DIR/mongo/db/server_base',
],
diff --git a/src/mongo/db/stats/change_collection_server_status.cpp b/src/mongo/db/stats/change_collection_server_status.cpp
index ee424a4ae43..f7d7f75a75c 100644
--- a/src/mongo/db/stats/change_collection_server_status.cpp
+++ b/src/mongo/db/stats/change_collection_server_status.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/commands/server_status.h"
namespace mongo {
@@ -49,7 +50,7 @@ public:
const BSONElement& configElement,
BSONObjBuilder* result) const override {
// Append the section only when running in serverless.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}