diff options
author | Antonio Fuschetto <antonio.fuschetto@mongodb.com> | 2022-11-04 11:25:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-04 11:56:30 +0000 |
commit | d25aa2a766a5969eb690f4d97d0c52a6dc3dcf2f (patch) | |
tree | 97967092bda17bf9a68eb01d0649327b1715c7af /src | |
parent | 6800a766528950978fe4f67f88350c2da152c9dd (diff) | |
download | mongo-d25aa2a766a5969eb690f4d97d0c52a6dc3dcf2f.tar.gz |
SERVER-70660 Support for recoverable critical section on databases
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/collection_critical_section_document.idl | 32 | ||||
-rw-r--r-- | src/mongo/db/s/database_sharding_state.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/s/database_sharding_state.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 114 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_recovery_service.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_recovery_service.h | 53 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_recovery_service_test.cpp | 911 |
8 files changed, 1124 insertions, 111 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 3248c1b296d..2876d1ee99c 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -719,6 +719,7 @@ env.CppUnitTest( 'sharding_initialization_mongod_test.cpp', 'sharding_initialization_op_observer_test.cpp', 'sharding_logging_test.cpp', + 'sharding_recovery_service_test.cpp', 'split_chunk_request_test.cpp', 'split_vector_test.cpp', 'start_chunk_clone_request_test.cpp', @@ -738,6 +739,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture', '$BUILD_DIR/mongo/db/catalog/database_holder', + '$BUILD_DIR/mongo/db/commands/create_command', '$BUILD_DIR/mongo/db/commands/list_collections_filter', '$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util', '$BUILD_DIR/mongo/db/keys_collection_client_direct', diff --git a/src/mongo/db/s/collection_critical_section_document.idl b/src/mongo/db/s/collection_critical_section_document.idl index f05055dbc36..fc5ad608790 100644 --- a/src/mongo/db/s/collection_critical_section_document.idl +++ b/src/mongo/db/s/collection_critical_section_document.idl @@ -26,10 +26,9 @@ # it in the license file. # -# This file defines the format of documents stored in config.collectionCriticalSections. -# Each document is used to represent that a collection is under an operation that holds -# the collection critical section. - +# This file defines the format of documents stored in the `config.collectionCriticalSections` +# collection. Each document is used to represent that a database or a collection is under an +# operation that entered the critical section. global: cpp_namespace: "mongo" @@ -39,30 +38,27 @@ imports: structs: collectionCriticalSectionDocument: - description: "Represents that a collection is under an operation that holds the collection - critical section." + description: "Represents that a database or a collection is under an operation that entered + the critical section." generate_comparison_operators: false strict: false fields: _id: type: namespacestring - description: "The namespace of the collection that is under the collection critical - section." + description: "The name of the database or the namespace of the collection that is + under the critical section." cpp_name: nss reason: type: object - description: "An identifier of the operation that is holding the critical section. - - Only used for diagnostic purposes." + description: "An identifier of the operation that entered the critical section. + Only used for diagnostic purposes." blockReads: type: bool - description: "It states whether this critical section should block reads - (CS in commit phase) or not (CS in catch up phase)." + description: "It states whether this critical section should block reads (CS in + commit phase) or not (CS in catch up phase)." additionalInfo: type: object - description: "Additional information associated to the operation that is holding - the critical section. - - Only used for diagnostic purposes." + description: "Additional information associated to the operation that entered the + critical section. + Only used for diagnostic purposes." optional: true - diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp index 72dee5d07c0..f3c957e97a0 100644 --- a/src/mongo/db/s/database_sharding_state.cpp +++ b/src/mongo/db/s/database_sharding_state.cpp @@ -71,6 +71,16 @@ public: return it->second.get(); } + std::vector<DatabaseName> getDatabaseNames() { + stdx::lock_guard lg(_mutex); + std::vector<DatabaseName> result; + result.reserve(_databases.size()); + for (const auto& [dbName, _] : _databases) { + result.emplace_back(dbName); + } + return result; + } + private: Mutex _mutex = MONGO_MAKE_LATCH("DatabaseShardingStateMap::_mutex"); @@ -122,17 +132,19 @@ DatabaseShardingState::ScopedDatabaseShardingState DatabaseShardingState::acquir return ScopedDatabaseShardingState(std::move(lock), dssAndLock->dss.get()); } +std::vector<DatabaseName> DatabaseShardingState::getDatabaseNames(OperationContext* opCtx) { + auto& databasesMap = DatabaseShardingStateMap::get(opCtx->getServiceContext()); + return databasesMap.getDatabaseNames(); +} + void DatabaseShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx, const BSONObj& reason) { - invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X)); _critSec.enterCriticalSectionCatchUpPhase(reason); - cancelDbMetadataRefresh(); } void DatabaseShardingState::enterCriticalSectionCommitPhase(OperationContext* opCtx, const BSONObj& reason) { - invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X)); _critSec.enterCriticalSectionCommitPhase(reason); } @@ -140,6 +152,10 @@ void DatabaseShardingState::exitCriticalSection(OperationContext* opCtx, const B _critSec.exitCriticalSection(reason); } +void DatabaseShardingState::exitCriticalSectionNoChecks(OperationContext* opCtx) { + _critSec.exitCriticalSectionNoChecks(); +} + void DatabaseShardingState::setMovePrimarySourceManager(OperationContext* opCtx, MovePrimarySourceManager* sourceMgr) { invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X)); diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h index 70e788b746c..344635dd0e0 100644 --- a/src/mongo/db/s/database_sharding_state.h +++ b/src/mongo/db/s/database_sharding_state.h @@ -83,6 +83,11 @@ public: DSSAcquisitionMode mode); /** + * Returns the names of the databases that have a DatabaseShardingState. + */ + static std::vector<DatabaseName> getDatabaseNames(OperationContext* opCtx); + + /** * Returns the name of the database related to the current sharding state. */ const DatabaseName& getDbName() const { @@ -96,6 +101,7 @@ public: void enterCriticalSectionCatchUpPhase(OperationContext* opCtx, const BSONObj& reason); void enterCriticalSectionCommitPhase(OperationContext* opCtx, const BSONObj& reason); void exitCriticalSection(OperationContext* opCtx, const BSONObj& reason); + void exitCriticalSectionNoChecks(OperationContext* opCtx); auto getCriticalSectionSignal(ShardingMigrationCriticalSection::Operation op) const { return _critSec.getSignal(op); diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index ae059d19512..352285de2ed 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -291,10 +291,21 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, !recoverable_critical_section_util::inRecoveryMode(opCtx)) { const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserContext("ShardServerOpObserver"), insertedDoc); - opCtx->recoveryUnit()->onCommit( - [opCtx, - insertedNss = collCSDoc.getNss(), - reason = collCSDoc.getReason().getOwned()](boost::optional<Timestamp>) { + opCtx->recoveryUnit()->onCommit([opCtx, + insertedNss = collCSDoc.getNss(), + reason = collCSDoc.getReason().getOwned()]( + boost::optional<Timestamp>) { + if (nsIsDbOnly(insertedNss.ns())) { + boost::optional<AutoGetDb> lockDbIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + lockDbIfNotPrimary.emplace(opCtx, insertedNss.dbName(), MODE_IX); + } + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( + opCtx, insertedNss.dbName(), DSSAcquisitionMode::kExclusive); + scopedDss->enterCriticalSectionCatchUpPhase(opCtx, reason); + } else { boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) { lockCollectionIfNotPrimary.emplace( @@ -309,7 +320,8 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( opCtx, insertedNss, CSRAcquisitionMode::kExclusive); scopedCsr->enterCriticalSectionCatchUpPhase(reason); - }); + } + }); } if (metadata && metadata->isSharded()) { @@ -452,20 +464,32 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE opCtx->recoveryUnit()->onCommit( [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( boost::optional<Timestamp>) { - boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) { - lockCollectionIfNotPrimary.emplace( - opCtx, - updatedNss, - MODE_IX, - AutoGetCollection::Options{}.viewMode( - auto_get_collection::ViewMode::kViewsPermitted)); - } + if (nsIsDbOnly(updatedNss.ns())) { + boost::optional<AutoGetDb> lockDbIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + lockDbIfNotPrimary.emplace(opCtx, updatedNss.dbName(), MODE_IX); + } + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( + opCtx, updatedNss.dbName(), DSSAcquisitionMode::kExclusive); + scopedDss->enterCriticalSectionCommitPhase(opCtx, reason); + } else { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + lockCollectionIfNotPrimary.emplace( + opCtx, + updatedNss, + MODE_IX, + AutoGetCollection::Options{}.viewMode( + auto_get_collection::ViewMode::kViewsPermitted)); + } - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - CollectionShardingRuntime::assertCollectionLockedAndAcquire( - opCtx, updatedNss, CSRAcquisitionMode::kExclusive) - ->enterCriticalSectionCommitPhase(reason); + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, updatedNss, CSRAcquisitionMode::kExclusive); + scopedCsr->enterCriticalSectionCommitPhase(reason); + } }); } @@ -594,26 +618,46 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, opCtx->recoveryUnit()->onCommit( [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( boost::optional<Timestamp>) { - boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) { - lockCollectionIfNotPrimary.emplace( - opCtx, - deletedNss, - MODE_IX, - AutoGetCollection::Options{}.viewMode( - auto_get_collection::ViewMode::kViewsPermitted)); - } + if (nsIsDbOnly(deletedNss.ns())) { + boost::optional<AutoGetDb> lockDbIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + lockDbIfNotPrimary.emplace(opCtx, deletedNss.dbName(), MODE_IX); + } - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( - opCtx, deletedNss, CSRAcquisitionMode::kExclusive); + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( + opCtx, deletedNss.dbName(), DSSAcquisitionMode::kExclusive); - // Secondary nodes must clear the filtering metadata before releasing the - // in-memory critical section - if (!isStandaloneOrPrimary(opCtx)) - scopedCsr->clearFilteringMetadata(opCtx); + // Secondary nodes must clear the database metadata before releasing the + // in-memory critical section. + if (!isStandaloneOrPrimary(opCtx)) { + DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, deletedNss.dbName()); + } - scopedCsr->exitCriticalSection(reason); + scopedDss->exitCriticalSection(opCtx, reason); + } else { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + lockCollectionIfNotPrimary.emplace( + opCtx, + deletedNss, + MODE_IX, + AutoGetCollection::Options{}.viewMode( + auto_get_collection::ViewMode::kViewsPermitted)); + } + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, deletedNss, CSRAcquisitionMode::kExclusive); + + // Secondary nodes must clear the collection filtering metadata before releasing + // the in-memory critical section. + if (!isStandaloneOrPrimary(opCtx)) { + scopedCsr->clearFilteringMetadata(opCtx); + } + + scopedCsr->exitCriticalSection(reason); + } }); } diff --git a/src/mongo/db/s/sharding_recovery_service.cpp b/src/mongo/db/s/sharding_recovery_service.cpp index 02dab8f3773..10505a5997b 100644 --- a/src/mongo/db/s/sharding_recovery_service.cpp +++ b/src/mongo/db/s/sharding_recovery_service.cpp @@ -27,7 +27,6 @@ * it in the license file. */ - #include <set> #include "mongo/platform/basic.h" @@ -42,6 +41,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/collection_critical_section_document_gen.h" #include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/sharding_migration_critical_section.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_collection.h" @@ -50,7 +50,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - namespace mongo { namespace recoverable_critical_section_util { @@ -154,9 +153,15 @@ void ShardingRecoveryService::acquireRecoverableCriticalSectionBlockWrites( { Lock::GlobalLock lk(opCtx, MODE_IX); - // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to - // construct cCollLock. - AutoGetCollection cCollLock(opCtx, nss, MODE_S); + boost::optional<AutoGetDb> dbLock; + boost::optional<AutoGetCollection> collLock; + if (nsIsDbOnly(nss.ns())) { + dbLock.emplace(opCtx, nss.dbName(), MODE_S); + } else { + // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to + // construct collLock. + collLock.emplace(opCtx, nss, MODE_S); + } DBDirectClient dbClient(opCtx); FindCommandRequest findRequest{NamespaceString::kCollectionCriticalSectionsNamespace}; @@ -172,10 +177,9 @@ void ShardingRecoveryService::acquireRecoverableCriticalSectionBlockWrites( invariant(collCSDoc.getReason().woCompare(reason) == 0, str::stream() - << "Trying to acquire a critical section blocking writes for namespace " - << nss << " and reason " << reason - << " but it is already taken by another operation with different reason " - << collCSDoc.getReason()); + << "Trying to acquire a critical section blocking writes for namespace " + << nss << " and reason " << reason << " but it is already taken by " + << "another operation with different reason " << collCSDoc.getReason()); LOGV2_DEBUG( 5656601, @@ -242,9 +246,15 @@ void ShardingRecoveryService::promoteRecoverableCriticalSectionToBlockAlsoReads( invariant(!opCtx->lockState()->isLocked()); { - // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to - // construct cCollLock. - AutoGetCollection cCollLock(opCtx, nss, MODE_X); + boost::optional<AutoGetDb> dbLock; + boost::optional<AutoGetCollection> collLock; + if (nsIsDbOnly(nss.ns())) { + dbLock.emplace(opCtx, nss.dbName(), MODE_X); + } else { + // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to + // construct collLock. + collLock.emplace(opCtx, nss, MODE_X); + } DBDirectClient dbClient(opCtx); FindCommandRequest findRequest{NamespaceString::kCollectionCriticalSectionsNamespace}; @@ -345,9 +355,15 @@ void ShardingRecoveryService::releaseRecoverableCriticalSection( invariant(!opCtx->lockState()->isLocked()); { - // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to - // construct cCollLock. - AutoGetCollection collLock(opCtx, nss, MODE_X); + boost::optional<AutoGetDb> dbLock; + boost::optional<AutoGetCollection> collLock; + if (nsIsDbOnly(nss.ns())) { + dbLock.emplace(opCtx, nss.dbName(), MODE_X); + } else { + // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to + // construct collLock. + collLock.emplace(opCtx, nss, MODE_X); + } DBDirectClient dbClient(opCtx); @@ -442,21 +458,26 @@ void ShardingRecoveryService::recoverRecoverableCriticalSections(OperationContex LOGV2_DEBUG(5604000, 2, "Recovering all recoverable critical sections"); // Release all in-memory critical sections - const auto collectionNames = CollectionShardingState::getCollectionNames(opCtx); - for (const auto& collName : collectionNames) { + for (const auto& nss : CollectionShardingState::getCollectionNames(opCtx)) { try { - AutoGetCollection collLock(opCtx, collName, MODE_X); - CollectionShardingRuntime::assertCollectionLockedAndAcquire( - opCtx, collName, CSRAcquisitionMode::kExclusive) - ->exitCriticalSectionNoChecks(); + AutoGetCollection collLock(opCtx, nss, MODE_X); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); + scopedCsr->exitCriticalSectionNoChecks(); } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { LOGV2_DEBUG(6050800, 2, "Skipping attempting to exit critical section for view in " "recoverRecoverableCriticalSections", - "namespace"_attr = collName); + "namespace"_attr = nss); } } + for (const auto& dbName : DatabaseShardingState::getDatabaseNames(opCtx)) { + AutoGetDb dbLock(opCtx, dbName, MODE_X); + auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( + opCtx, dbName, DSSAcquisitionMode::kExclusive); + scopedDss->exitCriticalSectionNoChecks(opCtx); + } // Map the critical sections that are on disk to memory PersistentTaskStore<CollectionCriticalSectionDocument> store( @@ -464,13 +485,23 @@ void ShardingRecoveryService::recoverRecoverableCriticalSections(OperationContex store.forEach(opCtx, BSONObj{}, [&opCtx](const CollectionCriticalSectionDocument& doc) { const auto& nss = doc.getNss(); { - AutoGetCollection collLock(opCtx, nss, MODE_X); - auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( - opCtx, nss, CSRAcquisitionMode::kExclusive); - - scopedCsr->enterCriticalSectionCatchUpPhase(doc.getReason()); - if (doc.getBlockReads()) - scopedCsr->enterCriticalSectionCommitPhase(doc.getReason()); + if (nsIsDbOnly(nss.ns())) { + AutoGetDb dbLock(opCtx, nss.dbName(), MODE_X); + auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( + opCtx, nss.dbName(), DSSAcquisitionMode::kExclusive); + scopedDss->enterCriticalSectionCatchUpPhase(opCtx, doc.getReason()); + if (doc.getBlockReads()) { + scopedDss->enterCriticalSectionCommitPhase(opCtx, doc.getReason()); + } + } else { + AutoGetCollection collLock(opCtx, nss, MODE_X); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); + scopedCsr->enterCriticalSectionCatchUpPhase(doc.getReason()); + if (doc.getBlockReads()) { + scopedCsr->enterCriticalSectionCommitPhase(doc.getReason()); + } + } return true; } @@ -494,6 +525,12 @@ void ShardingRecoveryService::recoverStates(OperationContext* opCtx, } } +void ShardingRecoveryService::onInitialDataAvailable(OperationContext* opCtx, + bool isMajorityDataAvailable) { + recoverRecoverableCriticalSections(opCtx); + recoverIndexesCatalog(opCtx); +} + void ShardingRecoveryService::recoverIndexesCatalog(OperationContext* opCtx) { LOGV2_DEBUG(6686500, 2, "Recovering all sharding index catalog"); diff --git a/src/mongo/db/s/sharding_recovery_service.h b/src/mongo/db/s/sharding_recovery_service.h index 2913571d180..f7d24436742 100644 --- a/src/mongo/db/s/sharding_recovery_service.h +++ b/src/mongo/db/s/sharding_recovery_service.h @@ -52,17 +52,17 @@ public: static ShardingRecoveryService* get(OperationContext* opCtx); /** - * Acquires the collection critical section in the catch-up phase (i.e. blocking writes) for the - * specified namespace and reason. It works even if the namespace's current metadata are + * Acquires the recoverable critical section in the catch-up phase (i.e. blocking writes) for + * the specified namespace and reason. It works even if the namespace's current metadata are * UNKNOWN. * - * Entering into the Critical Section interrupts any ongoing filtering metadata refresh. + * Entering into the critical section interrupts any ongoing filtering metadata refresh. * - * It adds a doc to config.collectionCriticalSections with with writeConcern write concern. + * It adds a doc to `config.collectionCriticalSections` with with `writeConcern` write concern. * - * Do nothing if the collection critical section is taken for that nss and reason, and will - * invariant otherwise since it is the responsibility of the caller to ensure that only one - * thread is taking the critical section. + * Do nothing if the critical section is taken for that namespace and reason, and will invariant + * otherwise since it is the responsibility of the caller to ensure that only one thread is + * taking the critical section. */ void acquireRecoverableCriticalSectionBlockWrites( OperationContext* opCtx, @@ -73,26 +73,26 @@ public: /** * Advances the recoverable critical section from the catch-up phase (i.e. blocking writes) to - * the commit phase (i.e. blocking reads) for the specified nss and reason. The recoverable - * critical section must have been acquired first through - * 'acquireRecoverableCriticalSectionBlockWrites' function. + * the commit phase (i.e. blocking reads) for the specified namespace and reason. The + * recoverable critical section must have been acquired first through + * `acquireRecoverableCriticalSectionBlockWrites` function. * - * It updates a doc from config.collectionCriticalSections with writeConcern write concern. + * It updates a doc from `config.collectionCriticalSections` with `writeConcern` write concern. * - * Do nothing if the collection critical section is already taken in commit phase. + * Do nothing if the critical section is already taken in commit phase. */ void promoteRecoverableCriticalSectionToBlockAlsoReads(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& reason, const WriteConcernOptions& writeConcern); /** - * Releases the recoverable critical section for the given nss and reason. + * Releases the recoverable critical section for the given namespace and reason. * - * It removes a doc from config.collectionCriticalSections with writeConcern write concern. As - * part of the removal, the filtering information is cleared on secondary nodes. It is - * responsability of the caller to properly set the filtering information on the primary node. + * It removes a doc from `config.collectionCriticalSections` with `writeConcern` write concern. + * As part of the removal, the filtering information is cleared on secondary nodes. It is + * responsibility of the caller to properly set the filtering information on the primary node. * - * Do nothing if the collection critical section is not taken for that nss and reason. + * Do nothing if the critical section is not taken for that namespace and reason. * * Throw an invariant in case the collection critical section is already taken by another * operation with a different reason unless the flag 'throwIfReasonDiffers' is set to false. @@ -104,29 +104,30 @@ public: bool throwIfReasonDiffers = true); /** - * Recover all sharding related in memory states from disk. + * Recovers all sharding related in memory states from disk. */ void recoverStates(OperationContext* opCtx, const std::set<NamespaceString>& rollbackNamespaces); + /** + * Recovers critical sections and indexes from disk when either initial sync or startup recovery + * have completed. + */ + void onInitialDataAvailable(OperationContext* opCtx, + bool isMajorityDataAvailable) override final; + private: /** * This method is called when we have to mirror the state on disk of the recoverable critical - * section to memory (on startUp or on rollback). + * section to memory (on startup or on rollback). */ void recoverRecoverableCriticalSections(OperationContext* opCtx); /** - * Recover the index versions from disk into the CSR. + * Recovers the index versions from disk into the CSR. */ void recoverIndexesCatalog(OperationContext* opCtx); - void onInitialDataAvailable(OperationContext* opCtx, - bool isMajorityDataAvailable) override final { - recoverRecoverableCriticalSections(opCtx); - recoverIndexesCatalog(opCtx); - } - void onStartup(OperationContext* opCtx) override final {} void onShutdown() override final {} void onStepUpBegin(OperationContext* opCtx, long long term) override final {} diff --git a/src/mongo/db/s/sharding_recovery_service_test.cpp b/src/mongo/db/s/sharding_recovery_service_test.cpp new file mode 100644 index 00000000000..4825ce6ce9b --- /dev/null +++ b/src/mongo/db/s/sharding_recovery_service_test.cpp @@ -0,0 +1,911 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/database_name.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/s/collection_critical_section_document_gen.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/database_sharding_state.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/shard_server_op_observer.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/s/sharding_recovery_service.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +class ShardingRecoveryServiceTest : public ShardServerTestFixture { +public: + inline static const NamespaceString collNss{"TestDB", "TestCollection"}; + inline static const BSONObj collOpReason = + BSON("Dummy operation on collection" << collNss.ns()); + + inline static const NamespaceString dbName{"TestDB"}; + inline static const BSONObj dbOpReason = BSON("Dummy operation on database" << dbName.ns()); + + inline static const BSONObj differentOpReason = BSON("Yet another dummy operation" << true); + + void setUp() override { + ShardServerTestFixture::setUp(); + _opCtx = operationContext(); + } + + OperationContext* opCtx() { + return _opCtx; + } + + OpObserver& opObserver() { + return _opObserver; + } + + boost::optional<CollectionCriticalSectionDocument> readCriticalSectionDocument( + const NamespaceString& nss, const BSONObj& reason) { + FindCommandRequest findOp(NamespaceString::kCollectionCriticalSectionsNamespace); + findOp.setFilter(BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString())); + + DBDirectClient dbClient(opCtx()); + auto cursor = dbClient.find(std::move(findOp)); + + if (!cursor->more()) { + return boost::none; + } + + auto bsonObj = cursor->next(); + auto doc = CollectionCriticalSectionDocument::parse( + IDLParserContext("AcquireRecoverableCSBW"), bsonObj); + + // The document exists, so the reason must match. + ASSERT(!doc.getReason().woCompare(reason)); + + return doc; + } + + void writeReadCriticalSectionDocument(const NamespaceString& nss, + const BSONObj& reason, + bool blockReads) { + auto doc = readCriticalSectionDocument(nss, reason); + if (!doc) { + doc = CollectionCriticalSectionDocument(nss, reason, blockReads); + + DBDirectClient dbClient(opCtx()); + dbClient.insert(NamespaceString::kCollectionCriticalSectionsNamespace.ns(), + doc->toBSON()); + + return; + } + + // The document exists, so the blockReads should change. + ASSERT_NE(doc->getBlockReads(), blockReads); + + DBDirectClient dbClient(opCtx()); + dbClient.update(NamespaceString::kCollectionCriticalSectionsNamespace.ns(), + BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()), + BSON("$set" << BSON(CollectionCriticalSectionDocument::kBlockReadsFieldName + << blockReads))); + } + + void deleteReadCriticalSectionDocument(const NamespaceString& nss, const BSONObj& reason) { + // The document must exist. + ASSERT(readCriticalSectionDocument(nss, reason)); + + DBDirectClient dbClient(opCtx()); + dbClient.remove(NamespaceString::kCollectionCriticalSectionsNamespace.ns(), + BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()), + false /* removeMany */); + } + + void assertCriticalSectionCatchUpEnteredInMemory(const NamespaceString& nss) { + if (nsIsDbOnly(nss.ns())) { + AutoGetDb db(opCtx(), nss.dbName(), MODE_IS); + auto dss = + DatabaseShardingState::acquire(opCtx(), nss.dbName(), DSSAcquisitionMode::kShared); + ASSERT(dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite)); + ASSERT(!dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead)); + } else { + AutoGetCollection coll(opCtx(), nss, MODE_IS); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx(), nss, CSRAcquisitionMode::kShared); + ASSERT( + csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kWrite)); + ASSERT( + !csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kRead)); + } + } + + void assertCriticalSectionCommitEnteredInMemory(const NamespaceString& nss) { + if (nsIsDbOnly(nss.ns())) { + AutoGetDb db(opCtx(), nss.dbName(), MODE_IS); + auto dss = + DatabaseShardingState::acquire(opCtx(), nss.dbName(), DSSAcquisitionMode::kShared); + ASSERT(dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite)); + ASSERT(dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead)); + } else { + AutoGetCollection coll(opCtx(), nss, MODE_IS); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx(), nss, CSRAcquisitionMode::kShared); + ASSERT( + csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kWrite)); + ASSERT(csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kRead)); + } + } + + void assertCriticalSectionLeftInMemory(const NamespaceString& nss) { + if (nsIsDbOnly(nss.ns())) { + AutoGetDb db(opCtx(), nss.dbName(), MODE_IS); + auto dss = + DatabaseShardingState::acquire(opCtx(), nss.dbName(), DSSAcquisitionMode::kShared); + ASSERT(!dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite)); + ASSERT(!dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead)); + } else { + AutoGetCollection coll(opCtx(), nss, MODE_IS); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx(), nss, CSRAcquisitionMode::kShared); + ASSERT( + !csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kWrite)); + ASSERT( + !csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kRead)); + } + } + + void assertCriticalSectionCatchUpEnteredOnDisk(const NamespaceString& nss, + const BSONObj& reason) { + auto doc = readCriticalSectionDocument(nss, reason); + ASSERT(doc); + ASSERT(!doc->getBlockReads()); + } + + void assertCriticalSectionCommitEnteredOnDisk(const NamespaceString& nss, + const BSONObj& reason) { + auto doc = readCriticalSectionDocument(nss, reason); + ASSERT(doc); + ASSERT(doc->getBlockReads()); + } + + void assertCriticalSectionLeftOnDisk(const NamespaceString& nss, const BSONObj& reason) { + ASSERT(!readCriticalSectionDocument(nss, reason)); + } + +private: + OperationContext* _opCtx; + ShardServerOpObserver _opObserver; +}; + +class ShardingRecoveryServiceTestOnPrimary : public ShardingRecoveryServiceTest { +public: + void setUp() override { + ShardingRecoveryServiceTest::setUp(); + + auto replCoord = repl::ReplicationCoordinator::get(operationContext()); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + } +}; + +class ShardingRecoveryServiceTestOnSecondary : public ShardingRecoveryServiceTest { +public: + void setUp() override { + ShardingRecoveryServiceTest::setUp(); + + auto replCoord = repl::ReplicationCoordinator::get(operationContext()); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY)); + + // Create and lock the `config.collection_critical_sections` collection to allow + // notifications on the operation observer. + OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( + opCtx()); + ASSERT_OK(createCollection( + opCtx(), CreateCommand(NamespaceString::kCollectionCriticalSectionsNamespace))); + _criticalSectionColl.emplace( + opCtx(), NamespaceString::kCollectionCriticalSectionsNamespace, MODE_IX); + } + + void tearDown() override { + _criticalSectionColl = boost::none; + + ShardingRecoveryServiceTest::tearDown(); + } + + const CollectionPtr& criticalSectionColl() { + return **_criticalSectionColl; + } + +private: + boost::optional<AutoGetCollection> _criticalSectionColl; +}; + +class ShardingRecoveryServiceTestonInitialData : public ShardingRecoveryServiceTest { +public: + void setUp() override { + ShardingRecoveryServiceTest::setUp(); + + // Create the `config.collection_critical_sections` collection to allow notifications on the + // operation observer. + OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( + opCtx()); + ASSERT_OK(createCollection( + opCtx(), CreateCommand(NamespaceString::kCollectionCriticalSectionsNamespace))); + } +}; + +using ShardingRecoveryServiceTestAfterRollback = ShardingRecoveryServiceTestonInitialData; + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsOnDatabase) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(dbName); + + // Check that the document has been appropriately saved. + assertCriticalSectionCatchUpEnteredOnDisk(dbName, dbOpReason); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(dbName); + + // Check that the document has been appropriately updated. + assertCriticalSectionCommitEnteredOnDisk(dbName, dbOpReason); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(dbName); + + // Check that the document has been deleted. + assertCriticalSectionLeftOnDisk(dbName, dbOpReason); +} + +TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsTwiceOnDatabase) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(dbName); + + // Check that the document has been appropriately saved. + assertCriticalSectionCatchUpEnteredOnDisk(dbName, dbOpReason); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(dbName); + + // Check that the document has been appropriately updated. + assertCriticalSectionCommitEnteredOnDisk(dbName, dbOpReason); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(dbName); + + // Check that the document has been deleted. + assertCriticalSectionLeftOnDisk(dbName, dbOpReason); +} + +DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary, + FailBlockingWritesTwiceOnDatabaseWithDifferentReasons, + "invariant") { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, differentOpReason, ShardingCatalogClient::kLocalWriteConcern); +} + +DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary, + FailBlockingReadsOnDatabaseWithDifferentReasons, + "invariant") { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), dbName, differentOpReason, ShardingCatalogClient::kLocalWriteConcern); +} + +DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary, + FailUnblockingOperationsOnDatabaseWithDifferentReasons, + "invariant") { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), dbName, differentOpReason, ShardingCatalogClient::kLocalWriteConcern); +} + +TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsOnCollection) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(collNss); + + // Check that the document has been appropriately saved. + assertCriticalSectionCatchUpEnteredOnDisk(collNss, collOpReason); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(collNss); + + // Check that the document has been appropriately updated. + assertCriticalSectionCommitEnteredOnDisk(collNss, collOpReason); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(collNss); + + // Check that the document has been deleted. + assertCriticalSectionLeftOnDisk(collNss, collOpReason); +} + +TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsTwiceOnCollection) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(collNss); + + // Check that the document has been appropriately saved. + assertCriticalSectionCatchUpEnteredOnDisk(collNss, collOpReason); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(collNss); + + // Check that the document has been appropriately updated. + assertCriticalSectionCommitEnteredOnDisk(collNss, collOpReason); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(collNss); + + // Check that the document has been deleted. + assertCriticalSectionLeftOnDisk(collNss, collOpReason); +} + +DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary, + FailBlockingWritesTwiceOnCollectionWithDifferentReasons, + "invariant") { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, differentOpReason, ShardingCatalogClient::kLocalWriteConcern); +} + +DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary, + FailBlockingReadsOnCollectionWithDifferentReasons, + "invariant") { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), collNss, differentOpReason, ShardingCatalogClient::kLocalWriteConcern); +} + +DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary, + FailUnblockingOperationsOnCollectionWithDifferentReasons, + "invariant") { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection( + opCtx(), collNss, differentOpReason, ShardingCatalogClient::kLocalWriteConcern); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +TEST_F(ShardingRecoveryServiceTestOnSecondary, BlockAndUnblockOperationsOnDatabase) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + // Simulate an insert notification on the `config.collection_critical_sections` collection, that + // is what a secondary node would receive when the primary node enters the catch-up phase of the + // critical section. + auto doc = CollectionCriticalSectionDocument(dbName, dbOpReason, false); + { + std::vector<InsertStatement> inserts; + inserts.emplace_back(doc.toBSON()); + + WriteUnitOfWork wuow(opCtx()); + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + opObserver().onInserts( + opCtx(), criticalSectionColl(), inserts.begin(), inserts.end(), false); + wuow.commit(); + } + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(dbName); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + // Simulate an update notification on the `config.collection_critical_sections` collection, that + // is what a secondary node would receive when the primary node enters the commit phase of the + // critical section. + doc.setBlockReads(true); // NOTE: This has no semantic effect as the critical section is + // promoted to any update event on the document! + // TODO (SERVER-71056): React to `blockReads` changes only. + { + CollectionUpdateArgs updateArgs; + updateArgs.updatedDoc = doc.toBSON(); + OplogUpdateEntryArgs update(&updateArgs, criticalSectionColl()); + + WriteUnitOfWork wuow(opCtx()); + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + opObserver().onUpdate(opCtx(), update); + wuow.commit(); + } + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(dbName); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + // Simulate an delete notification on the `config.collection_critical_sections` collection, that + // is what a secondary node would receive when the primary node leaves the critical section. + { + WriteUnitOfWork wuow(opCtx()); + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + opObserver().aboutToDelete( + opCtx(), criticalSectionColl()->ns(), criticalSectionColl()->uuid(), doc.toBSON()); + opObserver().onDelete(opCtx(), + criticalSectionColl()->ns(), + criticalSectionColl()->uuid(), + kUninitializedStmtId, + {}); + wuow.commit(); + } + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(dbName); +} + +TEST_F(ShardingRecoveryServiceTestOnSecondary, BlockAndUnblockOperationsOnCollection) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + // Simulate an insert notification on the `config.collection_critical_sections` collection, that + // is what a secondary node would receive when the primary node enters the catch-up phase of the + // critical section. + auto doc = CollectionCriticalSectionDocument(collNss, collOpReason, false); + { + std::vector<InsertStatement> inserts; + inserts.emplace_back(doc.toBSON()); + + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + opObserver().onInserts( + opCtx(), criticalSectionColl(), inserts.begin(), inserts.end(), false); + wuow.commit(); + } + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(collNss); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + // Simulate an update notification on the `config.collection_critical_sections` collection, that + // is what a secondary node would receive when the primary node enters the commit phase of the + // critical section. + doc.setBlockReads(true); // NOTE: This has no semantic effect as the critical section is + // promoted to any update event on the document! + // TODO (SERVER-71056): React to `blockReads` changes only. + { + CollectionUpdateArgs updateArgs; + updateArgs.updatedDoc = doc.toBSON(); + OplogUpdateEntryArgs update(&updateArgs, criticalSectionColl()); + + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + opObserver().onUpdate(opCtx(), update); + wuow.commit(); + } + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(collNss); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + // Simulate an delete notification on the `config.collection_critical_sections` collection, that + // is what a secondary node would receive when the primary node leaves the critical section. + { + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + opObserver().aboutToDelete( + opCtx(), criticalSectionColl()->ns(), criticalSectionColl()->uuid(), doc.toBSON()); + opObserver().onDelete(opCtx(), + criticalSectionColl()->ns(), + criticalSectionColl()->uuid(), + kUninitializedStmtId, + {}); + wuow.commit(); + } + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(collNss); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +TEST_F(ShardingRecoveryServiceTestonInitialData, BlockAndUnblockOperationsOnDatabase) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + // Insert a document in the `config.collection_critical_sections` collection to simulate + // entering the catch-up phase of the critical section for a database, then react to an + // hypothetical availability of initial data. + { + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + writeReadCriticalSectionDocument(dbName, dbOpReason, false /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(dbName); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + // Update the document in the `config.collection_critical_sections` collection to simulate + // entering the commit phase of the critical section for the database, then react to an + // hypothetical availability of initial data. + { + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + writeReadCriticalSectionDocument(dbName, dbOpReason, true /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(dbName); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + // Delete the document in the `config.collection_critical_sections` collection to simulate + // leaving the critical section for the database, then react to an hypothetical availability of + // initial data. + { + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + deleteReadCriticalSectionDocument(dbName, dbOpReason); + } + ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(dbName); +} + +TEST_F(ShardingRecoveryServiceTestonInitialData, BlockAndUnblockOperationsOnCollection) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + // Insert a document in the `config.collection_critical_sections` collection to simulate + // entering the catch-up phase of the critical section for a collection, then react to an + // hypothetical availability of initial data. + { + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + writeReadCriticalSectionDocument(collNss, collOpReason, false /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(collNss); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + // Update the document in the `config.collection_critical_sections` collection to simulate + // entering the commit phase of the critical section for the collection, then react to an + // hypothetical availability of initial data. + { + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + writeReadCriticalSectionDocument(collNss, collOpReason, true /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(collNss); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + // Delete the document in the `config.collection_critical_sections` collection to simulate + // leaving the critical section for the collection, then react to an hypothetical availability + // of initial data. + { + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + deleteReadCriticalSectionDocument(collNss, collOpReason); + } + ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(collNss); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +TEST_F(ShardingRecoveryServiceTestAfterRollback, BlockAndUnblockOperationsOnDatabase) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + // Insert a document in the `config.collection_critical_sections` collection to simulate + // entering the catch-up phase of the critical section for a database, then react to an + // hypothetical replication rollback. + { + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + writeReadCriticalSectionDocument(dbName, dbOpReason, false /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->recoverStates( + opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace}); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(dbName); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + // Update the document in the `config.collection_critical_sections` collection to simulate + // entering the commit phase of the critical section for the database, then react to an + // hypothetical replication rollback. + { + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + writeReadCriticalSectionDocument(dbName, dbOpReason, true /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->recoverStates( + opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace}); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(dbName); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + // Delete the document in the `config.collection_critical_sections` collection to simulate + // leaving the critical section for the database, then react to an hypothetical replication + // rollback. + { + AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS); + deleteReadCriticalSectionDocument(dbName, dbOpReason); + } + ShardingRecoveryService::get(opCtx())->recoverStates( + opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace}); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(dbName); +} + +TEST_F(ShardingRecoveryServiceTestAfterRollback, BlockAndUnblockOperationsOnCollection) { + /////////////////////////////////////////// + // Block write operations (catch-up phase) + /////////////////////////////////////////// + + // Insert a document in the `config.collection_critical_sections` collection to simulate + // entering the catch-up phase of the critical section for a collection, then react to an + // hypothetical replication rollback. + { + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + writeReadCriticalSectionDocument(collNss, collOpReason, false /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->recoverStates( + opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace}); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCatchUpEnteredInMemory(collNss); + + ////////////////////////////////////////////// + // Block read/write operations (commit phase) + ////////////////////////////////////////////// + + // Update the document in the `config.collection_critical_sections` collection to simulate + // entering the commit phase of the critical section for the collection, then react to an + // hypothetical replication rollback. + { + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + writeReadCriticalSectionDocument(collNss, collOpReason, true /* blockReads */); + } + ShardingRecoveryService::get(opCtx())->recoverStates( + opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace}); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionCommitEnteredInMemory(collNss); + + ///////////////////////////////// + // Unblock read/write operations + ///////////////////////////////// + + // Delete the document in the `config.collection_critical_sections` collection to simulate + // leaving the critical section for the collection, then react to an hypothetical replication + // rollback. + { + AutoGetCollection coll(opCtx(), collNss, MODE_IS); + deleteReadCriticalSectionDocument(collNss, collOpReason); + } + ShardingRecoveryService::get(opCtx())->recoverStates( + opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace}); + + // Check that the in-memory status has been appropriately updated. + assertCriticalSectionLeftInMemory(collNss); +} + +} // namespace mongo |