diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/catalog/collection_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_validation.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/catalog/validate_state.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.cpp | 93 | ||||
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.h | 17 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 8 |
10 files changed, 40 insertions, 113 deletions
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index ecb86d77238..a920e11d85c 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -1046,11 +1046,8 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, } } - // TODO SERVER-66813 fix issue with batch deletion. - if (!ns().isChangeCollection()) { - opCtx->getServiceContext()->getOpObserver()->onInserts( - opCtx, ns(), uuid(), begin, end, fromMigrate); - } + opCtx->getServiceContext()->getOpObserver()->onInserts( + opCtx, ns(), uuid(), begin, end, fromMigrate); _cappedDeleteAsNeeded(opCtx, records.begin()->id); diff --git a/src/mongo/db/catalog/collection_validation.cpp b/src/mongo/db/catalog/collection_validation.cpp index 7a3ad0c5dd2..610d7c39ca7 100644 --- a/src/mongo/db/catalog/collection_validation.cpp +++ b/src/mongo/db/catalog/collection_validation.cpp @@ -566,6 +566,7 @@ Status validate(OperationContext* opCtx, BSONObjBuilder* output, bool turnOnExtraLoggingForTest) { invariant(!opCtx->lockState()->isLocked() || storageGlobalParams.repair); + // This is deliberately outside of the try-catch block, so that any errors thrown in the // constructor fail the cmd, as opposed to returning OK with valid:false. ValidateState validateState(opCtx, nss, mode, repairMode, turnOnExtraLoggingForTest); diff --git a/src/mongo/db/catalog/validate_state.cpp b/src/mongo/db/catalog/validate_state.cpp index 09b597752cf..88d66867751 100644 --- a/src/mongo/db/catalog/validate_state.cpp +++ b/src/mongo/db/catalog/validate_state.cpp @@ -107,13 +107,11 @@ ValidateState::ValidateState(OperationContext* opCtx, bool ValidateState::shouldEnforceFastCount() const { if (_mode == ValidateMode::kForegroundFullEnforceFastCount) { - if (_nss.isOplog() || _nss.isChangeCollection()) { + if (_nss.isOplog()) { // Oplog writers only take a global IX lock, so the oplog can still be written to even // during full validation despite its collection X lock. This can cause validate to // incorrectly report an incorrect fast count on the oplog when run in enforceFastCount // mode. - // The oplog entries are also written to the change collections and are prone to fast - // count failures. return false; } else if (_nss == NamespaceString::kIndexBuildEntryNamespace) { // Do not enforce fast count on the 'config.system.indexBuilds' collection. This is an diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 0954a7af635..e27c5af2af2 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -37,21 +37,13 @@ #include "mongo/db/catalog/coll_mod.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" -#include "mongo/db/catalog_raii.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/oplog.h" #include "mongo/logv2/log.h" namespace mongo { namespace { const auto getChangeCollectionManager = ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>(); - -// TODO: SERVER-65950 create or update the change collection for a particular tenant. -NamespaceString getTenantChangeCollectionNamespace(boost::optional<TenantId> tenantId) { - return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName}; -} - } // namespace ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get( @@ -70,16 +62,19 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) { Status ChangeStreamChangeCollectionManager::createChangeCollection( OperationContext* opCtx, boost::optional<TenantId> tenantId) { + // TODO: SERVER-65950 create or update the change collection for a particular tenant. + const NamespaceString nss{NamespaceString::kConfigDb, + NamespaceString::kChangeStreamChangeCollection}; + // Make the change collection clustered by '_id'. The '_id' field will have the same value as // the 'ts' field of the oplog. CollectionOptions changeCollectionOptions; changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex()); changeCollectionOptions.capped = true; - auto status = createCollection( - opCtx, getTenantChangeCollectionNamespace(tenantId), changeCollectionOptions, BSONObj()); + auto status = createCollection(opCtx, nss, changeCollectionOptions, BSONObj()); if (status.code() == ErrorCodes::NamespaceExists) { - return Status::OK(); + return Status(ErrorCodes::Error::OK, ""); } return status; @@ -87,75 +82,21 @@ Status ChangeStreamChangeCollectionManager::createChangeCollection( Status ChangeStreamChangeCollectionManager::dropChangeCollection( OperationContext* opCtx, boost::optional<TenantId> tenantId) { + // TODO: SERVER-65950 remove the change collection for a particular tenant. + const NamespaceString nss{NamespaceString::kConfigDb, + NamespaceString::kChangeStreamChangeCollection}; DropReply dropReply; - return dropCollection(opCtx, - getTenantChangeCollectionNamespace(tenantId), - &dropReply, - DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); + return dropCollection( + opCtx, nss, &dropReply, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); } -void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( +Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( OperationContext* opCtx, - const std::vector<Record>& oplogRecords, - const std::vector<Timestamp>& oplogTimestamps) { - invariant(oplogRecords.size() == oplogTimestamps.size()); - - // This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for - // commiting the unit of work. - invariant(opCtx->lockState()->inAWriteUnitOfWork()); - - // Maps statements that should be inserted to the change collection for each tenant. - stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher> - tenantToInsertStatements; - - for (size_t idx = 0; idx < oplogRecords.size(); idx++) { - auto& record = oplogRecords[idx]; - auto& ts = oplogTimestamps[idx]; - - // Create a mutable document and update the '_id' field with the oplog entry timestamp. The - // '_id' field will be use to order the change collection documents. - Document oplogDoc(record.data.toBson()); - MutableDocument changeCollDoc(oplogDoc); - changeCollDoc["_id"] = Value(ts); - - // Create an insert statement that should be written at the timestamp 'ts' for a particular - // tenant. - auto readyChangeCollDoc = changeCollDoc.freeze(); - tenantToInsertStatements[TenantId::kSystemTenantId].push_back( - InsertStatement{readyChangeCollDoc.toBson(), ts, repl::OpTime::kUninitializedTerm}); - } - - for (auto&& [tenantId, insertStatements] : tenantToInsertStatements) { - // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove - // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. - AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); - AutoGetCollection tenantChangeCollection( - opCtx, getTenantChangeCollectionNamespace(tenantId), LockMode::MODE_IX); - - // The change collection does not exist for a particular tenant because either the change - // collection is not enabled or is in the process of enablement. Ignore this insert for now. - // TODO: SERVER-65950 move this check before inserting to the map - // 'tenantToInsertStatements'. - if (!tenantChangeCollection) { - continue; - } - - // Writes to the change collection should not be replicated. - repl::UnreplicatedWritesBlock unReplBlock(opCtx); - - Status status = tenantChangeCollection->insertDocuments(opCtx, - insertStatements.begin(), - insertStatements.end(), - nullptr /* opDebug */, - false /* fromMigrate */); - if (!status.isOK()) { - LOGV2_FATAL(6612300, - "Write to change collection: {ns} failed: {error}", - "Write to change collection failed", - "ns"_attr = tenantChangeCollection->ns().toString(), - "error"_attr = status.toString()); - } - } + boost::optional<TenantId> tenantId, + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps) { + // TODO SERVER-65210 add code to insert to the change collection in the primaries. + return Status(ErrorCodes::OK, ""); } } // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 6b7de7da109..b4ad0e25c50 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -75,21 +75,16 @@ public: Status dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId); /** - * Inserts documents to change collections. The parameter 'oplogRecords' - * is a vector of oplog records and the parameter 'oplogTimestamps' is a vector for respective + * Inserts documents to the change collection for the specified tenant. The parameter 'records' + * is a vector of oplog records and the parameter 'timestamps' is a vector for respective * timestamp for each oplog record. * - * The method fetches the tenant-id from the oplog entry, performs necessary modification to the - * document and then write to the tenant's change collection at the specified oplog timestamp. - * - * Failure in insertion to any change collection will result in a fatal exception and will bring - * down the node. - * * TODO: SERVER-65950 make tenantId field mandatory. */ - void insertDocumentsToChangeCollection(OperationContext* opCtx, - const std::vector<Record>& oplogRecords, - const std::vector<Timestamp>& oplogTimestamps); + Status insertDocumentsToChangeCollection(OperationContext* opCtx, + boost::optional<TenantId> tenantId, + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps); }; } // namespace mongo diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index d16370da310..525dd9c6724 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -243,7 +243,7 @@ bool NamespaceString::isLegalClientSystemNS( return true; } - if (isChangeCollection()) { + if (isChangeStreamChangeCollection()) { return true; } @@ -404,8 +404,8 @@ bool NamespaceString::isChangeStreamPreImagesCollection() const { return ns() == kChangeStreamPreImagesNamespace.ns(); } -bool NamespaceString::isChangeCollection() const { - return db() == kConfigDb && coll() == kChangeCollectionName; +bool NamespaceString::isChangeStreamChangeCollection() const { + return db() == kConfigDb && coll() == kChangeStreamChangeCollection; } bool NamespaceString::isConfigImagesCollection() const { @@ -432,7 +432,8 @@ NamespaceString NamespaceString::getTimeseriesViewNamespace() const { } bool NamespaceString::isImplicitlyReplicated() const { - if (isChangeStreamPreImagesCollection() || isConfigImagesCollection() || isChangeCollection()) { + if (isChangeStreamPreImagesCollection() || isConfigImagesCollection() || isChangeCollection() || + isChangeStreamChangeCollection()) { // Implicitly replicated namespaces are replicated, although they only replicate a subset of // writes. invariant(isReplicated()); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index dd6842974f0..687fb431bb1 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -77,7 +77,7 @@ public: static constexpr StringData kSystemDotViewsCollectionName = "system.views"_sd; // Name for the change stream change collection. - static constexpr StringData kChangeCollectionName = "system.change_collection"_sd; + static constexpr StringData kChangeStreamChangeCollection = "system.change_collection"_sd; // Names of privilege document collections static constexpr StringData kSystemUsers = "system.users"_sd; @@ -384,6 +384,9 @@ public: bool isSystemDotProfile() const { return coll() == "system.profile"; } + bool isChangeCollection() const { + return (db() == kConfigDb) && coll().startsWith("changes."); + } bool isSystemDotViews() const { return coll() == kSystemDotViewsCollectionName; } @@ -458,7 +461,7 @@ public: /** * Returns whether the specified namespace is config.system.changeCollection. */ - bool isChangeCollection() const; + bool isChangeStreamChangeCollection() const; /** * Returns whether the specified namespace is config.image_collection. diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e9bcecfbdbf..3379a0d34d1 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -73,7 +73,6 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/index_build_oplog_entry', '$BUILD_DIR/mongo/db/catalog/local_oplog_info', '$BUILD_DIR/mongo/db/catalog/multi_index_block', - '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/db_raii', diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index bde00eef906..8800297cb4d 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -108,9 +108,9 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss, } BaseCloner::ClonerStages CollectionCloner::getStages() { - if (_sourceNss.isChangeStreamPreImagesCollection() || _sourceNss.isChangeCollection()) { - // Only the change stream pre-images collection and change collection needs to be created - - // its documents should not be copied. + if (_sourceNss.isChangeStreamPreImagesCollection()) { + // Only the change stream pre-images collection needs to be created - its documents should + // not be copied. return {&_listIndexesStage, &_createCollectionStage, &_setupIndexBuildersForUnfinishedIndexesStage}; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 48a9ab150da..7f521591c0c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -58,7 +58,6 @@ #include "mongo/db/catalog/local_oplog_info.h" #include "mongo/db/catalog/multi_index_block.h" #include "mongo/db/catalog/rename_collection.h" -#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/client.h" #include "mongo/db/coll_mod_gen.h" #include "mongo/db/commands.h" @@ -388,13 +387,6 @@ void _logOpsInner(OperationContext* opCtx, "error"_attr = result.toString()); } - // Insert the oplog records to the respective tenants change collections. - if (::mongo::feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( - serverGlobalParams.featureCompatibility)) { - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); - changeCollectionManager.insertDocumentsToChangeCollection(opCtx, *records, timestamps); - } - // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. opCtx->recoveryUnit()->onCommit( [opCtx, replCoord, finalOpTime, wallTime](boost::optional<Timestamp> commitTime) { |