diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-09-15 10:27:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-15 11:29:18 +0000 |
commit | e6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch) | |
tree | 27410d5d07867ef6be3026cb69a9a9821e03e254 /src/mongo | |
parent | 0797ff28efcd7cb954b88658425b7b38c980b605 (diff) | |
download | mongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz |
SERVER-66641 Introduce multi-tenancy for change collections.
Diffstat (limited to 'src/mongo')
33 files changed, 427 insertions, 197 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 57c4feedb00..c0514e4dd44 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -511,7 +511,9 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/change_stream_state', + '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/repl/primary_only_service', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', @@ -524,6 +526,21 @@ env.Library( ) env.Library( + target='change_stream_serverless_helpers', + source=[ + 'change_stream_serverless_helpers.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/collection', + '$BUILD_DIR/mongo/db/catalog/collection_catalog', + '$BUILD_DIR/mongo/db/query/query_knobs', + '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/server_options', + '$BUILD_DIR/mongo/idl/feature_flag', + ], +) + +env.Library( target='change_stream_change_collection_manager', source=[ 'change_stream_change_collection_manager.cpp', @@ -532,9 +549,10 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/clustered_collection_options', '$BUILD_DIR/mongo/db/catalog/collection_crud', - '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbhelpers', + '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/service_context', ], ) @@ -546,6 +564,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/util/periodic_runner', @@ -2515,6 +2534,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/catalog/local_oplog_info', '$BUILD_DIR/mongo/db/change_collection_expired_change_remover', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', '$BUILD_DIR/mongo/db/commands/create_command', '$BUILD_DIR/mongo/db/mongohasher', diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index 80ff680df43..5e9089fce0c 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -233,7 +233,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx, if (dropIfUUIDNotMatching && collectionUUID == *dropIfUUIDNotMatching) { return Status::OK(); } - const NamespaceStringOrUUID dbAndUUID{coll->ns().db().toString(), coll->uuid()}; + const NamespaceStringOrUUID dbAndUUID{coll->ns().dbName(), coll->uuid()}; const int numIndexes = coll->getIndexCatalog()->numIndexesTotal(opCtx); while (true) { @@ -254,7 +254,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx, << collectionUUID << ") is being dropped"); // Take an exclusive lock to finish the collection drop. - optionalAutoDb.emplace(opCtx, startingNss.db(), MODE_IX); + optionalAutoDb.emplace(opCtx, startingNss.dbName(), MODE_IX); collLock.emplace(opCtx, dbAndUUID, MODE_X); // Abandon the snapshot as the index catalog will compare the in-memory state to the diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp index e3ee3c411f2..8b8777e0d23 100644 --- a/src/mongo/db/change_collection_expired_change_remover_test.cpp +++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/plan_executor.h" @@ -54,7 +55,8 @@ namespace mongo { class ChangeCollectionExpiredChangeRemoverTest : public CatalogTestFixture { protected: ChangeCollectionExpiredChangeRemoverTest() - : CatalogTestFixture(Options{}.useMockClock(true)), _tenantId(OID::gen()) { + : CatalogTestFixture(Options{}.useMockClock(true)), + _tenantId(change_stream_serverless_helpers::getTenantIdForTesting()) { ChangeStreamChangeCollectionManager::create(getServiceContext()); } @@ -67,7 +69,7 @@ protected: } void insertDocumentToChangeCollection(OperationContext* opCtx, - boost::optional<TenantId> tenantId, + const TenantId& tenantId, const BSONObj& obj) { const auto wallTime = now(); Timestamp timestamp{wallTime}; @@ -78,6 +80,7 @@ protected: oplogEntry.setNss(NamespaceString::makeChangeCollectionNSS(tenantId)); oplogEntry.setObject(obj); oplogEntry.setWallClockTime(wallTime); + auto oplogEntryBson = oplogEntry.toBSON(); RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()}; @@ -112,8 +115,7 @@ protected: return entries; } - void dropAndRecreateChangeCollection(OperationContext* opCtx, - boost::optional<TenantId> tenantId) { + void dropAndRecreateChangeCollection(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); changeCollectionManager.createChangeCollection(opCtx, tenantId); @@ -136,11 +138,13 @@ protected: opCtx, &*changeCollection, maxRecordIdBound); } - const boost::optional<TenantId> _tenantId; + const TenantId _tenantId; + boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager; + RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams", true}; - - boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager; + RAIIServerParameterControllerForTest queryKnobController{ + "internalChangeStreamUseTenantIdForTesting", true}; }; // Tests that the last expired focument retrieved is the expected one. diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp index 80a816945be..d84fb21b672 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.cpp +++ b/src/mongo/db/change_collection_expired_documents_remover.cpp @@ -31,6 +31,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replication_coordinator.h" @@ -56,25 +57,31 @@ MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments); namespace { -// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is -// available. -std::vector<boost::optional<TenantId>> getAllTenants() { - return {boost::none}; +change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext* opCtx) { + auto tenantIds = change_stream_serverless_helpers::getConfigDbTenants(opCtx); + if (auto testTenantId = change_stream_serverless_helpers::resolveTenantId(boost::none)) { + tenantIds.insert(*testTenantId); + } + + return tenantIds; } -boost::optional<int64_t> getExpireAfterSeconds(boost::optional<TenantId> tid) { +boost::optional<int64_t> getExpireAfterSeconds(const TenantId& tenantId) { auto* clusterParameters = ServerParameterSet::getClusterParameterSet(); auto* changeStreamsParam = clusterParameters->get<ClusterParameterWithStorage<ChangeStreamsClusterParameterStorage>>( "changeStreams"); - return changeStreamsParam->getValue(tid).getExpireAfterSeconds(); + + // TODO SERVER-69511 Pass 'tenantId' instead of 'boost::none'. Move this function to + // 'change_stream_serverless_helpers'. + return changeStreamsParam->getValue(boost::none).getExpireAfterSeconds(); } void removeExpiredDocuments(Client* client) { // TODO SERVER-66717 Remove this logic from this method. Due to the delay in the feature flag // activation it was placed here. The remover job should ultimately be initialized at the mongod // startup when launched in serverless mode. - if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { return; } @@ -98,7 +105,7 @@ void removeExpiredDocuments(Client* client) { long long maxStartWallTime = 0; auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get()); - for (const auto& tenantId : getAllTenants()) { + for (const auto& tenantId : getConfigDbTenants(opCtx.get())) { auto expiredAfterSeconds = getExpireAfterSeconds(tenantId); invariant(expiredAfterSeconds); @@ -169,13 +176,13 @@ void removeExpiredDocuments(Client* client) { /** * Defines a periodic background job to remove expired documents from change collections. - * The job will run every 'changeCollectionRemoverJobSleepSeconds', as defined in the cluster - * parameter. + * The job will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds', as defined in + * the cluster parameter. */ class ChangeCollectionExpiredDocumentsRemover { public: ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) { - const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()}; + const auto period = Seconds{gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds.load()}; _jobAnchor = serviceContext->getPeriodicRunner()->makeJob( {"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period}); _jobAnchor.start(); diff --git a/src/mongo/db/change_collection_expired_documents_remover.h b/src/mongo/db/change_collection_expired_documents_remover.h index bf9e36ae1f4..3ce5fc1ef94 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.h +++ b/src/mongo/db/change_collection_expired_documents_remover.h @@ -35,7 +35,8 @@ namespace mongo { /** * Starts a periodic background job to remove expired documents from change collections. The job - * will run every 'changeCollectionRemoverJobSleepSeconds' as defined in the cluster parameter. + * will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds' as defined in the cluster + * parameter. */ void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext); diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index ca2735db03c..eee89bf9e07 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -38,7 +38,7 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" -#include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" @@ -46,29 +46,16 @@ #include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/server_options.h" #include "mongo/logv2/log.h" namespace mongo { - -// Sharded clusters do not support serverless mode at present, but this failpoint allows us to -// nonetheless test the behaviour of change collections in a sharded environment. -MONGO_FAIL_POINT_DEFINE(forceEnableChangeCollectionsMode); - namespace { const auto getChangeCollectionManager = ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>(); /** - * Returns the list of all tenant ids in the replica set. - * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is - * available. - */ -std::vector<boost::optional<TenantId>> getAllTenants() { - return {boost::none}; -} - -/** * Creates a Document object from the supplied oplog entry, performs necessary modifications to it * and then returns it as a BSON object. */ @@ -88,13 +75,15 @@ class ChangeCollectionsWriter { public: explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode) : _accessMode{accessMode} {} + /** * Adds the insert statement for the provided tenant that will be written to the change * collection when the 'write()' method is called. */ - void add(const TenantId& tenantId, InsertStatement insertStatement) { - if (_shouldAddEntry(insertStatement)) { - _tenantStatementsMap[tenantId].push_back(std::move(insertStatement)); + void add(InsertStatement insertStatement) { + if (auto tenantId = _extractTenantId(insertStatement); + tenantId && _shouldAddEntry(insertStatement)) { + _tenantStatementsMap[*tenantId].push_back(std::move(insertStatement)); } } @@ -104,14 +93,12 @@ public: */ Status write(OperationContext* opCtx, OpDebug* opDebug) { for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) { - AutoGetChangeCollection tenantChangeCollection( - opCtx, _accessMode, boost::none /* tenantId */); + AutoGetChangeCollection tenantChangeCollection(opCtx, _accessMode, tenantId); // The change collection does not exist for a particular tenant because either the // change collection is not enabled or is in the process of enablement. Ignore this // insert for now. - // TODO: SERVER-65950 move this check before inserting to the map - // 'tenantToInsertStatements'. + // TODO SERVER-67170 Move this check before inserting to the map. if (!tenantChangeCollection) { continue; } @@ -127,9 +114,9 @@ public: false /* fromMigrate */); if (!status.isOK()) { return Status(status.code(), - str::stream() - << "Write to change collection: " << tenantChangeCollection->ns() - << "failed, reason: " << status.reason()); + str::stream() << "Write to change collection: " + << tenantChangeCollection->ns().toStringWithTenantId() + << "failed, reason: " << status.reason()); } } @@ -137,12 +124,31 @@ public: } private: + boost::optional<TenantId> _extractTenantId(const InsertStatement& insertStatement) { + // Parse the oplog entry to fetch the tenant id from 'tid' field. The oplog entry will not + // written to the change collection if 'tid' field is missing. + auto& oplogDoc = insertStatement.doc; + if (auto tidFieldElem = oplogDoc.getField(repl::OplogEntry::kTidFieldName)) { + return TenantId{Value(tidFieldElem).getOid()}; + } + + if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) { + return change_stream_serverless_helpers::getTenantIdForTesting(); + } + + return boost::none; + } + bool _shouldAddEntry(const InsertStatement& insertStatement) { auto& oplogDoc = insertStatement.doc; - // TODO SERVER-65950 retreive tenant from the oplog. // TODO SERVER-67170 avoid inspecting the oplog BSON object. if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) { + // Avoid writing entry with empty 'ns' field, for eg. 'periodic noop' entry. + if (nssFieldElem.String().empty()) { + return false; + } + if (nssFieldElem.String() == "config.$cmd"_sd) { if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { // The oplog entry might be a drop command on the change collection. Check if @@ -225,40 +231,8 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) { getChangeCollectionManager(service).emplace(service); } -bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() { - // A change collection must not be enabled on the config server. - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return false; - } - - // If the force fail point is enabled then declare the change collection mode as active. - if (MONGO_unlikely(forceEnableChangeCollectionsMode.shouldFail())) { - return true; - } - - // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag. - return serverGlobalParams.featureCompatibility.isVersionInitialized() && - feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( - serverGlobalParams.featureCompatibility); -} - -bool ChangeStreamChangeCollectionManager::hasChangeCollection( - OperationContext* opCtx, boost::optional<TenantId> tenantId) const { - auto catalog = CollectionCatalog::get(opCtx); - return static_cast<bool>(catalog->lookupCollectionByNamespace( - opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))); -} - -bool ChangeStreamChangeCollectionManager::isChangeStreamEnabled( - OperationContext* opCtx, boost::optional<TenantId> tenantId) const { - // A change stream in the serverless is declared as enabled if both the change collection and - // the pre-images collection exist for the provided tenant. - return isChangeCollectionsModeActive() && hasChangeCollection(opCtx, tenantId) && - ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(opCtx, tenantId); -} - -void ChangeStreamChangeCollectionManager::createChangeCollection( - OperationContext* opCtx, boost::optional<TenantId> tenantId) { +void ChangeStreamChangeCollectionManager::createChangeCollection(OperationContext* opCtx, + const TenantId& tenantId) { // Make the change collection clustered by '_id'. The '_id' field will have the same value as // the 'ts' field of the oplog. CollectionOptions changeCollectionOptions; @@ -268,13 +242,13 @@ void ChangeStreamChangeCollectionManager::createChangeCollection( const auto status = createCollection(opCtx, changeCollNss, changeCollectionOptions, BSONObj()); uassert(status.code(), - str::stream() << "Failed to create change collection: " << changeCollNss - << causedBy(status.reason()), + str::stream() << "Failed to create change collection: " + << changeCollNss.toStringWithTenantId() << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceExists); } void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* opCtx, - boost::optional<TenantId> tenantId) { + const TenantId& tenantId) { DropReply dropReply; const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId); @@ -284,8 +258,8 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* &dropReply, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); uassert(status.code(), - str::stream() << "Failed to drop change collection: " << changeCollNss - << causedBy(status.reason()), + str::stream() << "Failed to drop change collection: " + << changeCollNss.toStringWithTenantId() << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceNotFound); } @@ -310,9 +284,7 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( // tenant. auto changeCollDoc = createChangeCollectionEntryFromOplog(record.data.toBson()); - // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id. changeCollectionsWriter.add( - TenantId::kSystemTenantId, InsertStatement{std::move(changeCollDoc), ts, repl::OpTime::kUninitializedTerm}); } @@ -352,11 +324,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc); - // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id. - changeCollectionsWriter.add(TenantId::kSystemTenantId, - InsertStatement{std::move(changeCollDoc), - oplogSlot.getTimestamp(), - oplogSlot.getTerm()}); + changeCollectionsWriter.add(InsertStatement{ + std::move(changeCollDoc), oplogSlot.getTimestamp(), oplogSlot.getTerm()}); } // Write documents to change collections. diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 49ff64d635b..82d9fc01590 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -113,34 +113,19 @@ public: static ChangeStreamChangeCollectionManager& get(OperationContext* opCtx); /** - * Returns true if the server is configured such that change collections can be used to record - * oplog entries; ie, we are running in a Serverless context. Returns false otherwise. - */ - static bool isChangeCollectionsModeActive(); - - /** - * Returns true if the change collection is present for the specified tenant, false otherwise. - */ - bool hasChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId) const; - - /** * Returns true if the change stream is enabled for the provided tenant, false otherwise. */ bool isChangeStreamEnabled(OperationContext* opCtx, boost::optional<TenantId> tenantId) const; /** * Creates a change collection for the specified tenant, if it doesn't exist. - * - * TODO: SERVER-65950 make tenantId field mandatory. */ - void createChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId); + void createChangeCollection(OperationContext* opCtx, const TenantId& tenantId); /** * Deletes the change collection for the specified tenant, if it already exist. - * - * TODO: SERVER-65950 make tenantId field mandatory. */ - void dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId); + void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId); /** * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog @@ -152,8 +137,6 @@ public: * * Failure in insertion to any change collection will result in a fatal exception and will bring * down the node. - * - * TODO: SERVER-65950 make tenantId field mandatory. */ void insertDocumentsToChangeCollection(OperationContext* opCtx, const std::vector<Record>& oplogRecords, diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp index 3b82e7a7c46..25a12d7636a 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp +++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp @@ -35,7 +35,9 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/concurrency/locker.h" @@ -47,7 +49,6 @@ #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/fail_point.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -118,7 +119,8 @@ void ChangeStreamPreImagesCollectionManager::createPreImagesCollection( opCtx, preImagesCollectionNamespace, preImagesCollectionOptions, BSONObj()); uassert(status.code(), str::stream() << "Failed to create the pre-images collection: " - << preImagesCollectionNamespace.coll() << causedBy(status.reason()), + << preImagesCollectionNamespace.toStringWithTenantId() + << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceExists); } @@ -133,7 +135,8 @@ void ChangeStreamPreImagesCollectionManager::dropPreImagesCollection( DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); uassert(status.code(), str::stream() << "Failed to drop the pre-images collection: " - << preImagesCollectionNamespace.coll() << causedBy(status.reason()), + << preImagesCollectionNamespace.toStringWithTenantId() + << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceNotFound); } @@ -148,11 +151,13 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op << preImage.getId().getApplyOpsIndex(), preImage.getId().getApplyOpsIndex() >= 0); + // TODO SERVER-66642 Consider using internal test-tenant id if applicable. + const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId); + // This lock acquisition can block on a stronger lock held by another operation modifying // the pre-images collection. There are no known cases where an operation holding an // exclusive lock on the pre-images collection also waits for oplog visibility. AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); - const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId); AutoGetCollection preImagesCollectionRaii( opCtx, preImagesCollectionNamespace, LockMode::MODE_IX); auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection(); @@ -173,13 +178,6 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op uassertStatusOK(insertionStatus); } -bool ChangeStreamPreImagesCollectionManager::hasPreImagesCollection( - OperationContext* opCtx, boost::optional<TenantId> tenantId) { - auto catalog = CollectionCatalog::get(opCtx); - return static_cast<bool>(catalog->lookupCollectionByNamespace( - opCtx, NamespaceString::makePreImageCollectionNSS(tenantId))); -} - namespace { RecordId toRecordId(ChangeStreamPreImageId id) { return record_id_helpers::keyForElem( @@ -408,8 +406,9 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection // doesn't exist. + // TODO SERVER-66642 Account for multitenancy. AutoGetCollection autoColl( - opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); + opCtx.get(), NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX); const auto& preImagesColl = autoColl.getCollection(); if (!preImagesColl) { return; @@ -436,11 +435,12 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim change_stream_pre_image_helpers::getPreImageExpirationTime( opCtx.get(), currentTimeForTimeBasedExpiration)); + // TODO SERVER-66642 Account for multitenancy. for (const auto& collectionRange : expiredPreImages) { writeConflictRetry( opCtx.get(), "ChangeStreamExpiredPreImagesRemover", - NamespaceString::kChangeStreamPreImagesNamespace.ns(), + NamespaceString::makePreImageCollectionNSS(boost::none).ns(), [&] { auto params = std::make_unique<DeleteStageParams>(); params->isMulti = true; diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h index dede0e38c96..75efb28c22d 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.h +++ b/src/mongo/db/change_stream_pre_images_collection_manager.h @@ -89,13 +89,6 @@ public: const ChangeStreamPreImage& preImage); /** - * Returns true if the pre-images collection exists, false otherwise. If 'tenantId' is provided - * then the pre-images collection associated with that tenant will be checked for existence, - * otherwise the default pre-images collection will be checked. - */ - static bool hasPreImagesCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId); - - /** * Scans the system pre-images collection and deletes the expired pre-images from it. */ static void performExpiredChangeStreamPreImagesRemovalPass(Client* client); diff --git a/src/mongo/db/change_stream_serverless_helpers.cpp b/src/mongo/db/change_stream_serverless_helpers.cpp new file mode 100644 index 00000000000..0577894e397 --- /dev/null +++ b/src/mongo/db/change_stream_serverless_helpers.cpp @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/db/change_stream_serverless_helpers.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/server_feature_flags_gen.h" +#include "mongo/db/server_options.h" + +namespace mongo { +namespace change_stream_serverless_helpers { + +bool isChangeCollectionsModeActive() { + // A change collection must not be enabled on the config server. + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return false; + } + + // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag. + return serverGlobalParams.featureCompatibility.isVersionInitialized() && + feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( + serverGlobalParams.featureCompatibility); +} + +bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId) { + auto catalog = CollectionCatalog::get(opCtx); + + // A change stream in the serverless is declared as enabled if both the change collection and + // the pre-images collection exist for the provided tenant. + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + return isChangeCollectionsModeActive() && + static_cast<bool>(catalog->lookupCollectionByNamespace( + opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))) && + static_cast<bool>(catalog->lookupCollectionByNamespace( + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none))); +} + +const TenantId& getTenantIdForTesting() { + static const TenantId kTestTenantId( + OID("00000000" /* timestamp */ + "0000000000" /* process id */ + "000000" /* counter */)); + + return kTestTenantId; +} + +boost::optional<TenantId> resolveTenantId(boost::optional<TenantId> tenantId) { + if (tenantId) { + return tenantId; + } else if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) { + return getTenantIdForTesting(); + } + + return tenantId; +} + +TenantSet getConfigDbTenants(OperationContext* opCtx) { + TenantSet tenantIds; + + auto dbNames = CollectionCatalog::get(opCtx)->getAllDbNames(); + for (auto&& dbName : dbNames) { + if (dbName.db() == NamespaceString::kConfigDb && dbName.tenantId()) { + tenantIds.insert(*dbName.tenantId()); + } + } + + return tenantIds; +} + +} // namespace change_stream_serverless_helpers +} // namespace mongo diff --git a/src/mongo/db/change_stream_serverless_helpers.h b/src/mongo/db/change_stream_serverless_helpers.h new file mode 100644 index 00000000000..bdeb04f3ff7 --- /dev/null +++ b/src/mongo/db/change_stream_serverless_helpers.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional/optional.hpp> + +#include "mongo/db/operation_context.h" +#include "mongo/db/tenant_id.h" + +namespace mongo { +namespace change_stream_serverless_helpers { + +using TenantSet = stdx::unordered_set<TenantId, TenantId::Hasher>; + +/** + * Returns true if the server is configured such that change collections can be used to record + * oplog entries; ie, we are running in a Serverless context. Returns false otherwise. + */ +bool isChangeCollectionsModeActive(); + +/** + * Returns true if the change stream is enabled for the provided tenant, false otherwise. + */ +bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId); + +/** + * Returns an internal tenant id that will be used for testing purposes. This tenant id will not + * conflict with any other tenant id. + */ +const TenantId& getTenantIdForTesting(); + +/** + * If the provided 'tenantId' is missing and 'internalChangeStreamUseTenantIdForTesting' is true, + * returns a special 'TenantId' for testing purposes. Otherwise, returns the provided 'tenantId'. + */ +boost::optional<TenantId> resolveTenantId(boost::optional<TenantId> tenantId); + +/** + * Returns the list of the tenants associated with a 'config' database. + */ +TenantSet getConfigDbTenants(OperationContext* opCtx); + +} // namespace change_stream_serverless_helpers +} // namespace mongo diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl index 466e1c0345a..899018d04df 100644 --- a/src/mongo/db/change_streams_cluster_parameter.idl +++ b/src/mongo/db/change_streams_cluster_parameter.idl @@ -58,11 +58,11 @@ server_parameters: cpp_varname: gChangeStreamsClusterParameter validator: callback: validateChangeStreamsClusterParameter - changeCollectionRemoverJobSleepSeconds: + changeCollectionExpiredDocumentsRemoverJobSleepSeconds: description: "Specifies the number of seconds for which the periodic change collection remover job will sleep between each cycle." set_at: [ startup ] cpp_vartype: AtomicWord<int> - cpp_varname: "gChangeCollectionRemoverJobSleepSeconds" + cpp_varname: "gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds" validator: gte: 1 default: 10 diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index c26e7dfc7c0..77573fa709c 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -354,6 +354,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/multi_index_block', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/command_can_run_here', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/concurrency/exception_util', diff --git a/src/mongo/db/commands/change_stream_state_command.cpp b/src/mongo/db/commands/change_stream_state_command.cpp index de8d98083ea..44d186c03a1 100644 --- a/src/mongo/db/commands/change_stream_state_command.cpp +++ b/src/mongo/db/commands/change_stream_state_command.cpp @@ -30,8 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/change_stream_change_collection_manager.h" -#include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/change_stream_state_gen.h" #include "mongo/db/commands.h" #include "mongo/db/set_change_stream_state_coordinator.h" @@ -40,7 +39,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand namespace mongo { - namespace { /** @@ -67,6 +65,10 @@ public: " enabled: enable or disable the change stream"; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -74,18 +76,19 @@ public: void typedRun(OperationContext* opCtx) { uassert(ErrorCodes::CommandNotSupported, str::stream() << SetChangeStreamStateCommandRequest::kCommandName - << " is only supported in the serverless", - ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()); + << " command is only supported in serverless", + change_stream_serverless_helpers::isChangeCollectionsModeActive()); - // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be - // present. Remove 'getDollarTenant()' and fetch tenant from dbName(). - const std::string tenantId = request().getDollarTenant() - ? request().getDollarTenant()->toString() - : TenantId::kSystemTenantId.toString(); + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId()); + uassert(ErrorCodes::BadValue, + str::stream() << SetChangeStreamStateCommandRequest::kCommandName + << " command must be provided with a tenant id", + tenantId); // Prepare the payload for the 'SetChangeStreamStateCoordinator'. SetChangeStreamStateCoordinatorId coordinatorId; - coordinatorId.setTenantId({TenantId{OID(tenantId)}}); + coordinatorId.setTenantId(tenantId); SetChangeStreamStateCoordinatorDocument coordinatorDoc{ coordinatorId, request().getChangeStreamStateParameters().toBSON()}; @@ -134,6 +137,10 @@ public: " {getChangeStreamState: 1}"; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -141,17 +148,20 @@ public: auto typedRun(OperationContext* opCtx) { uassert(ErrorCodes::CommandNotSupported, str::stream() << GetChangeStreamStateCommandRequest::kCommandName - << " is only supported in the serverless", - ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()); + << " command is only supported in serverless", + change_stream_serverless_helpers::isChangeCollectionsModeActive()); + + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId()); + uassert(ErrorCodes::BadValue, + str::stream() << GetChangeStreamStateCommandRequest::kCommandName + << " command must be provided with a tenant id", + tenantId); - // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be - // present. - boost::optional<TenantId> tenantId = boost::none; // Set the change stream enablement state in the 'reply' object. - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); GetChangeStreamStateCommandRequest::Reply reply{ - changeCollectionManager.isChangeStreamEnabled(opCtx, tenantId)}; + change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)}; return reply; } diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index eb3c3567077..29c85819230 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -188,6 +188,10 @@ public: bool collectsResourceConsumptionMetrics() const final { return true; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBaseGen { public: using InvocationBaseGen::InvocationBaseGen; diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 4fe8a736ad7..b7bd221ec61 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" @@ -757,13 +758,20 @@ Status runAggregate(OperationContext* opCtx, nss = NamespaceString::kRsOplogNamespace; // In case of serverless the change stream will be opened on the change collection. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(origNss.tenantId()); + + uassert(ErrorCodes::BadValue, + "Change streams cannot be used without tenant id", + tenantId); + uassert(ErrorCodes::ChangeStreamNotEnabled, "Change streams must be enabled before being used.", - changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId())); + change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)); + - nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId()); + nss = NamespaceString::makeChangeCollectionNSS(tenantId); } // Assert that a change stream on the config server is always opened on the oplog. @@ -785,7 +793,7 @@ Status runAggregate(OperationContext* opCtx, LOGV2_INFO(6689200, "Opening change stream on the namespace: {nss}", "Opening change stream", - "nss"_attr = nss.toString()); + "nss"_attr = nss.toStringWithTenantId()); } // Upgrade and wait for read concern if necessary. diff --git a/src/mongo/db/commands/set_cluster_parameter_command.cpp b/src/mongo/db/commands/set_cluster_parameter_command.cpp index 08ae1b2835e..8c49717baad 100644 --- a/src/mongo/db/commands/set_cluster_parameter_command.cpp +++ b/src/mongo/db/commands/set_cluster_parameter_command.cpp @@ -66,6 +66,10 @@ public: return "Set cluster parameter on replica set or node"; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index ce879e0ee2a..9240c572a54 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -87,6 +87,10 @@ public: return true; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBaseGen { public: using InvocationBaseGen::InvocationBaseGen; @@ -200,6 +204,10 @@ public: return true; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBaseGen { public: using InvocationBaseGen::InvocationBaseGen; diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 8cbe5909154..f1a5d2a4871 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -349,7 +349,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { } } - // TODO SERVER-65950 create 'SetChangeStreamStateCoordinatorService' only in the serverless. + // TODO SERVER-66717 create 'SetChangeStreamStateCoordinatorService' only in the serverless. services.push_back(std::make_unique<SetChangeStreamStateCoordinatorService>(serviceContext)); for (auto& service : services) { @@ -790,6 +790,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { repl::ReplicationCoordinator::modeNone; if (!isStandalone) { startChangeStreamExpiredPreImagesRemover(serviceContext); + // TODO SERVER-66717 Start 'startChangeCollectionExpiredDocumentsRemover' only in the + // serverless. startChangeCollectionExpiredDocumentsRemover(serviceContext); } @@ -1561,7 +1563,7 @@ int mongod_main(int argc, char* argv[]) { ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD); ChangeStreamOptionsManager::create(service); - // TODO SERVER-65950 create 'ChangeStreamChangeCollectionManager' only in the serverless. + // TODO SERVER-66717 Create 'ChangeStreamChangeCollectionManager' only in the serverless. ChangeStreamChangeCollectionManager::create(service); #if defined(_WIN32) diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 5cfd942a8f3..a3a28901993 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -111,8 +111,6 @@ const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString:: "system.replset"); const NamespaceString NamespaceString::kLastVoteNamespace(NamespaceString::kLocalDb, "replset.election"); -const NamespaceString NamespaceString::kChangeStreamPreImagesNamespace(NamespaceString::kConfigDb, - "system.preimages"); const NamespaceString NamespaceString::kIndexBuildEntryNamespace(NamespaceString::kConfigDb, "system.indexBuilds"); const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString::kConfigDb, @@ -338,8 +336,7 @@ NamespaceString NamespaceString::makeCollectionlessAggregateNSS(const DatabaseNa NamespaceString NamespaceString::makeChangeCollectionNSS( const boost::optional<TenantId>& tenantId) { - // TODO: SERVER-65950 create namespace for a particular tenant. - return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName}; + return NamespaceString{tenantId, kConfigDb, kChangeCollectionName}; } NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) { @@ -350,8 +347,7 @@ NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) { NamespaceString NamespaceString::makePreImageCollectionNSS( const boost::optional<TenantId>& tenantId) { - return tenantId ? NamespaceString(tenantId, kConfigDb, "system.preimages") - : kChangeStreamPreImagesNamespace; + return NamespaceString{tenantId, kConfigDb, kPreImagesCollectionName}; } std::string NamespaceString::getSisterNS(StringData local) const { @@ -469,11 +465,11 @@ bool NamespaceString::isTimeseriesBucketsCollection() const { } bool NamespaceString::isChangeStreamPreImagesCollection() const { - return ns() == kChangeStreamPreImagesNamespace.ns(); + return _dbName.db() == kConfigDb && coll() == kPreImagesCollectionName; } bool NamespaceString::isChangeCollection() const { - return db() == kConfigDb && coll() == kChangeCollectionName; + return _dbName.db() == kConfigDb && coll() == kChangeCollectionName; } bool NamespaceString::isConfigImagesCollection() const { diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 79da7a0f8ae..da20bca25d2 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -82,7 +82,10 @@ public: // Name for the system.js collection static constexpr StringData kSystemDotJavascriptCollectionName = "system.js"_sd; - // Name for the change stream change collection. + // Name of the pre-images collection. + static constexpr StringData kPreImagesCollectionName = "system.preimages"_sd; + + // Name of the change stream change collection. static constexpr StringData kChangeCollectionName = "system.change_collection"_sd; // Names of privilege document collections @@ -171,9 +174,6 @@ public: // Namespace for storing the last replica set election vote. static const NamespaceString kLastVoteNamespace; - // Namespace for change stream pre-images collection. - static const NamespaceString kChangeStreamPreImagesNamespace; - // Namespace for index build entries. static const NamespaceString kIndexBuildEntryNamespace; diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp index 6e120317ca8..bb54d4bc07a 100644 --- a/src/mongo/db/op_observer/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp @@ -225,7 +225,7 @@ protected: reset(opCtx, NamespaceString::kRsOplogNamespace); reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace); reset(opCtx, NamespaceString::kConfigImagesNamespace); - reset(opCtx, NamespaceString::kChangeStreamPreImagesNamespace); + reset(opCtx, NamespaceString::makePreImageCollectionNSS(boost::none)); } // Assert that the oplog has the expected number of entries, and return them @@ -288,7 +288,7 @@ protected: bool didWriteDeletedDocToPreImagesCollection(OperationContext* opCtx, const ChangeStreamPreImageId preImageId) { AutoGetCollection preImagesCollection( - opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS); + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS); const auto preImage = Helpers::findOneForTesting( opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON()), false); return !preImage.isEmpty(); @@ -323,7 +323,7 @@ protected: const ChangeStreamPreImageId& preImageId, BSONObj* container) { AutoGetCollection preImagesCollection( - opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS); + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS); *container = Helpers::findOneForTesting(opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON())) diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp index 583e3198a42..78d7e9f19cf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp @@ -120,9 +120,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPreImage::doGetNext() boost::optional<Document> DocumentSourceChangeStreamAddPreImage::lookupPreImage( boost::intrusive_ptr<ExpressionContext> pExpCtx, const Document& preImageId) { // Look up the pre-image document on the local node by id. + // TODO SERVER-66642 Consider using internal test-tenant id if applicable. + const auto tenantId = pExpCtx->ns.tenantId(); auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocumentLocally( pExpCtx, - NamespaceString::kChangeStreamPreImagesNamespace, + NamespaceString::makePreImageCollectionNSS(tenantId), Document{{ChangeStreamPreImage::kIdFieldName, preImageId}}); // Return boost::none to signify that we failed to find the pre-image. diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 9444a694c81..68d03d0c96a 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1544,7 +1544,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( return (ns.isLocal() || ns.isConfigDotCacheDotChunks() || ns.isReshardingLocalOplogBufferCollection() || ns == NamespaceString::kConfigImagesNamespace || - ns == NamespaceString::kChangeStreamPreImagesNamespace); + ns.isChangeStreamPreImagesCollection()); }; if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl index efbe81f8501..e36a60c4df3 100644 --- a/src/mongo/db/query/query_knobs.idl +++ b/src/mongo/db/query/query_knobs.idl @@ -906,6 +906,15 @@ server_parameters: default: expr: false + # TODO SERVER-68341 Remove this query knob after tenancy is supported in the sharded cluster. + internalChangeStreamUseTenantIdForTesting: + description: "If true, then change streams will operate upon an internal tenant id for testing + purposes if the actual tenant is not provided." + set_at: [ startup ] + cpp_varname: "internalChangeStreamUseTenantIdForTesting" + cpp_vartype: AtomicWord<bool> + default: false + # Note for adding additional query knobs: # # When adding a new query knob, you should consider whether or not you need to add an 'on_update' diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 7185e7e77d6..d91977bac6c 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -75,6 +75,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/multi_index_block', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/concurrency/exception_util', @@ -634,6 +635,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/collection_crud', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/commands/mongod_fsync', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/concurrency/lock_manager', @@ -1502,6 +1504,7 @@ env.Library( '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/auth/auth', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/cloner', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/curop', diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b17e5f108ec..5c4ec5d5dab 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -57,6 +57,7 @@ #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/client.h" #include "mongo/db/coll_mod_gen.h" #include "mongo/db/commands.h" @@ -390,7 +391,7 @@ void _logOpsInner(OperationContext* opCtx, } // Insert the oplog records to the respective tenants change collections. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( opCtx, *records, timestamps); } diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 032e9109f14..51eebc457bc 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -36,6 +36,7 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/db_raii.h" @@ -150,7 +151,7 @@ Status _insertDocumentsToOplogAndChangeCollections( // Write the corresponding oplog entries to tenants respective change // collections in the serverless. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { auto status = ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( opCtx, @@ -416,8 +417,7 @@ void scheduleWritesToOplogAndChangeCollection(OperationContext* opCtx, bool skipWritesToOplog) { // Skip performing any writes during the startup recovery when running in the non-serverless // environment. - if (skipWritesToOplog && - !ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (skipWritesToOplog && !change_stream_serverless_helpers::isChangeCollectionsModeActive()) { return; } diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index d644d5ad734..842cca1bc65 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -481,10 +481,10 @@ TEST_F(OplogApplierImplTest, applyOplogEntryToRecordChangeStreamPreImages) { WriteUnitOfWork wuow{_opCtx.get()}; ChangeStreamPreImageId preImageId{*(options.uuid), op.getOpTime().getTimestamp(), 0}; BSONObj preImageDocumentKey = BSON("_id" << preImageId.toBSON()); - auto preImageLoadResult = - getStorageInterface()->deleteById(_opCtx.get(), - NamespaceString::kChangeStreamPreImagesNamespace, - preImageDocumentKey.firstElement()); + auto preImageLoadResult = getStorageInterface()->deleteById( + _opCtx.get(), + NamespaceString::makePreImageCollectionNSS(boost::none), + preImageDocumentKey.firstElement()); repl::getNextOpTime(_opCtx.get()); wuow.commit(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 8da1bcc1f5c..71bcbe60296 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -45,6 +45,7 @@ #include "mongo/db/catalog/local_oplog_info.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/client.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/commands/rwc_defaults_commands_gen.h" @@ -541,7 +542,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC }); // Create the pre-images collection if it doesn't exist yet in the non-serverless environment. - if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { ChangeStreamPreImagesCollectionManager::createPreImagesCollection( opCtx, boost::none /* tenantId */); } diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp index a0ee72fac51..9191769214c 100644 --- a/src/mongo/db/set_change_stream_state_coordinator.cpp +++ b/src/mongo/db/set_change_stream_state_coordinator.cpp @@ -34,6 +34,8 @@ #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/change_stream_state_gen.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/op_observer/op_observer.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/logv2/log.h" @@ -77,15 +79,15 @@ public: const auto setChangeStreamParameter = ChangeStreamStateParameters::parse( IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand()); - invariant(_stateDoc.getId().getTenantId()); - - // TODO SERVER-65950 replace 'tenantId' with the provided tenant id. - auto tenantId = boost::none; + // A tenant's change collection and the pre-images collection are always associated with a + // tenant id. + const auto tenantId = _stateDoc.getId().getTenantId(); + tassert(6664100, "Tenant id is missing", tenantId); if (setChangeStreamParameter.getEnabled()) { - _enableChangeStream(opCtx, tenantId); + _enableChangeStream(opCtx, *tenantId); } else { - _disableChangeStream(opCtx, tenantId); + _disableChangeStream(opCtx, *tenantId); } } @@ -94,11 +96,38 @@ private: * Enables the change stream in the serverless by creating the change collection and the * pre-image collection. */ - void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) { + void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.createChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for + // the change stream to advance. As such artifically create any oplog entry such that it + // will be captured by the change collection. With SERVER-66643, the pre-images collection + // 'create' oplog entry will be auto captured by the change collection and hence writing + // this entry will not be required. Also remove the necessary header and linked library + // after removing this code. + [&]() { + writeConflictRetry( + opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] { + Lock::GlobalLock lock(opCtx, MODE_IX); + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, + NamespaceString::makeChangeCollectionNSS(tenantId), + boost::none, + BSON("msg" + << "enable change stream"), + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + }); + }(); + + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none); // Wait until the create requests are majority committed. waitForMajority(opCtx); @@ -108,11 +137,12 @@ private: * Disables the change stream in the serverless by dropping the change collection and the * pre-image collection. */ - void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) { + void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none); // Wait until the drop requests are majority committed. waitForMajority(opCtx); diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript index 01242257d02..a4378a89caf 100644 --- a/src/mongo/db/stats/SConscript +++ b/src/mongo/db/stats/SConscript @@ -90,6 +90,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/commands/server_status_core', '$BUILD_DIR/mongo/db/server_base', ], diff --git a/src/mongo/db/stats/change_collection_server_status.cpp b/src/mongo/db/stats/change_collection_server_status.cpp index ee424a4ae43..f7d7f75a75c 100644 --- a/src/mongo/db/stats/change_collection_server_status.cpp +++ b/src/mongo/db/stats/change_collection_server_status.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/commands/server_status.h" namespace mongo { @@ -49,7 +50,7 @@ public: const BSONElement& configElement, BSONObjBuilder* result) const override { // Append the section only when running in serverless. - if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { return; } |