diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-06-10 19:03:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-10 19:48:46 +0000 |
commit | 09bb1491952e6e756746640fbfdfaddead59c61b (patch) | |
tree | cec59ed6242e97b56e5938cb9492262cec3e2d4d | |
parent | ea3e7127282a3d2974c39d7f0045ffa29d1dc426 (diff) | |
download | mongo-09bb1491952e6e756746640fbfdfaddead59c61b.tar.gz |
SERVER-66633 Record change collection entries in the secondaries.
-rw-r--r-- | jstests/replsets/write_change_stream_change_collection.js | 28 | ||||
-rw-r--r-- | src/mongo/db/catalog_raii.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/catalog_raii.h | 27 | ||||
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.cpp | 150 | ||||
-rw-r--r-- | src/mongo/db/change_stream_change_collection_manager.h | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 18 |
7 files changed, 227 insertions, 50 deletions
diff --git a/jstests/replsets/write_change_stream_change_collection.js b/jstests/replsets/write_change_stream_change_collection.js index 32fce968ecc..d4e1a75d04b 100644 --- a/jstests/replsets/write_change_stream_change_collection.js +++ b/jstests/replsets/write_change_stream_change_collection.js @@ -8,18 +8,20 @@ (function() { "use strict"; -const replSetTest = new ReplSetTest({nodes: 1}); +const replSetTest = new ReplSetTest({nodes: 2}); replSetTest.startSet({setParameter: "multitenancySupport=true"}); replSetTest.initiate(); const primary = replSetTest.getPrimary(); -const oplogColl = primary.getDB("local").oplog.rs; -const changeColl = primary.getDB("config").system.change_collection; +const secondary = replSetTest.getSecondary(); const testDb = primary.getDB("test"); // Verifies that the oplog and change collection entries are the same for the specified start and // end duration of the oplog timestamp. -function verifyChangeCollectionEntries(startOplogTimestamp, endOplogTimestamp) { +function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) { + const oplogColl = connection.getDB("local").oplog.rs; + const changeColl = connection.getDB("config").system.change_collection; + // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp, // endOplogTimestamp]. const oplogEntries = @@ -69,6 +71,7 @@ function performWrites(coll) { // Test the change collection entries with the oplog by performing some basic writes. (function testBasicWritesInChangeCollection() { + const oplogColl = primary.getDB("local").oplog.rs; const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts; assert(startOplogTimestamp != undefined); @@ -79,11 +82,18 @@ function performWrites(coll) { assert(endOplogTimestamp !== undefined); assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0); - verifyChangeCollectionEntries(startOplogTimestamp, endOplogTimestamp); + // Wait for the replication to finish. 
+ replSetTest.awaitReplication(); + + // Verify that the change collection entries are the same as the oplog in the primary and the + // secondary node. + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp); + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); })(); // Test the change collection entries with the oplog by performing writes in a transaction. (function testWritesinChangeCollectionWithTrasactions() { + const oplogColl = primary.getDB("local").oplog.rs; const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts; assert(startOplogTimestamp != undefined); @@ -97,7 +107,13 @@ function performWrites(coll) { assert(endOplogTimestamp != undefined); assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0); - verifyChangeCollectionEntries(startOplogTimestamp, endOplogTimestamp); + // Wait for the replication to finish. + replSetTest.awaitReplication(); + + // Verify that the change collection entries are the same as the oplog in the primary and the + // secondary node for the applyOps. + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp); + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); })(); replSetTest.stopSet(); diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index 00106047492..2063cf2590b 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -596,4 +596,35 @@ AutoGetOplog::AutoGetOplog(OperationContext* opCtx, OplogAccessMode mode, Date_t _oplog = &_oplogInfo->getCollection(); } + +AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx, + AutoGetChangeCollection::AccessMode mode, + boost::optional<TenantId> tenantId, + Date_t deadline) { + auto nss = NamespaceString::makeChangeCollectionNSS(tenantId); + if (mode == AccessMode::kWrite) { + // The global lock must already be held. 
+ invariant(opCtx->lockState()->isWriteLocked()); + + // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove + // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. + AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + _coll.emplace( + opCtx, nss, LockMode::MODE_IX, AutoGetCollectionViewMode::kViewsForbidden, deadline); + } +} + +const Collection* AutoGetChangeCollection::operator->() const { + return _coll ? _coll->getCollection().get() : nullptr; +} + +const CollectionPtr& AutoGetChangeCollection::operator*() const { + return _coll->getCollection(); +} + +AutoGetChangeCollection::operator bool() const { + return _coll && _coll->getCollection().get(); +} + + } // namespace mongo diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h index ae1111041db..84121f53965 100644 --- a/src/mongo/db/catalog_raii.h +++ b/src/mongo/db/catalog_raii.h @@ -476,4 +476,31 @@ private: const CollectionPtr* _oplog; }; +/** + * A RAII-style class to acquire lock to a particular tenant's change collection. + * + * A change collection can be accessed in the following modes: + * kWrite - This mode assumes that the global IX lock is already held before writing to the change + * collection. 
+ */ +class AutoGetChangeCollection { +public: + enum class AccessMode { kWrite }; + + AutoGetChangeCollection(OperationContext* opCtx, + AccessMode mode, + boost::optional<TenantId> tenantId, + Date_t deadline = Date_t::max()); + + AutoGetChangeCollection(const AutoGetChangeCollection&) = delete; + AutoGetChangeCollection& operator=(const AutoGetChangeCollection&) = delete; + + const Collection* operator->() const; + const CollectionPtr& operator*() const; + explicit operator bool() const; + +private: + boost::optional<AutoGetCollection> _coll; +}; + } // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 840e045ebd9..e2872e3d815 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -48,6 +48,75 @@ namespace { const auto getChangeCollectionManager = ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>(); +/** + * Creates a Document object from the supplied oplog entry, performs necessary modifications to it + * and then returns it as a BSON object. + */ +BSONObj createChangeCollectionEntryFromOplog(const BSONObj& oplogEntry) { + Document oplogDoc(oplogEntry); + MutableDocument changeCollDoc(oplogDoc); + changeCollDoc["_id"] = Value(oplogDoc["ts"]); + + auto readyChangeCollDoc = changeCollDoc.freeze(); + return readyChangeCollDoc.toBson(); +} + +/** + * Helper to write insert statements to respective change collections based on tenant ids. + */ +class ChangeCollectionsWriter { +public: + /** + * Adds the insert statement for the provided tenant that will be written to the change + * collection when the 'write()' method is called. 
+ */ + void add(const TenantId& tenantId, InsertStatement insertStatement) { + _tenantStatementsMap[tenantId].push_back(std::move(insertStatement)); + } + + /** + * Writes the batch of insert statements for each change collection. Bails out further writes if + * a failure is encountered in writing to any change collection. + */ + Status write(OperationContext* opCtx, OpDebug* opDebug) { + for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) { + AutoGetChangeCollection tenantChangeCollection( + opCtx, AutoGetChangeCollection::AccessMode::kWrite, boost::none /* tenantId */); + + // The change collection does not exist for a particular tenant because either the + // change collection is not enabled or is in the process of enablement. Ignore this + // insert for now. + // TODO: SERVER-65950 move this check before inserting to the map + // '_tenantStatementsMap'. + if (!tenantChangeCollection) { + continue; + } + + // Writes to the change collection should not be replicated. + repl::UnreplicatedWritesBlock unReplBlock(opCtx); + + Status status = tenantChangeCollection->insertDocuments(opCtx, + insertStatements.begin(), + insertStatements.end(), + opDebug, + false /* fromMigrate */); + if (!status.isOK()) { + return Status(status.code(), + str::stream() + << "Write to change collection: " << tenantChangeCollection->ns() + << "failed, reason: " << status.reason()); + } + } + + return Status::OK(); + } + +private: + // Maps insert statements for each tenant. + stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher> + _tenantStatementsMap; +}; + } // namespace ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get( @@ -115,58 +184,57 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( // commiting the unit of work. invariant(opCtx->lockState()->inAWriteUnitOfWork()); - // Maps statements that should be inserted to the change collection for each tenant. 
- stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher> - tenantToInsertStatements; + ChangeCollectionsWriter changeCollectionsWriter; for (size_t idx = 0; idx < oplogRecords.size(); idx++) { auto& record = oplogRecords[idx]; auto& ts = oplogTimestamps[idx]; - // Create a mutable document and update the '_id' field with the oplog entry timestamp. The - // '_id' field will be use to order the change collection documents. - Document oplogDoc(record.data.toBson()); - MutableDocument changeCollDoc(oplogDoc); - changeCollDoc["_id"] = Value(ts); - // Create an insert statement that should be written at the timestamp 'ts' for a particular // tenant. - auto readyChangeCollDoc = changeCollDoc.freeze(); - tenantToInsertStatements[TenantId::kSystemTenantId].push_back( - InsertStatement{readyChangeCollDoc.toBson(), ts, repl::OpTime::kUninitializedTerm}); + auto changeCollDoc = createChangeCollectionEntryFromOplog(record.data.toBson()); + + // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id. + changeCollectionsWriter.add( + TenantId::kSystemTenantId, + InsertStatement{std::move(changeCollDoc), ts, repl::OpTime::kUninitializedTerm}); } - for (auto&& [tenantId, insertStatements] : tenantToInsertStatements) { - // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove - // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. - AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); - AutoGetCollection tenantChangeCollection( - opCtx, NamespaceString::makeChangeCollectionNSS(tenantId), LockMode::MODE_IX); - - // The change collection does not exist for a particular tenant because either the change - // collection is not enabled or is in the process of enablement. Ignore this insert for now. - // TODO: SERVER-65950 move this check before inserting to the map - // 'tenantToInsertStatements'. 
- if (!tenantChangeCollection) { - continue; - } + // Write documents to change collections and throw exception in case of any failure. + Status status = changeCollectionsWriter.write(opCtx, nullptr /* opDebug */); + if (!status.isOK()) { + LOGV2_FATAL( + 6612300, "Failed to write to change collection", "reason"_attr = status.reason()); + } +} - // Writes to the change collection should not be replicated. - repl::UnreplicatedWritesBlock unReplBlock(opCtx); - - Status status = tenantChangeCollection->insertDocuments(opCtx, - insertStatements.begin(), - insertStatements.end(), - nullptr /* opDebug */, - false /* fromMigrate */); - if (!status.isOK()) { - LOGV2_FATAL(6612300, - "Write to change collection: {ns} failed: {error}", - "Write to change collection failed", - "ns"_attr = tenantChangeCollection->ns().toString(), - "error"_attr = status.toString()); - } +Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( + OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator beginOplogEntries, + std::vector<InsertStatement>::const_iterator endOplogEntries, + OpDebug* opDebug) { + ChangeCollectionsWriter changeCollectionsWriter; + + // Transform oplog entries to change collections entries and group them by tenant id. + for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries; + oplogEntryIter++) { + auto& oplogDoc = oplogEntryIter->doc; + + // The initial seed oplog insertion is not timestamped as such the 'oplogSlot' is not + // initialized. The corresponding change collection insertion will not be timestamped. + auto oplogSlot = oplogEntryIter->oplogSlot; + + auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc); + + // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id. + changeCollectionsWriter.add(TenantId::kSystemTenantId, + InsertStatement{std::move(changeCollDoc), + oplogSlot.getTimestamp(), + oplogSlot.getTerm()}); } + + // Write documents to change collections. 
+ return changeCollectionsWriter.write(opCtx, opDebug); } } // namespace mongo diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 2006b8c9426..f9fe6d6f414 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -31,6 +31,7 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" namespace mongo { @@ -86,9 +87,9 @@ public: Status dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId); /** - * Inserts documents to change collections. The parameter 'oplogRecords' - * is a vector of oplog records and the parameter 'oplogTimestamps' is a vector for respective - * timestamp for each oplog record. + * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog + * records and the parameter 'oplogTimestamps' is a vector for respective timestamp for each + * oplog record. * * The method fetches the tenant-id from the oplog entry, performs necessary modification to the * document and then write to the tenant's change collection at the specified oplog timestamp. @@ -101,6 +102,20 @@ public: void insertDocumentsToChangeCollection(OperationContext* opCtx, const std::vector<Record>& oplogRecords, const std::vector<Timestamp>& oplogTimestamps); + + + /** + * Performs a range insert on the respective change collections using the oplog entries as + * specified by 'beginOplogEntries' and 'endOplogEntries'. + * + * Bails out if a failure is encountered in inserting documents to a particular change + * collection. 
+ */ + Status insertDocumentsToChangeCollection( + OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator beginOplogEntries, + std::vector<InsertStatement>::const_iterator endOplogEntries, + OpDebug* opDebug); }; } // namespace mongo diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e9bcecfbdbf..4fa9735e923 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -261,6 +261,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/catalog/multi_index_block', + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbhelpers', @@ -619,6 +620,7 @@ env.Library( 'storage_interface', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/commands/mongod_fsync', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/storage/storage_control', diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 1a90e3a57c8..12240e945ba 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -50,6 +50,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/exception_util.h" @@ -347,6 +348,8 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, boost::optional<AutoGetOplog> autoOplog; const CollectionPtr* collection; + bool shouldWriteToChangeCollections = false; + auto nss = nsOrUUID.nss(); if (nss && nss->isOplog()) { // Simplify locking rules for oplog collection. 
@@ -355,6 +358,9 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, if (!*collection) { return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"}; } + + shouldWriteToChangeCollections = + ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive(); } else { autoColl.emplace(opCtx, nsOrUUID, MODE_IX); auto collectionResult = getCollection( @@ -371,6 +377,18 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, if (!status.isOK()) { return status; } + + // Insert oplog entries to change collections if we are running in serverless mode and the 'nss' + // is 'local.oplog.rs'. + if (shouldWriteToChangeCollections) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + status = changeCollectionManager.insertDocumentsToChangeCollection( + opCtx, begin, end, nullOpDebug); + if (!status.isOK()) { + return status; + } + } + wunit.commit(); return Status::OK(); |