From 04b55aa4c74a1ebf5e1ef50294f06e0b91acc45a Mon Sep 17 00:00:00 2001 From: Rishab Joshi Date: Fri, 8 Jul 2022 11:10:33 +0000 Subject: SERVER-66634 Make changes to the startup recovery and initial-sync for the change collections. --- .../serverless/basic_write_to_change_collection.js | 48 +-------- .../basic_read_from_change_collection.js | 1 - .../serverless/initial_sync_change_collection.js | 96 ++++++++++++++++++ jstests/serverless/libs/change_collection_util.js | 47 +++++++++ ...ite_to_change_collection_in_startup_recovery.js | 108 +++++++++++++++++++++ src/mongo/db/catalog_raii.cpp | 18 ++-- src/mongo/db/catalog_raii.h | 9 +- .../db/change_stream_change_collection_manager.cpp | 22 ++++- .../db/change_stream_change_collection_manager.h | 1 + src/mongo/db/repl/collection_cloner.cpp | 4 +- src/mongo/db/repl/oplog_applier_impl.cpp | 103 ++++++++++++++++---- src/mongo/db/repl/storage_interface_impl.cpp | 50 +--------- src/mongo/db/storage/storage_util.h | 63 ++++++++++-- 13 files changed, 438 insertions(+), 132 deletions(-) create mode 100644 jstests/serverless/initial_sync_change_collection.js create mode 100644 jstests/serverless/libs/change_collection_util.js create mode 100644 jstests/serverless/write_to_change_collection_in_startup_recovery.js diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js index 0105f33d344..401c8916880 100644 --- a/jstests/serverless/basic_write_to_change_collection.js +++ b/jstests/serverless/basic_write_to_change_collection.js @@ -1,13 +1,14 @@ // Tests that entries are written to the change collection for collection create, drop and document // modification operations. // @tags: [ -// multiversion_incompatible, // featureFlagMongoStore, // requires_fcv_61, // ] (function() { "use strict"; +load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries. 
+ const replSetTest = new ReplSetTest({nodes: 2}); // TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and @@ -21,51 +22,6 @@ const primary = replSetTest.getPrimary(); const secondary = replSetTest.getSecondary(); const testDb = primary.getDB("test"); -// Verifies that the oplog and change collection entries are the same for the specified start and -// end duration of the oplog timestamp. -function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) { - const oplogColl = connection.getDB("local").oplog.rs; - const changeColl = connection.getDB("config").system.change_collection; - - // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp, - // endOplogTimestamp]. - const oplogEntries = - oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]}) - .toArray(); - const changeCollectionEntries = - changeColl - .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]}) - .toArray(); - - assert.eq( - oplogEntries.length, - changeCollectionEntries.length, - "Number of entries in the oplog and the change collection is not the same. Oplog has total " + - oplogEntries.length + " entries , change collection has total " + - changeCollectionEntries.length + " entries"); - - for (let idx = 0; idx < oplogEntries.length; idx++) { - const oplogEntry = oplogEntries[idx]; - const changeCollectionEntry = changeCollectionEntries[idx]; - - // Remove the '_id' field from the change collection as oplog does not have it. - assert(changeCollectionEntry.hasOwnProperty("_id")); - assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts), - 0, - "Change collection '_id' field: " + tojson(changeCollectionEntry._id) + - " is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts)); - delete changeCollectionEntry["_id"]; - - // Verify that the oplog and change collecton entry (after removing the '_id') field are - // the same. 
- assert.eq( - oplogEntry, - changeCollectionEntry, - "Oplog and change collection entries are not same. Oplog entry: " + tojson(oplogEntry) + - ", change collection entry: " + tojson(changeCollectionEntry)); - } -} - // Performs writes on the specified collection. function performWrites(coll) { const docIds = [1, 2, 3, 4, 5]; diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js index fa457e96905..6c2edc1da4d 100644 --- a/jstests/serverless/change_streams/basic_read_from_change_collection.js +++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js @@ -1,7 +1,6 @@ // Tests that a change stream can be opened on a change collection when one exists, and that an // exception is thrown if we attempt to open a stream while change streams are disabled. // @tags: [ -// multiversion_incompatible, // featureFlagMongoStore, // requires_fcv_61, // assumes_against_mongod_not_mongos, diff --git a/jstests/serverless/initial_sync_change_collection.js b/jstests/serverless/initial_sync_change_collection.js new file mode 100644 index 00000000000..81dd18b3a93 --- /dev/null +++ b/jstests/serverless/initial_sync_change_collection.js @@ -0,0 +1,96 @@ +// Tests that the data cloning phase of initial sync does not clone the change collection documents +// and when the initial sync has completed the change collection and oplog entries are exactly same +// in the new secondary. +// @tags: [ +// featureFlagServerlessChangeStreams, +// featureFlagMongoStore, +// requires_fcv_61, +// ] +// +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); // For waitForFailPoint. +load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries. 
+
+const replSetTest = new ReplSetTest({nodes: 1});
+
+// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
+// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
+replSetTest.startSet(
+    {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
+
+replSetTest.initiate();
+
+const primary = replSetTest.getPrimary();
+const primaryChangeColl = primary.getDB("config").system.change_collection;
+
+const mdbStockPriceDoc = {
+    _id: "mdb",
+    price: 250
+};
+
+// The document 'mdbStockPriceDoc' is inserted before starting the initial sync. As such the
+// document 'mdbStockPriceDoc' should not be cloned in the secondary after initial sync is complete.
+assert.commandWorked(primary.getDB("test").stockPrice.insert(mdbStockPriceDoc));
+assert.eq(primaryChangeColl.find({o: mdbStockPriceDoc}).toArray().length, 1);
+
+// Add a new secondary to the replica set and block the initial sync after the data cloning is done.
+const secondary = replSetTest.add({
+    setParameter: {
+        // Hang after the data cloning phase is completed.
+        "failpoint.initialSyncHangAfterDataCloning": tojson({mode: "alwaysOn"}),
+        "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})
+    }
+});
+
+replSetTest.reInitiate();
+
+// Wait for the cloning phase to complete. The cloning phase should not clone documents of the
+// change collection from the primary.
+assert.commandWorked(secondary.adminCommand({
+    waitForFailPoint: "initialSyncHangAfterDataCloning",
+    timesEntered: 1,
+    maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+
+const tslaStockPriceDoc = {
+    _id: "tsla",
+    price: 650
+};
+
+// The document 'tslaStockPriceDoc' is inserted in the primary after the data cloning phase has
+// completed; as such it should be inserted in the secondary's change collection.
+assert.commandWorked(primary.getDB("test").stockPrice.insert(tslaStockPriceDoc)); +assert.eq(primaryChangeColl.find({o: tslaStockPriceDoc}).toArray().length, 1); + +// Unblock the initial sync process. +assert.commandWorked(secondary.getDB("test").adminCommand( + {configureFailPoint: "initialSyncHangAfterDataCloning", mode: "off"})); + +// Wait for the initial sync to complete. +replSetTest.waitForState(secondary, ReplSetTest.State.SECONDARY); + +// Verify that the document 'mdbStockPriceDoc' does not exist and the document 'tslaStockPriceDoc' +// exists in the secondary's change collection. +const changeCollDocs = + secondary.getDB("config") + .system.change_collection.find({$or: [{o: mdbStockPriceDoc}, {o: tslaStockPriceDoc}]}) + .toArray(); +assert.eq(changeCollDocs.length, 1); +assert.eq(changeCollDocs[0].o, tslaStockPriceDoc); + +// Get the timestamp of the first and the last entry from the secondary's oplog. +const oplogDocs = secondary.getDB("local").oplog.rs.find().toArray(); +assert.gt(oplogDocs.length, 0); +const startOplogTimestamp = oplogDocs[0].ts; +const endOplogTimestamp = oplogDocs.at(-1).ts; + +// The change collection gets created at the data cloning phase and documents are written to the +// oplog only after the data cloning is done. And so, the change collection already exists in place +// to capture all oplog entries. As such, the change collection entries and the oplog entries from +// the 'startOplogTimestamp' to the 'endOplogTimestamp' must be exactly the same. +verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); + +replSetTest.stopSet(); +})(); diff --git a/jstests/serverless/libs/change_collection_util.js b/jstests/serverless/libs/change_collection_util.js new file mode 100644 index 00000000000..4026ea84f81 --- /dev/null +++ b/jstests/serverless/libs/change_collection_util.js @@ -0,0 +1,47 @@ +// Contains functions for testing the change collections. 
+ +// Verifies that the oplog and change collection entries are the same for the specified start and +// end duration of the oplog timestamp. +function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) { + const oplogColl = connection.getDB("local").oplog.rs; + const changeColl = connection.getDB("config").system.change_collection; + + // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp, + // endOplogTimestamp]. + const oplogEntries = + oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]}) + .toArray(); + const changeCollectionEntries = + changeColl + .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]}) + .toArray(); + + assert.eq( + oplogEntries.length, + changeCollectionEntries.length, + "Number of entries in the oplog and the change collection is not the same. Oplog has total " + + oplogEntries.length + " entries , change collection has total " + + changeCollectionEntries.length + " entries" + + "change collection entries " + tojson(changeCollectionEntries)); + + for (let idx = 0; idx < oplogEntries.length; idx++) { + const oplogEntry = oplogEntries[idx]; + const changeCollectionEntry = changeCollectionEntries[idx]; + + // Remove the '_id' field from the change collection as oplog does not have it. + assert(changeCollectionEntry.hasOwnProperty("_id")); + assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts), + 0, + "Change collection '_id' field: " + tojson(changeCollectionEntry._id) + + " is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts)); + delete changeCollectionEntry["_id"]; + + // Verify that the oplog and change collecton entry (after removing the '_id') field are + // the same. + assert.eq( + oplogEntry, + changeCollectionEntry, + "Oplog and change collection entries are not same. 
Oplog entry: " + tojson(oplogEntry) + + ", change collection entry: " + tojson(changeCollectionEntry)); + } +} diff --git a/jstests/serverless/write_to_change_collection_in_startup_recovery.js b/jstests/serverless/write_to_change_collection_in_startup_recovery.js new file mode 100644 index 00000000000..a14b5e28600 --- /dev/null +++ b/jstests/serverless/write_to_change_collection_in_startup_recovery.js @@ -0,0 +1,108 @@ +// Tests that replaying the oplog entries during the startup recovery also writes to the change +// collection. +// @tags: [ +// featureFlagServerlessChangeStreams, +// multiversion_incompatible, +// featureFlagMongoStore, +// ] + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); // For configureFailPoint. +load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries. + +const replSetTest = new ReplSetTest({nodes: 1}); + +// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and +// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. +replSetTest.startSet( + {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})}); + +replSetTest.initiate(); + +let primary = replSetTest.getPrimary(); + +// Insert a document to the collection and then capture the corresponding oplog timestamp. This +// timestamp will be the start timestamp beyond (inclusive) which we will validate the oplog and the +// change collection entries. +const startTimestamp = assert + .commandWorked(primary.getDB("test").runCommand( + {insert: "seedCollection", documents: [{_id: "beginTs"}]})) + .operationTime; + +// Pause the checkpointing, as such non-journaled collection including the change collection will +// not be persisted. +const pauseCheckpointThreadFailPoint = configureFailPoint(primary, "pauseCheckpointThread"); +pauseCheckpointThreadFailPoint.wait(); + +// Insert a document to the collection. 
+assert.commandWorked(primary.getDB("test").stockPrice.insert({_id: "mdb", price: 250})); + +// Verify that the inserted document can be queried from the 'stockPrice', the 'oplog.rs', and +// the 'system.change_collection'. +assert.eq(primary.getDB("test").stockPrice.find({_id: "mdb", price: 250}).toArray().length, 1); +assert.eq(primary.getDB("local") + .oplog.rs.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}}) + .toArray() + .length, + 1); +assert.eq(primary.getDB("config") + .system.change_collection.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}}) + .toArray() + .length, + 1); + +// Perform ungraceful shutdown of the primary node and do not clean the db path directory. +replSetTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true}); + +// Run a new mongoD instance with db path pointing to the replica set primary db directory. +const standalone = + MongoRunner.runMongod({dbpath: primary.dbpath, noReplSet: true, noCleanData: true}); +assert.neq(null, standalone, "Fail to restart the node as standalone"); + +// Verify that the inserted document does not exist both in the 'stockPrice' and +// the 'system.change_collection' but exists in the 'oplog.rs'. +assert.eq(standalone.getDB("test").stockPrice.find({_id: "mdb", price: 250}).toArray().length, 0); +assert.eq(standalone.getDB("local") + .oplog.rs.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}}) + .toArray() + .length, + 1); +assert.eq(standalone.getDB("config") + .system.change_collection.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}}) + .toArray() + .length, + 0); + +// Stop the mongoD instance and do not clean the db directory. +MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: true, wait: true}); + +// Start the replica set primary with the same db path. 
+replSetTest.start(primary, { + noCleanData: true, + setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"}) +}); + +primary = replSetTest.getPrimary(); + +// Verify that the 'stockPrice' and the 'system.change_collection' now have the inserted document. +// This document was inserted by applying oplog entries during the startup recovery. +assert.eq(primary.getDB("test").stockPrice.find({_id: "mdb", price: 250}).toArray().length, 1); +assert.eq(primary.getDB("config") + .system.change_collection.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}}) + .toArray() + .length, + 1); + +// Get the oplog timestamp up to this point. All oplog entries upto this timestamp must exist in the +// change collection. +const endTimestamp = primary.getDB("local").oplog.rs.find().toArray().at(-1).ts; +assert(endTimestamp !== undefined); + +// Verify that the oplog and the change collection entries between the ['startTimestamp', +// 'endTimestamp'] window are exactly same and in the same order. +verifyChangeCollectionEntries(primary, startTimestamp, endTimestamp); + +replSetTest.stopSet(); +})(); diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index c3c87a95844..e3ae1132263 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -572,17 +572,19 @@ AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx, AutoGetChangeCollection::AccessMode mode, boost::optional tenantId, Date_t deadline) { - auto nss = NamespaceString::makeChangeCollectionNSS(tenantId); - if (mode == AccessMode::kWrite) { + if (mode == AccessMode::kWriteInOplogContext) { // The global lock must already be held. invariant(opCtx->lockState()->isWriteLocked()); - - // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove - // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. 
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); - _coll.emplace( - opCtx, nss, LockMode::MODE_IX, AutoGetCollectionViewMode::kViewsForbidden, deadline); } + + // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove + // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. + AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + _coll.emplace(opCtx, + NamespaceString::makeChangeCollectionNSS(tenantId), + LockMode::MODE_IX, + AutoGetCollectionViewMode::kViewsForbidden, + deadline); } const Collection* AutoGetChangeCollection::operator->() const { diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h index 2c48422f8fb..62d63404e9f 100644 --- a/src/mongo/db/catalog_raii.h +++ b/src/mongo/db/catalog_raii.h @@ -474,12 +474,15 @@ private: * A RAII-style class to acquire lock to a particular tenant's change collection. * * A change collection can be accessed in the following modes: - * kWrite - This mode assumes that the global IX lock is already held before writing to the change - * collection. + * kWriteInOplogContext - perform writes to the change collection by taking the IX lock on a + * tenant's change collection. The change collection is written along with + * the oplog in the same 'WriteUnitOfWork' and assumes that the global IX + * lock is already held. + * kWrite - takes the IX lock on a tenant's change collection to perform any writes. 
*/ class AutoGetChangeCollection { public: - enum class AccessMode { kWrite }; + enum class AccessMode { kWriteInOplogContext, kWrite }; AutoGetChangeCollection(OperationContext* opCtx, AccessMode mode, diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 9a1efe8b8f7..5a79d14953f 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -71,6 +71,8 @@ BSONObj createChangeCollectionEntryFromOplog(const BSONObj& oplogEntry) { */ class ChangeCollectionsWriter { public: + explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode) + : _accessMode{accessMode} {} /** * Adds the insert statement for the provided tenant that will be written to the change * collection when the 'write()' method is called. @@ -88,7 +90,7 @@ public: Status write(OperationContext* opCtx, OpDebug* opDebug) { for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) { AutoGetChangeCollection tenantChangeCollection( - opCtx, AutoGetChangeCollection::AccessMode::kWrite, boost::none /* tenantId */); + opCtx, _accessMode, boost::none /* tenantId */); // The change collection does not exist for a particular tenant because either the // change collection is not enabled or is in the process of enablement. Ignore this @@ -143,6 +145,9 @@ private: return true; } + // Mode required to access change collections. + const AutoGetChangeCollection::AccessMode _accessMode; + // Maps inserts statements for each tenant. stdx::unordered_map, TenantId::Hasher> _tenantStatementsMap; @@ -220,7 +225,8 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( // commiting the unit of work. 
 invariant(opCtx->lockState()->inAWriteUnitOfWork());
 
-    ChangeCollectionsWriter changeCollectionsWriter;
+    ChangeCollectionsWriter changeCollectionsWriter{
+        AutoGetChangeCollection::AccessMode::kWriteInOplogContext};
 
     for (size_t idx = 0; idx < oplogRecords.size(); idx++) {
         auto& record = oplogRecords[idx];
@@ -248,8 +254,18 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
     OperationContext* opCtx,
     std::vector::const_iterator beginOplogEntries,
     std::vector::const_iterator endOplogEntries,
+    bool isGlobalIXLockAcquired,
     OpDebug* opDebug) {
-    ChangeCollectionsWriter changeCollectionsWriter;
+    // This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for
+    // committing the unit of work.
+    invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+    // If the global IX lock is already acquired, then change collection entries will be written
+    // within the oplog context; as such, acquire the correct access mode for change collections.
+    const auto changeCollAccessMode = isGlobalIXLockAcquired
+        ? AutoGetChangeCollection::AccessMode::kWriteInOplogContext
+        : AutoGetChangeCollection::AccessMode::kWrite;
+    ChangeCollectionsWriter changeCollectionsWriter{changeCollAccessMode};
 
     // Transform oplog entries to change collections entries and group them by tenant id. 
for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries; diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index f9fe6d6f414..37a9dbaef27 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -115,6 +115,7 @@ public: OperationContext* opCtx, std::vector::const_iterator beginOplogEntries, std::vector::const_iterator endOplogEntries, + bool isGlobalIXLockAcquired, OpDebug* opDebug); }; diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index e380fbe6238..8a141ac0214 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -109,8 +109,8 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss, BaseCloner::ClonerStages CollectionCloner::getStages() { if (_sourceNss.isChangeStreamPreImagesCollection() || _sourceNss.isChangeCollection()) { - // Only the change stream pre-images collection and change collection needs to be created - - // its documents should not be copied. + // The change stream pre-images collection and the change collection only need to be created + // - their documents should not be copied. 
return {&_listIndexesStage, &_createCollectionStage, &_setupIndexBuildersForUnfinishedIndexesStage}; diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 5eed8504f83..f7403a15209 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -36,6 +36,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/logical_session_id.h" @@ -46,6 +47,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/db/storage/control/journal_flusher.h" +#include "mongo/db/storage/storage_util.h" #include "mongo/logv2/log.h" #include "mongo/platform/basic.h" #include "mongo/util/fail_point.h" @@ -126,6 +128,47 @@ void _addOplogChainOpsToWriterVectors(OperationContext* opCtx, opCtx, &derivedOps->back(), writerVectors, collPropertiesCache, shouldSerialize); } +Status _insertDocumentsToOplogAndChangeCollections( + OperationContext* opCtx, + std::vector::const_iterator begin, + std::vector::const_iterator end, + bool skipWritesToOplog) { + WriteUnitOfWork wunit(opCtx); + + if (!skipWritesToOplog) { + AutoGetOplog autoOplog(opCtx, OplogAccessMode::kWrite); + auto& oplogColl = autoOplog.getCollection(); + if (!oplogColl) { + return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"}; + } + + auto status = oplogColl->insertDocuments( + opCtx, begin, end, nullptr /* OpDebug */, false /* fromMigrate */); + if (!status.isOK()) { + return status; + } + } + + // Write the corresponding oplog entries to tenants respective change + // collections in the serverless. 
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + auto status = + ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( + opCtx, + begin, + end, + !skipWritesToOplog /* hasAcquiredGlobalIXLock */, + nullptr /* OpDebug */); + if (!status.isOK()) { + return status; + } + } + + wunit.commit(); + + return Status::OK(); +} + } // namespace @@ -365,19 +408,27 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { } -// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that -// 'ops' stays valid until all scheduled work in the thread pool completes. -void scheduleWritesToOplog(OperationContext* opCtx, - StorageInterface* storageInterface, - ThreadPool* writerPool, - const std::vector& ops) { - auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) { - // The returned function will be run in a separate thread after this returns. Therefore all - // captures other than 'ops' must be by value since they will not be available. The caller - // guarantees that 'ops' will stay in scope until the spawned threads complete. - return [storageInterface, &ops, begin, end](auto status) { - invariant(status); +// Schedules the writes to the oplog and the change collection for 'ops' into threadPool. The caller +// must guarantee that 'ops' stays valid until all scheduled work in the thread pool completes. +void scheduleWritesToOplogAndChangeCollection(OperationContext* opCtx, + StorageInterface* storageInterface, + ThreadPool* writerPool, + const std::vector& ops, + bool skipWritesToOplog) { + // Skip performing any writes during the startup recovery when running in the non-serverless + // environment. 
+ if (skipWritesToOplog && + !ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + return; + } + auto makeOplogWriterForRange = [storageInterface, &ops, skipWritesToOplog](size_t begin, + size_t end) { + // The returned function will be run in a separate thread after this returns. Therefore + // all captures other than 'ops' must be by value since they will not be available. The + // caller guarantees that 'ops' will stay in scope until the spawned threads complete. + return [storageInterface, &ops, begin, end, skipWritesToOplog](auto status) { + invariant(status); auto opCtx = cc().makeOperationContext(); // This code path is only executed on secondaries and initial syncing nodes, so it is @@ -396,9 +447,23 @@ void scheduleWritesToOplog(OperationContext* opCtx, ops[i].getOpTime().getTerm()}); } - fassert(40141, - storageInterface->insertDocuments( - opCtx.get(), NamespaceString::kRsOplogNamespace, docs)); + // TODO SERVER-67168 the 'nsOrUUID' is used only to log the debug message when retrying + // inserts on the oplog and change collections. The 'writeConflictRetry' assumes + // operations are done on a single namespace. But the method + // '_insertDocumentsToOplogAndChangeCollections' can perform inserts on the oplog and + // multiple change collections, ie. several namespaces. As such 'writeConflictRetry' + // will not log the correct namespace when retrying. Refactor this code to log the + // correct namespace in the log message. + NamespaceStringOrUUID nsOrUUID = !skipWritesToOplog + ? 
NamespaceString::kRsOplogNamespace + : NamespaceString::makeChangeCollectionNSS(boost::none /* tenantId */); + + fassert(6663400, + storage_helpers::insertBatchAndHandleRetry( + opCtx.get(), nsOrUUID, docs, [&](auto* opCtx, auto begin, auto end) { + return _insertDocumentsToOplogAndChangeCollections( + opCtx, begin, end, skipWritesToOplog); + })); }; }; @@ -463,9 +528,11 @@ StatusWith OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, if (!getOptions().skipWritesToOplog) { _consistencyMarkers->setOplogTruncateAfterPoint( opCtx, _replCoord->getMyLastAppliedOpTime().getTimestamp()); - scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops); } + scheduleWritesToOplogAndChangeCollection( + opCtx, _storageInterface, _writerPool, ops, getOptions().skipWritesToOplog); + // Holds 'pseudo operations' generated by secondaries to aid in replication. // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied. // Pseudo operations include: @@ -520,8 +587,8 @@ StatusWith OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, auto opCtx = cc().makeOperationContext(); - // This code path is only executed on secondaries and initial syncing nodes, - // so it is safe to exclude any writes from Flow Control. + // This code path is only executed on secondaries and initial syncing nodes, so + // it is safe to exclude any writes from Flow Control. 
opCtx->setShouldParticipateInFlowControl(false); opCtx->setEnforceConstraints(false); diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index b2a0560d145..9cfc1b0f4e7 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -82,6 +82,7 @@ #include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/control/storage_control.h" #include "mongo/db/storage/oplog_cap_maintainer_thread.h" +#include "mongo/db/storage/storage_util.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" #include "mongo/util/background.h" @@ -342,8 +343,6 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, boost::optional autoOplog; const CollectionPtr* collection; - bool shouldWriteToChangeCollections = false; - auto nss = nsOrUUID.nss(); if (nss && nss->isOplog()) { // Simplify locking rules for oplog collection. @@ -352,9 +351,6 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, if (!*collection) { return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"}; } - - shouldWriteToChangeCollections = - ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive(); } else { autoColl.emplace(opCtx, nsOrUUID, MODE_IX); auto collectionResult = getCollection( @@ -372,17 +368,6 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, return status; } - // Insert oplog entries to change collections if we are running in the serverless and the 'nss' - // is 'local.oplog.rs'. 
- if (shouldWriteToChangeCollections) { - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); - status = changeCollectionManager.insertDocumentsToChangeCollection( - opCtx, begin, end, nullOpDebug); - if (!status.isOK()) { - return status; - } - } - wunit.commit(); return Status::OK(); @@ -393,35 +378,10 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, Status StorageInterfaceImpl::insertDocuments(OperationContext* opCtx, const NamespaceStringOrUUID& nsOrUUID, const std::vector& docs) { - if (docs.size() > 1U) { - try { - if (insertDocumentsSingleBatch(opCtx, nsOrUUID, docs.cbegin(), docs.cend()).isOK()) { - return Status::OK(); - } - } catch (...) { - // Ignore this failure and behave as-if we never tried to do the combined batch insert. - // The loop below will handle reporting any non-transient errors. - } - } - - // Try to insert the batch one-at-a-time because the batch failed all-at-once inserting. - for (auto it = docs.cbegin(); it != docs.cend(); ++it) { - auto status = writeConflictRetry( - opCtx, "StorageInterfaceImpl::insertDocuments", nsOrUUID.toString(), [&] { - auto status = insertDocumentsSingleBatch(opCtx, nsOrUUID, it, it + 1); - if (!status.isOK()) { - return status; - } - - return Status::OK(); - }); - - if (!status.isOK()) { - return status; - } - } - - return Status::OK(); + return storage_helpers::insertBatchAndHandleRetry( + opCtx, nsOrUUID, docs, [&](auto* opCtx, auto begin, auto end) { + return insertDocumentsSingleBatch(opCtx, nsOrUUID, begin, end); + }); } Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* opCtx) { diff --git a/src/mongo/db/storage/storage_util.h b/src/mongo/db/storage/storage_util.h index 3d8b59bb79a..d6eaeea2f6f 100644 --- a/src/mongo/db/storage/storage_util.h +++ b/src/mongo/db/storage/storage_util.h @@ -29,16 +29,13 @@ #pragma once +#include "mongo/db/concurrency/exception_util.h" #include "mongo/db/record_id.h" +#include "mongo/db/repl/oplog.h" +#include 
"mongo/db/storage/write_unit_of_work.h" #include "mongo/util/uuid.h" namespace mongo { - -class Collection; -class Ident; -class OperationContext; -class NamespaceString; - namespace catalog { /** @@ -89,4 +86,58 @@ Status dropCollection(OperationContext* opCtx, } // namespace catalog + +namespace storage_helpers { + +/** + * Inserts the batch of documents 'docs' using the provided callable object 'insertFn'. + * + * 'insertFnType' type should be Callable and have the following call signature: + * Status insertFn(OperationContext* opCtx, + * std::vector::const_iterator begin, + * std::vector::const_iterator end); + * + * where 'begin' (inclusive) and 'end' (exclusive) are the iterators for the range of documents + * 'docs'. + * + * The function first attempts to insert documents as one batch. If the insertion fails, then it + * falls back to inserting documents one at a time. The insertion is retried in case of write + * conflicts. + */ +template +Status insertBatchAndHandleRetry(OperationContext* opCtx, + const NamespaceStringOrUUID& nsOrUUID, + const std::vector& docs, + insertFnType&& insertFn) { + if (docs.size() > 1U) { + try { + if (insertFn(opCtx, docs.cbegin(), docs.cend()).isOK()) { + return Status::OK(); + } + } catch (...) { + // Ignore this failure and behave as-if we never tried to do the combined batch insert. + // The loop below will handle reporting any non-transient errors. + } + } + + // Try to insert the batch one-at-a-time because the batch failed all-at-once inserting. + for (auto it = docs.cbegin(); it != docs.cend(); ++it) { + auto status = writeConflictRetry(opCtx, "batchInsertDocuments", nsOrUUID.toString(), [&] { + auto status = insertFn(opCtx, it, it + 1); + if (!status.isOK()) { + return status; + } + + return Status::OK(); + }); + + if (!status.isOK()) { + return status; + } + } + + return Status::OK(); +} +} // namespace storage_helpers + } // namespace mongo -- cgit v1.2.1