summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAntonio Fuschetto <antonio.fuschetto@mongodb.com>2022-11-04 11:25:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-04 11:56:30 +0000
commitd25aa2a766a5969eb690f4d97d0c52a6dc3dcf2f (patch)
tree97967092bda17bf9a68eb01d0649327b1715c7af /src
parent6800a766528950978fe4f67f88350c2da152c9dd (diff)
downloadmongo-d25aa2a766a5969eb690f4d97d0c52a6dc3dcf2f.tar.gz
SERVER-70660 Support for recoverable critical section on databases
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/collection_critical_section_document.idl32
-rw-r--r--src/mongo/db/s/database_sharding_state.cpp22
-rw-r--r--src/mongo/db/s/database_sharding_state.h6
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp114
-rw-r--r--src/mongo/db/s/sharding_recovery_service.cpp95
-rw-r--r--src/mongo/db/s/sharding_recovery_service.h53
-rw-r--r--src/mongo/db/s/sharding_recovery_service_test.cpp911
8 files changed, 1124 insertions, 111 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 3248c1b296d..2876d1ee99c 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -719,6 +719,7 @@ env.CppUnitTest(
'sharding_initialization_mongod_test.cpp',
'sharding_initialization_op_observer_test.cpp',
'sharding_logging_test.cpp',
+ 'sharding_recovery_service_test.cpp',
'split_chunk_request_test.cpp',
'split_vector_test.cpp',
'start_chunk_clone_request_test.cpp',
@@ -738,6 +739,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/catalog_test_fixture',
'$BUILD_DIR/mongo/db/catalog/database_holder',
+ '$BUILD_DIR/mongo/db/commands/create_command',
'$BUILD_DIR/mongo/db/commands/list_collections_filter',
'$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util',
'$BUILD_DIR/mongo/db/keys_collection_client_direct',
diff --git a/src/mongo/db/s/collection_critical_section_document.idl b/src/mongo/db/s/collection_critical_section_document.idl
index f05055dbc36..fc5ad608790 100644
--- a/src/mongo/db/s/collection_critical_section_document.idl
+++ b/src/mongo/db/s/collection_critical_section_document.idl
@@ -26,10 +26,9 @@
# it in the license file.
#
-# This file defines the format of documents stored in config.collectionCriticalSections.
-# Each document is used to represent that a collection is under an operation that holds
-# the collection critical section.
-
+# This file defines the format of documents stored in the `config.collectionCriticalSections`
+# collection. Each document is used to represent that a database or a collection is under an
+# operation that entered the critical section.
global:
cpp_namespace: "mongo"
@@ -39,30 +38,27 @@ imports:
structs:
collectionCriticalSectionDocument:
- description: "Represents that a collection is under an operation that holds the collection
- critical section."
+ description: "Represents that a database or a collection is under an operation that entered
+ the critical section."
generate_comparison_operators: false
strict: false
fields:
_id:
type: namespacestring
- description: "The namespace of the collection that is under the collection critical
- section."
+ description: "The name of the database or the namespace of the collection that is
+ under the critical section."
cpp_name: nss
reason:
type: object
- description: "An identifier of the operation that is holding the critical section.
-
- Only used for diagnostic purposes."
+ description: "An identifier of the operation that entered the critical section.
+ Only used for diagnostic purposes."
blockReads:
type: bool
- description: "It states whether this critical section should block reads
- (CS in commit phase) or not (CS in catch up phase)."
+ description: "It states whether this critical section should block reads (CS in
+ commit phase) or not (CS in catch up phase)."
additionalInfo:
type: object
- description: "Additional information associated to the operation that is holding
- the critical section.
-
- Only used for diagnostic purposes."
+ description: "Additional information associated to the operation that entered the
+ critical section.
+ Only used for diagnostic purposes."
optional: true
-
diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp
index 72dee5d07c0..f3c957e97a0 100644
--- a/src/mongo/db/s/database_sharding_state.cpp
+++ b/src/mongo/db/s/database_sharding_state.cpp
@@ -71,6 +71,16 @@ public:
return it->second.get();
}
+ std::vector<DatabaseName> getDatabaseNames() {
+ stdx::lock_guard lg(_mutex);
+ std::vector<DatabaseName> result;
+ result.reserve(_databases.size());
+ for (const auto& [dbName, _] : _databases) {
+ result.emplace_back(dbName);
+ }
+ return result;
+ }
+
private:
Mutex _mutex = MONGO_MAKE_LATCH("DatabaseShardingStateMap::_mutex");
@@ -122,17 +132,19 @@ DatabaseShardingState::ScopedDatabaseShardingState DatabaseShardingState::acquir
return ScopedDatabaseShardingState(std::move(lock), dssAndLock->dss.get());
}
+std::vector<DatabaseName> DatabaseShardingState::getDatabaseNames(OperationContext* opCtx) {
+ auto& databasesMap = DatabaseShardingStateMap::get(opCtx->getServiceContext());
+ return databasesMap.getDatabaseNames();
+}
+
void DatabaseShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx,
const BSONObj& reason) {
- invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X));
_critSec.enterCriticalSectionCatchUpPhase(reason);
-
cancelDbMetadataRefresh();
}
void DatabaseShardingState::enterCriticalSectionCommitPhase(OperationContext* opCtx,
const BSONObj& reason) {
- invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X));
_critSec.enterCriticalSectionCommitPhase(reason);
}
@@ -140,6 +152,10 @@ void DatabaseShardingState::exitCriticalSection(OperationContext* opCtx, const B
_critSec.exitCriticalSection(reason);
}
+void DatabaseShardingState::exitCriticalSectionNoChecks(OperationContext* opCtx) {
+ _critSec.exitCriticalSectionNoChecks();
+}
+
void DatabaseShardingState::setMovePrimarySourceManager(OperationContext* opCtx,
MovePrimarySourceManager* sourceMgr) {
invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X));
diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h
index 70e788b746c..344635dd0e0 100644
--- a/src/mongo/db/s/database_sharding_state.h
+++ b/src/mongo/db/s/database_sharding_state.h
@@ -83,6 +83,11 @@ public:
DSSAcquisitionMode mode);
/**
+ * Returns the names of the databases that have a DatabaseShardingState.
+ */
+ static std::vector<DatabaseName> getDatabaseNames(OperationContext* opCtx);
+
+ /**
* Returns the name of the database related to the current sharding state.
*/
const DatabaseName& getDbName() const {
@@ -96,6 +101,7 @@ public:
void enterCriticalSectionCatchUpPhase(OperationContext* opCtx, const BSONObj& reason);
void enterCriticalSectionCommitPhase(OperationContext* opCtx, const BSONObj& reason);
void exitCriticalSection(OperationContext* opCtx, const BSONObj& reason);
+ void exitCriticalSectionNoChecks(OperationContext* opCtx);
auto getCriticalSectionSignal(ShardingMigrationCriticalSection::Operation op) const {
return _critSec.getSignal(op);
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index ae059d19512..352285de2ed 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -291,10 +291,21 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
!recoverable_critical_section_util::inRecoveryMode(opCtx)) {
const auto collCSDoc = CollectionCriticalSectionDocument::parse(
IDLParserContext("ShardServerOpObserver"), insertedDoc);
- opCtx->recoveryUnit()->onCommit(
- [opCtx,
- insertedNss = collCSDoc.getNss(),
- reason = collCSDoc.getReason().getOwned()](boost::optional<Timestamp>) {
+ opCtx->recoveryUnit()->onCommit([opCtx,
+ insertedNss = collCSDoc.getNss(),
+ reason = collCSDoc.getReason().getOwned()](
+ boost::optional<Timestamp>) {
+ if (nsIsDbOnly(insertedNss.ns())) {
+ boost::optional<AutoGetDb> lockDbIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ lockDbIfNotPrimary.emplace(opCtx, insertedNss.dbName(), MODE_IX);
+ }
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, insertedNss.dbName(), DSSAcquisitionMode::kExclusive);
+ scopedDss->enterCriticalSectionCatchUpPhase(opCtx, reason);
+ } else {
boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
if (!isStandaloneOrPrimary(opCtx)) {
lockCollectionIfNotPrimary.emplace(
@@ -309,7 +320,8 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
opCtx, insertedNss, CSRAcquisitionMode::kExclusive);
scopedCsr->enterCriticalSectionCatchUpPhase(reason);
- });
+ }
+ });
}
if (metadata && metadata->isSharded()) {
@@ -452,20 +464,32 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
opCtx->recoveryUnit()->onCommit(
[opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()](
boost::optional<Timestamp>) {
- boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
- if (!isStandaloneOrPrimary(opCtx)) {
- lockCollectionIfNotPrimary.emplace(
- opCtx,
- updatedNss,
- MODE_IX,
- AutoGetCollection::Options{}.viewMode(
- auto_get_collection::ViewMode::kViewsPermitted));
- }
+ if (nsIsDbOnly(updatedNss.ns())) {
+ boost::optional<AutoGetDb> lockDbIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ lockDbIfNotPrimary.emplace(opCtx, updatedNss.dbName(), MODE_IX);
+ }
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, updatedNss.dbName(), DSSAcquisitionMode::kExclusive);
+ scopedDss->enterCriticalSectionCommitPhase(opCtx, reason);
+ } else {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ lockCollectionIfNotPrimary.emplace(
+ opCtx,
+ updatedNss,
+ MODE_IX,
+ AutoGetCollection::Options{}.viewMode(
+ auto_get_collection::ViewMode::kViewsPermitted));
+ }
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, updatedNss, CSRAcquisitionMode::kExclusive)
- ->enterCriticalSectionCommitPhase(reason);
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, updatedNss, CSRAcquisitionMode::kExclusive);
+ scopedCsr->enterCriticalSectionCommitPhase(reason);
+ }
});
}
@@ -594,26 +618,46 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx,
opCtx->recoveryUnit()->onCommit(
[opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()](
boost::optional<Timestamp>) {
- boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
- if (!isStandaloneOrPrimary(opCtx)) {
- lockCollectionIfNotPrimary.emplace(
- opCtx,
- deletedNss,
- MODE_IX,
- AutoGetCollection::Options{}.viewMode(
- auto_get_collection::ViewMode::kViewsPermitted));
- }
+ if (nsIsDbOnly(deletedNss.ns())) {
+ boost::optional<AutoGetDb> lockDbIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ lockDbIfNotPrimary.emplace(opCtx, deletedNss.dbName(), MODE_IX);
+ }
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, deletedNss, CSRAcquisitionMode::kExclusive);
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, deletedNss.dbName(), DSSAcquisitionMode::kExclusive);
- // Secondary nodes must clear the filtering metadata before releasing the
- // in-memory critical section
- if (!isStandaloneOrPrimary(opCtx))
- scopedCsr->clearFilteringMetadata(opCtx);
+ // Secondary nodes must clear the database metadata before releasing the
+ // in-memory critical section.
+ if (!isStandaloneOrPrimary(opCtx)) {
+ DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, deletedNss.dbName());
+ }
- scopedCsr->exitCriticalSection(reason);
+ scopedDss->exitCriticalSection(opCtx, reason);
+ } else {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ lockCollectionIfNotPrimary.emplace(
+ opCtx,
+ deletedNss,
+ MODE_IX,
+ AutoGetCollection::Options{}.viewMode(
+ auto_get_collection::ViewMode::kViewsPermitted));
+ }
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, deletedNss, CSRAcquisitionMode::kExclusive);
+
+ // Secondary nodes must clear the collection filtering metadata before releasing
+ // the in-memory critical section.
+ if (!isStandaloneOrPrimary(opCtx)) {
+ scopedCsr->clearFilteringMetadata(opCtx);
+ }
+
+ scopedCsr->exitCriticalSection(reason);
+ }
});
}
diff --git a/src/mongo/db/s/sharding_recovery_service.cpp b/src/mongo/db/s/sharding_recovery_service.cpp
index 02dab8f3773..10505a5997b 100644
--- a/src/mongo/db/s/sharding_recovery_service.cpp
+++ b/src/mongo/db/s/sharding_recovery_service.cpp
@@ -27,7 +27,6 @@
* it in the license file.
*/
-
#include <set>
#include "mongo/platform/basic.h"
@@ -42,6 +41,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/collection_critical_section_document_gen.h"
#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/sharding_migration_critical_section.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_collection.h"
@@ -50,7 +50,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
namespace mongo {
namespace recoverable_critical_section_util {
@@ -154,9 +153,15 @@ void ShardingRecoveryService::acquireRecoverableCriticalSectionBlockWrites(
{
Lock::GlobalLock lk(opCtx, MODE_IX);
- // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
- // construct cCollLock.
- AutoGetCollection cCollLock(opCtx, nss, MODE_S);
+ boost::optional<AutoGetDb> dbLock;
+ boost::optional<AutoGetCollection> collLock;
+ if (nsIsDbOnly(nss.ns())) {
+ dbLock.emplace(opCtx, nss.dbName(), MODE_S);
+ } else {
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct collLock.
+ collLock.emplace(opCtx, nss, MODE_S);
+ }
DBDirectClient dbClient(opCtx);
FindCommandRequest findRequest{NamespaceString::kCollectionCriticalSectionsNamespace};
@@ -172,10 +177,9 @@ void ShardingRecoveryService::acquireRecoverableCriticalSectionBlockWrites(
invariant(collCSDoc.getReason().woCompare(reason) == 0,
str::stream()
- << "Trying to acquire a critical section blocking writes for namespace "
- << nss << " and reason " << reason
- << " but it is already taken by another operation with different reason "
- << collCSDoc.getReason());
+ << "Trying to acquire a critical section blocking writes for namespace "
+ << nss << " and reason " << reason << " but it is already taken by "
+ << "another operation with different reason " << collCSDoc.getReason());
LOGV2_DEBUG(
5656601,
@@ -242,9 +246,15 @@ void ShardingRecoveryService::promoteRecoverableCriticalSectionToBlockAlsoReads(
invariant(!opCtx->lockState()->isLocked());
{
- // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
- // construct cCollLock.
- AutoGetCollection cCollLock(opCtx, nss, MODE_X);
+ boost::optional<AutoGetDb> dbLock;
+ boost::optional<AutoGetCollection> collLock;
+ if (nsIsDbOnly(nss.ns())) {
+ dbLock.emplace(opCtx, nss.dbName(), MODE_X);
+ } else {
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct collLock.
+ collLock.emplace(opCtx, nss, MODE_X);
+ }
DBDirectClient dbClient(opCtx);
FindCommandRequest findRequest{NamespaceString::kCollectionCriticalSectionsNamespace};
@@ -345,9 +355,15 @@ void ShardingRecoveryService::releaseRecoverableCriticalSection(
invariant(!opCtx->lockState()->isLocked());
{
- // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
- // construct cCollLock.
- AutoGetCollection collLock(opCtx, nss, MODE_X);
+ boost::optional<AutoGetDb> dbLock;
+ boost::optional<AutoGetCollection> collLock;
+ if (nsIsDbOnly(nss.ns())) {
+ dbLock.emplace(opCtx, nss.dbName(), MODE_X);
+ } else {
+ // TODO SERVER-68084 add the AutoGetCollectionViewMode::kViewsPermitted parameter to
+ // construct collLock.
+ collLock.emplace(opCtx, nss, MODE_X);
+ }
DBDirectClient dbClient(opCtx);
@@ -442,21 +458,26 @@ void ShardingRecoveryService::recoverRecoverableCriticalSections(OperationContex
LOGV2_DEBUG(5604000, 2, "Recovering all recoverable critical sections");
// Release all in-memory critical sections
- const auto collectionNames = CollectionShardingState::getCollectionNames(opCtx);
- for (const auto& collName : collectionNames) {
+ for (const auto& nss : CollectionShardingState::getCollectionNames(opCtx)) {
try {
- AutoGetCollection collLock(opCtx, collName, MODE_X);
- CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, collName, CSRAcquisitionMode::kExclusive)
- ->exitCriticalSectionNoChecks();
+ AutoGetCollection collLock(opCtx, nss, MODE_X);
+ auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kExclusive);
+ scopedCsr->exitCriticalSectionNoChecks();
} catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
LOGV2_DEBUG(6050800,
2,
"Skipping attempting to exit critical section for view in "
"recoverRecoverableCriticalSections",
- "namespace"_attr = collName);
+ "namespace"_attr = nss);
}
}
+ for (const auto& dbName : DatabaseShardingState::getDatabaseNames(opCtx)) {
+ AutoGetDb dbLock(opCtx, dbName, MODE_X);
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, dbName, DSSAcquisitionMode::kExclusive);
+ scopedDss->exitCriticalSectionNoChecks(opCtx);
+ }
// Map the critical sections that are on disk to memory
PersistentTaskStore<CollectionCriticalSectionDocument> store(
@@ -464,13 +485,23 @@ void ShardingRecoveryService::recoverRecoverableCriticalSections(OperationContex
store.forEach(opCtx, BSONObj{}, [&opCtx](const CollectionCriticalSectionDocument& doc) {
const auto& nss = doc.getNss();
{
- AutoGetCollection collLock(opCtx, nss, MODE_X);
- auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, nss, CSRAcquisitionMode::kExclusive);
-
- scopedCsr->enterCriticalSectionCatchUpPhase(doc.getReason());
- if (doc.getBlockReads())
- scopedCsr->enterCriticalSectionCommitPhase(doc.getReason());
+ if (nsIsDbOnly(nss.ns())) {
+ AutoGetDb dbLock(opCtx, nss.dbName(), MODE_X);
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, nss.dbName(), DSSAcquisitionMode::kExclusive);
+ scopedDss->enterCriticalSectionCatchUpPhase(opCtx, doc.getReason());
+ if (doc.getBlockReads()) {
+ scopedDss->enterCriticalSectionCommitPhase(opCtx, doc.getReason());
+ }
+ } else {
+ AutoGetCollection collLock(opCtx, nss, MODE_X);
+ auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx, nss, CSRAcquisitionMode::kExclusive);
+ scopedCsr->enterCriticalSectionCatchUpPhase(doc.getReason());
+ if (doc.getBlockReads()) {
+ scopedCsr->enterCriticalSectionCommitPhase(doc.getReason());
+ }
+ }
return true;
}
@@ -494,6 +525,12 @@ void ShardingRecoveryService::recoverStates(OperationContext* opCtx,
}
}
+void ShardingRecoveryService::onInitialDataAvailable(OperationContext* opCtx,
+ bool isMajorityDataAvailable) {
+ recoverRecoverableCriticalSections(opCtx);
+ recoverIndexesCatalog(opCtx);
+}
+
void ShardingRecoveryService::recoverIndexesCatalog(OperationContext* opCtx) {
LOGV2_DEBUG(6686500, 2, "Recovering all sharding index catalog");
diff --git a/src/mongo/db/s/sharding_recovery_service.h b/src/mongo/db/s/sharding_recovery_service.h
index 2913571d180..f7d24436742 100644
--- a/src/mongo/db/s/sharding_recovery_service.h
+++ b/src/mongo/db/s/sharding_recovery_service.h
@@ -52,17 +52,17 @@ public:
static ShardingRecoveryService* get(OperationContext* opCtx);
/**
- * Acquires the collection critical section in the catch-up phase (i.e. blocking writes) for the
- * specified namespace and reason. It works even if the namespace's current metadata are
+ * Acquires the recoverable critical section in the catch-up phase (i.e. blocking writes) for
+ * the specified namespace and reason. It works even if the namespace's current metadata are
* UNKNOWN.
*
- * Entering into the Critical Section interrupts any ongoing filtering metadata refresh.
+ * Entering into the critical section interrupts any ongoing filtering metadata refresh.
*
- * It adds a doc to config.collectionCriticalSections with with writeConcern write concern.
+ * It adds a doc to `config.collectionCriticalSections` with with `writeConcern` write concern.
*
- * Do nothing if the collection critical section is taken for that nss and reason, and will
- * invariant otherwise since it is the responsibility of the caller to ensure that only one
- * thread is taking the critical section.
+ * Do nothing if the critical section is taken for that namespace and reason, and will invariant
+ * otherwise since it is the responsibility of the caller to ensure that only one thread is
+ * taking the critical section.
*/
void acquireRecoverableCriticalSectionBlockWrites(
OperationContext* opCtx,
@@ -73,26 +73,26 @@ public:
/**
* Advances the recoverable critical section from the catch-up phase (i.e. blocking writes) to
- * the commit phase (i.e. blocking reads) for the specified nss and reason. The recoverable
- * critical section must have been acquired first through
- * 'acquireRecoverableCriticalSectionBlockWrites' function.
+ * the commit phase (i.e. blocking reads) for the specified namespace and reason. The
+ * recoverable critical section must have been acquired first through
+ * `acquireRecoverableCriticalSectionBlockWrites` function.
*
- * It updates a doc from config.collectionCriticalSections with writeConcern write concern.
+ * It updates a doc from `config.collectionCriticalSections` with `writeConcern` write concern.
*
- * Do nothing if the collection critical section is already taken in commit phase.
+ * Do nothing if the critical section is already taken in commit phase.
*/
void promoteRecoverableCriticalSectionToBlockAlsoReads(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& reason,
const WriteConcernOptions& writeConcern);
/**
- * Releases the recoverable critical section for the given nss and reason.
+ * Releases the recoverable critical section for the given namespace and reason.
*
- * It removes a doc from config.collectionCriticalSections with writeConcern write concern. As
- * part of the removal, the filtering information is cleared on secondary nodes. It is
- * responsability of the caller to properly set the filtering information on the primary node.
+ * It removes a doc from `config.collectionCriticalSections` with `writeConcern` write concern.
+ * As part of the removal, the filtering information is cleared on secondary nodes. It is
+ * responsibility of the caller to properly set the filtering information on the primary node.
*
- * Do nothing if the collection critical section is not taken for that nss and reason.
+ * Do nothing if the critical section is not taken for that namespace and reason.
*
* Throw an invariant in case the collection critical section is already taken by another
* operation with a different reason unless the flag 'throwIfReasonDiffers' is set to false.
@@ -104,29 +104,30 @@ public:
bool throwIfReasonDiffers = true);
/**
- * Recover all sharding related in memory states from disk.
+ * Recovers all sharding related in memory states from disk.
*/
void recoverStates(OperationContext* opCtx,
const std::set<NamespaceString>& rollbackNamespaces);
+ /**
+ * Recovers critical sections and indexes from disk when either initial sync or startup recovery
+ * have completed.
+ */
+ void onInitialDataAvailable(OperationContext* opCtx,
+ bool isMajorityDataAvailable) override final;
+
private:
/**
* This method is called when we have to mirror the state on disk of the recoverable critical
- * section to memory (on startUp or on rollback).
+ * section to memory (on startup or on rollback).
*/
void recoverRecoverableCriticalSections(OperationContext* opCtx);
/**
- * Recover the index versions from disk into the CSR.
+ * Recovers the index versions from disk into the CSR.
*/
void recoverIndexesCatalog(OperationContext* opCtx);
- void onInitialDataAvailable(OperationContext* opCtx,
- bool isMajorityDataAvailable) override final {
- recoverRecoverableCriticalSections(opCtx);
- recoverIndexesCatalog(opCtx);
- }
-
void onStartup(OperationContext* opCtx) override final {}
void onShutdown() override final {}
void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
diff --git a/src/mongo/db/s/sharding_recovery_service_test.cpp b/src/mongo/db/s/sharding_recovery_service_test.cpp
new file mode 100644
index 00000000000..4825ce6ce9b
--- /dev/null
+++ b/src/mongo/db/s/sharding_recovery_service_test.cpp
@@ -0,0 +1,911 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/database_name.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/collection_critical_section_document_gen.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/shard_server_op_observer.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/s/sharding_recovery_service.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+class ShardingRecoveryServiceTest : public ShardServerTestFixture {
+public:
+ inline static const NamespaceString collNss{"TestDB", "TestCollection"};
+ inline static const BSONObj collOpReason =
+ BSON("Dummy operation on collection" << collNss.ns());
+
+ inline static const NamespaceString dbName{"TestDB"};
+ inline static const BSONObj dbOpReason = BSON("Dummy operation on database" << dbName.ns());
+
+ inline static const BSONObj differentOpReason = BSON("Yet another dummy operation" << true);
+
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+ _opCtx = operationContext();
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx;
+ }
+
+ OpObserver& opObserver() {
+ return _opObserver;
+ }
+
+ boost::optional<CollectionCriticalSectionDocument> readCriticalSectionDocument(
+ const NamespaceString& nss, const BSONObj& reason) {
+ FindCommandRequest findOp(NamespaceString::kCollectionCriticalSectionsNamespace);
+ findOp.setFilter(BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()));
+
+ DBDirectClient dbClient(opCtx());
+ auto cursor = dbClient.find(std::move(findOp));
+
+ if (!cursor->more()) {
+ return boost::none;
+ }
+
+ auto bsonObj = cursor->next();
+ auto doc = CollectionCriticalSectionDocument::parse(
+ IDLParserContext("AcquireRecoverableCSBW"), bsonObj);
+
+ // The document exists, so the reason must match.
+ ASSERT(!doc.getReason().woCompare(reason));
+
+ return doc;
+ }
+
+ void writeReadCriticalSectionDocument(const NamespaceString& nss,
+ const BSONObj& reason,
+ bool blockReads) {
+ auto doc = readCriticalSectionDocument(nss, reason);
+ if (!doc) {
+ doc = CollectionCriticalSectionDocument(nss, reason, blockReads);
+
+ DBDirectClient dbClient(opCtx());
+ dbClient.insert(NamespaceString::kCollectionCriticalSectionsNamespace.ns(),
+ doc->toBSON());
+
+ return;
+ }
+
+ // The document exists, so the blockReads should change.
+ ASSERT_NE(doc->getBlockReads(), blockReads);
+
+ DBDirectClient dbClient(opCtx());
+ dbClient.update(NamespaceString::kCollectionCriticalSectionsNamespace.ns(),
+ BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()),
+ BSON("$set" << BSON(CollectionCriticalSectionDocument::kBlockReadsFieldName
+ << blockReads)));
+ }
+
+ void deleteReadCriticalSectionDocument(const NamespaceString& nss, const BSONObj& reason) {
+ // The document must exist.
+ ASSERT(readCriticalSectionDocument(nss, reason));
+
+ DBDirectClient dbClient(opCtx());
+ dbClient.remove(NamespaceString::kCollectionCriticalSectionsNamespace.ns(),
+ BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()),
+ false /* removeMany */);
+ }
+
+ void assertCriticalSectionCatchUpEnteredInMemory(const NamespaceString& nss) {
+ if (nsIsDbOnly(nss.ns())) {
+ AutoGetDb db(opCtx(), nss.dbName(), MODE_IS);
+ auto dss =
+ DatabaseShardingState::acquire(opCtx(), nss.dbName(), DSSAcquisitionMode::kShared);
+ ASSERT(dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite));
+ ASSERT(!dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead));
+ } else {
+ AutoGetCollection coll(opCtx(), nss, MODE_IS);
+ auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx(), nss, CSRAcquisitionMode::kShared);
+ ASSERT(
+ csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kWrite));
+ ASSERT(
+ !csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kRead));
+ }
+ }
+
+ void assertCriticalSectionCommitEnteredInMemory(const NamespaceString& nss) {
+ if (nsIsDbOnly(nss.ns())) {
+ AutoGetDb db(opCtx(), nss.dbName(), MODE_IS);
+ auto dss =
+ DatabaseShardingState::acquire(opCtx(), nss.dbName(), DSSAcquisitionMode::kShared);
+ ASSERT(dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite));
+ ASSERT(dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead));
+ } else {
+ AutoGetCollection coll(opCtx(), nss, MODE_IS);
+ auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx(), nss, CSRAcquisitionMode::kShared);
+ ASSERT(
+ csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kWrite));
+ ASSERT(csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kRead));
+ }
+ }
+
+ void assertCriticalSectionLeftInMemory(const NamespaceString& nss) {
+ if (nsIsDbOnly(nss.ns())) {
+ AutoGetDb db(opCtx(), nss.dbName(), MODE_IS);
+ auto dss =
+ DatabaseShardingState::acquire(opCtx(), nss.dbName(), DSSAcquisitionMode::kShared);
+ ASSERT(!dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite));
+ ASSERT(!dss->getCriticalSectionSignal(ShardingMigrationCriticalSection::kRead));
+ } else {
+ AutoGetCollection coll(opCtx(), nss, MODE_IS);
+ auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
+ opCtx(), nss, CSRAcquisitionMode::kShared);
+ ASSERT(
+ !csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kWrite));
+ ASSERT(
+ !csr->getCriticalSectionSignal(opCtx(), ShardingMigrationCriticalSection::kRead));
+ }
+ }
+
+ void assertCriticalSectionCatchUpEnteredOnDisk(const NamespaceString& nss,
+ const BSONObj& reason) {
+ auto doc = readCriticalSectionDocument(nss, reason);
+ ASSERT(doc);
+ ASSERT(!doc->getBlockReads());
+ }
+
+ void assertCriticalSectionCommitEnteredOnDisk(const NamespaceString& nss,
+ const BSONObj& reason) {
+ auto doc = readCriticalSectionDocument(nss, reason);
+ ASSERT(doc);
+ ASSERT(doc->getBlockReads());
+ }
+
+ void assertCriticalSectionLeftOnDisk(const NamespaceString& nss, const BSONObj& reason) {
+ ASSERT(!readCriticalSectionDocument(nss, reason));
+ }
+
+private:
+ OperationContext* _opCtx;
+ ShardServerOpObserver _opObserver;
+};
+
+class ShardingRecoveryServiceTestOnPrimary : public ShardingRecoveryServiceTest {
+public:
+ void setUp() override {
+ ShardingRecoveryServiceTest::setUp();
+
+ auto replCoord = repl::ReplicationCoordinator::get(operationContext());
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ }
+};
+
+class ShardingRecoveryServiceTestOnSecondary : public ShardingRecoveryServiceTest {
+public:
+ void setUp() override {
+ ShardingRecoveryServiceTest::setUp();
+
+ auto replCoord = repl::ReplicationCoordinator::get(operationContext());
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY));
+
+ // Create and lock the `config.collection_critical_sections` collection to allow
+ // notifications on the operation observer.
+ OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
+ opCtx());
+ ASSERT_OK(createCollection(
+ opCtx(), CreateCommand(NamespaceString::kCollectionCriticalSectionsNamespace)));
+ _criticalSectionColl.emplace(
+ opCtx(), NamespaceString::kCollectionCriticalSectionsNamespace, MODE_IX);
+ }
+
+ void tearDown() override {
+ _criticalSectionColl = boost::none;
+
+ ShardingRecoveryServiceTest::tearDown();
+ }
+
+ const CollectionPtr& criticalSectionColl() {
+ return **_criticalSectionColl;
+ }
+
+private:
+ boost::optional<AutoGetCollection> _criticalSectionColl;
+};
+
+class ShardingRecoveryServiceTestonInitialData : public ShardingRecoveryServiceTest {
+public:
+ void setUp() override {
+ ShardingRecoveryServiceTest::setUp();
+
+ // Create the `config.collection_critical_sections` collection to allow notifications on the
+ // operation observer.
+ OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
+ opCtx());
+ ASSERT_OK(createCollection(
+ opCtx(), CreateCommand(NamespaceString::kCollectionCriticalSectionsNamespace)));
+ }
+};
+
+using ShardingRecoveryServiceTestAfterRollback = ShardingRecoveryServiceTestonInitialData;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsOnDatabase) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(dbName);
+
+ // Check that the document has been appropriately saved.
+ assertCriticalSectionCatchUpEnteredOnDisk(dbName, dbOpReason);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(dbName);
+
+ // Check that the document has been appropriately updated.
+ assertCriticalSectionCommitEnteredOnDisk(dbName, dbOpReason);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(dbName);
+
+ // Check that the document has been deleted.
+ assertCriticalSectionLeftOnDisk(dbName, dbOpReason);
+}
+
+TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsTwiceOnDatabase) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(dbName);
+
+ // Check that the document has been appropriately saved.
+ assertCriticalSectionCatchUpEnteredOnDisk(dbName, dbOpReason);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(dbName);
+
+ // Check that the document has been appropriately updated.
+ assertCriticalSectionCommitEnteredOnDisk(dbName, dbOpReason);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(dbName);
+
+ // Check that the document has been deleted.
+ assertCriticalSectionLeftOnDisk(dbName, dbOpReason);
+}
+
+DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary,
+ FailBlockingWritesTwiceOnDatabaseWithDifferentReasons,
+ "invariant") {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, differentOpReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary,
+ FailBlockingReadsOnDatabaseWithDifferentReasons,
+ "invariant") {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), dbName, differentOpReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary,
+ FailUnblockingOperationsOnDatabaseWithDifferentReasons,
+ "invariant") {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), dbName, dbOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), dbName, differentOpReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsOnCollection) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(collNss);
+
+ // Check that the document has been appropriately saved.
+ assertCriticalSectionCatchUpEnteredOnDisk(collNss, collOpReason);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(collNss);
+
+ // Check that the document has been appropriately updated.
+ assertCriticalSectionCommitEnteredOnDisk(collNss, collOpReason);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(collNss);
+
+ // Check that the document has been deleted.
+ assertCriticalSectionLeftOnDisk(collNss, collOpReason);
+}
+
+TEST_F(ShardingRecoveryServiceTestOnPrimary, BlockAndUnblockOperationsTwiceOnCollection) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(collNss);
+
+ // Check that the document has been appropriately saved.
+ assertCriticalSectionCatchUpEnteredOnDisk(collNss, collOpReason);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(collNss);
+
+ // Check that the document has been appropriately updated.
+ assertCriticalSectionCommitEnteredOnDisk(collNss, collOpReason);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(collNss);
+
+ // Check that the document has been deleted.
+ assertCriticalSectionLeftOnDisk(collNss, collOpReason);
+}
+
+DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary,
+ FailBlockingWritesTwiceOnCollectionWithDifferentReasons,
+ "invariant") {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, differentOpReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary,
+ FailBlockingReadsOnCollectionWithDifferentReasons,
+ "invariant") {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), collNss, differentOpReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+DEATH_TEST_F(ShardingRecoveryServiceTestOnPrimary,
+ FailUnblockingOperationsOnCollectionWithDifferentReasons,
+ "invariant") {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx(), collNss, collOpReason, ShardingCatalogClient::kLocalWriteConcern);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ ShardingRecoveryService::get(opCtx())->releaseRecoverableCriticalSection(
+ opCtx(), collNss, differentOpReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+TEST_F(ShardingRecoveryServiceTestOnSecondary, BlockAndUnblockOperationsOnDatabase) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ // Simulate an insert notification on the `config.collection_critical_sections` collection, that
+ // is what a secondary node would receive when the primary node enters the catch-up phase of the
+ // critical section.
+ auto doc = CollectionCriticalSectionDocument(dbName, dbOpReason, false);
+ {
+ std::vector<InsertStatement> inserts;
+ inserts.emplace_back(doc.toBSON());
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ opObserver().onInserts(
+ opCtx(), criticalSectionColl(), inserts.begin(), inserts.end(), false);
+ wuow.commit();
+ }
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(dbName);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ // Simulate an update notification on the `config.collection_critical_sections` collection, that
+ // is what a secondary node would receive when the primary node enters the commit phase of the
+ // critical section.
+ doc.setBlockReads(true); // NOTE: This has no semantic effect as the critical section is
+ // promoted to any update event on the document!
+ // TODO (SERVER-71056): React to `blockReads` changes only.
+ {
+ CollectionUpdateArgs updateArgs;
+ updateArgs.updatedDoc = doc.toBSON();
+ OplogUpdateEntryArgs update(&updateArgs, criticalSectionColl());
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ opObserver().onUpdate(opCtx(), update);
+ wuow.commit();
+ }
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(dbName);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ // Simulate an delete notification on the `config.collection_critical_sections` collection, that
+ // is what a secondary node would receive when the primary node leaves the critical section.
+ {
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ opObserver().aboutToDelete(
+ opCtx(), criticalSectionColl()->ns(), criticalSectionColl()->uuid(), doc.toBSON());
+ opObserver().onDelete(opCtx(),
+ criticalSectionColl()->ns(),
+ criticalSectionColl()->uuid(),
+ kUninitializedStmtId,
+ {});
+ wuow.commit();
+ }
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(dbName);
+}
+
+TEST_F(ShardingRecoveryServiceTestOnSecondary, BlockAndUnblockOperationsOnCollection) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ // Simulate an insert notification on the `config.collection_critical_sections` collection, that
+ // is what a secondary node would receive when the primary node enters the catch-up phase of the
+ // critical section.
+ auto doc = CollectionCriticalSectionDocument(collNss, collOpReason, false);
+ {
+ std::vector<InsertStatement> inserts;
+ inserts.emplace_back(doc.toBSON());
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ opObserver().onInserts(
+ opCtx(), criticalSectionColl(), inserts.begin(), inserts.end(), false);
+ wuow.commit();
+ }
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(collNss);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ // Simulate an update notification on the `config.collection_critical_sections` collection, that
+ // is what a secondary node would receive when the primary node enters the commit phase of the
+ // critical section.
+ doc.setBlockReads(true); // NOTE: This has no semantic effect as the critical section is
+ // promoted to any update event on the document!
+ // TODO (SERVER-71056): React to `blockReads` changes only.
+ {
+ CollectionUpdateArgs updateArgs;
+ updateArgs.updatedDoc = doc.toBSON();
+ OplogUpdateEntryArgs update(&updateArgs, criticalSectionColl());
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ opObserver().onUpdate(opCtx(), update);
+ wuow.commit();
+ }
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(collNss);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ // Simulate an delete notification on the `config.collection_critical_sections` collection, that
+ // is what a secondary node would receive when the primary node leaves the critical section.
+ {
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ opObserver().aboutToDelete(
+ opCtx(), criticalSectionColl()->ns(), criticalSectionColl()->uuid(), doc.toBSON());
+ opObserver().onDelete(opCtx(),
+ criticalSectionColl()->ns(),
+ criticalSectionColl()->uuid(),
+ kUninitializedStmtId,
+ {});
+ wuow.commit();
+ }
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(collNss);
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+TEST_F(ShardingRecoveryServiceTestonInitialData, BlockAndUnblockOperationsOnDatabase) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ // Insert a document in the `config.collection_critical_sections` collection to simulate
+ // entering the catch-up phase of the critical section for a database, then react to an
+ // hypothetical availability of initial data.
+ {
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ writeReadCriticalSectionDocument(dbName, dbOpReason, false /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(dbName);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ // Update the document in the `config.collection_critical_sections` collection to simulate
+ // entering the commit phase of the critical section for the database, then react to an
+ // hypothetical availability of initial data.
+ {
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ writeReadCriticalSectionDocument(dbName, dbOpReason, true /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(dbName);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ // Delete the document in the `config.collection_critical_sections` collection to simulate
+ // leaving the critical section for the database, then react to an hypothetical availability of
+ // initial data.
+ {
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ deleteReadCriticalSectionDocument(dbName, dbOpReason);
+ }
+ ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(dbName);
+}
+
+TEST_F(ShardingRecoveryServiceTestonInitialData, BlockAndUnblockOperationsOnCollection) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ // Insert a document in the `config.collection_critical_sections` collection to simulate
+ // entering the catch-up phase of the critical section for a collection, then react to an
+ // hypothetical availability of initial data.
+ {
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ writeReadCriticalSectionDocument(collNss, collOpReason, false /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(collNss);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ // Update the document in the `config.collection_critical_sections` collection to simulate
+ // entering the commit phase of the critical section for the collection, then react to an
+ // hypothetical availability of initial data.
+ {
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ writeReadCriticalSectionDocument(collNss, collOpReason, true /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(collNss);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ // Delete the document in the `config.collection_critical_sections` collection to simulate
+ // leaving the critical section for the collection, then react to an hypothetical availability
+ // of initial data.
+ {
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ deleteReadCriticalSectionDocument(collNss, collOpReason);
+ }
+ ShardingRecoveryService::get(opCtx())->onInitialDataAvailable(opCtx(), false);
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(collNss);
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+TEST_F(ShardingRecoveryServiceTestAfterRollback, BlockAndUnblockOperationsOnDatabase) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ // Insert a document in the `config.collection_critical_sections` collection to simulate
+ // entering the catch-up phase of the critical section for a database, then react to an
+ // hypothetical replication rollback.
+ {
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ writeReadCriticalSectionDocument(dbName, dbOpReason, false /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->recoverStates(
+ opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace});
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(dbName);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ // Update the document in the `config.collection_critical_sections` collection to simulate
+ // entering the commit phase of the critical section for the database, then react to an
+ // hypothetical replication rollback.
+ {
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ writeReadCriticalSectionDocument(dbName, dbOpReason, true /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->recoverStates(
+ opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace});
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(dbName);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ // Delete the document in the `config.collection_critical_sections` collection to simulate
+ // leaving the critical section for the database, then react to an hypothetical replication
+ // rollback.
+ {
+ AutoGetDb db(opCtx(), dbName.dbName(), MODE_IS);
+ deleteReadCriticalSectionDocument(dbName, dbOpReason);
+ }
+ ShardingRecoveryService::get(opCtx())->recoverStates(
+ opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace});
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(dbName);
+}
+
+TEST_F(ShardingRecoveryServiceTestAfterRollback, BlockAndUnblockOperationsOnCollection) {
+ ///////////////////////////////////////////
+ // Block write operations (catch-up phase)
+ ///////////////////////////////////////////
+
+ // Insert a document in the `config.collection_critical_sections` collection to simulate
+ // entering the catch-up phase of the critical section for a collection, then react to an
+ // hypothetical replication rollback.
+ {
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ writeReadCriticalSectionDocument(collNss, collOpReason, false /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->recoverStates(
+ opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace});
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCatchUpEnteredInMemory(collNss);
+
+ //////////////////////////////////////////////
+ // Block read/write operations (commit phase)
+ //////////////////////////////////////////////
+
+ // Update the document in the `config.collection_critical_sections` collection to simulate
+ // entering the commit phase of the critical section for the collection, then react to an
+ // hypothetical replication rollback.
+ {
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ writeReadCriticalSectionDocument(collNss, collOpReason, true /* blockReads */);
+ }
+ ShardingRecoveryService::get(opCtx())->recoverStates(
+ opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace});
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionCommitEnteredInMemory(collNss);
+
+ /////////////////////////////////
+ // Unblock read/write operations
+ /////////////////////////////////
+
+ // Delete the document in the `config.collection_critical_sections` collection to simulate
+ // leaving the critical section for the collection, then react to an hypothetical replication
+ // rollback.
+ {
+ AutoGetCollection coll(opCtx(), collNss, MODE_IS);
+ deleteReadCriticalSectionDocument(collNss, collOpReason);
+ }
+ ShardingRecoveryService::get(opCtx())->recoverStates(
+ opCtx(), {NamespaceString::kCollectionCriticalSectionsNamespace});
+
+ // Check that the in-memory status has been appropriately updated.
+ assertCriticalSectionLeftInMemory(collNss);
+}
+
+} // namespace mongo