summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJordi Serra Torrens <jordi.serra-torrens@mongodb.com>2022-03-08 08:28:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-08 09:25:08 +0000
commit4a1f01267e8723f4eba6aa264466fd6b27cb2ab9 (patch)
tree2c48e0debc899c23554a8162ee0f7da171483ce5 /src/mongo/db
parentf70e73ccc422499080cfb4163efe42a64a4e59e1 (diff)
downloadmongo-4a1f01267e8723f4eba6aa264466fd6b27cb2ab9.tar.gz
SERVER-63519 Extend RecoverableCriticalSectionService to accommodate global user write blocking
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/commands/SConscript16
-rw-r--r--src/mongo/db/commands/set_user_write_block_mode_command.cpp27
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/s/SConscript16
-rw-r--r--src/mongo/db/s/global_user_write_block_state.cpp2
-rw-r--r--src/mongo/db/s/global_user_write_block_state.h5
-rw-r--r--src/mongo/db/s/user_writes_critical_section_document.idl56
-rw-r--r--src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp353
-rw-r--r--src/mongo/db/s/user_writes_recoverable_critical_section_service.h150
-rw-r--r--src/mongo/db/user_write_block_mode_op_observer.cpp117
-rw-r--r--src/mongo/db/user_write_block_mode_op_observer.h5
13 files changed, 738 insertions, 17 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9f4b6b79cff..3f16237f99d 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1038,6 +1038,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/s/sharding_api_d',
+ '$BUILD_DIR/mongo/db/s/user_writes_recoverable_critical_section',
],
)
@@ -2384,6 +2385,7 @@ env.Library(
# mongod_initializers.
'$BUILD_DIR/mongo/client/clientdriver_minimal',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
+ '$BUILD_DIR/mongo/db/commands/set_user_write_block_mode_command',
'$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
'$BUILD_DIR/mongo/idl/cluster_server_parameter',
'$BUILD_DIR/mongo/idl/cluster_server_parameter_op_observer',
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 98c4b4c3a9e..e8eb26176a2 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -462,6 +462,20 @@ env.Library(
)
env.Library(
+ target='set_user_write_block_mode_command',
+ source=[
+ 'set_user_write_block_mode_command.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/commands',
+ '$BUILD_DIR/mongo/db/rw_concern_d',
+ '$BUILD_DIR/mongo/db/s/user_writes_recoverable_critical_section',
+ 'set_user_write_block_mode_idl',
+ ],
+)
+
+env.Library(
target='set_user_write_block_mode_idl',
source=[
'set_user_write_block_mode.idl',
@@ -527,7 +541,6 @@ env.Library(
'rwc_defaults_commands.cpp',
"set_feature_compatibility_version_command.cpp",
"set_index_commit_quorum_command.cpp",
- 'set_user_write_block_mode_command.cpp',
"shutdown_d.cpp",
"snapshot_management.cpp",
"tenant_migration_donor_cmds.cpp",
@@ -596,7 +609,6 @@ env.Library(
'servers',
'set_feature_compatibility_version_idl',
'set_index_commit_quorum_idl',
- 'set_user_write_block_mode_idl',
'shell_protocol',
'shutdown_idl',
'standalone',
diff --git a/src/mongo/db/commands/set_user_write_block_mode_command.cpp b/src/mongo/db/commands/set_user_write_block_mode_command.cpp
index b216597ef3e..3ecf129935a 100644
--- a/src/mongo/db/commands/set_user_write_block_mode_command.cpp
+++ b/src/mongo/db/commands/set_user_write_block_mode_command.cpp
@@ -32,10 +32,10 @@
#include "mongo/platform/basic.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/set_user_write_block_mode_gen.h"
-#include "mongo/db/s/global_user_write_block_state.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/user_writes_recoverable_critical_section_service.h"
#include "mongo/logv2/log.h"
@@ -62,12 +62,29 @@ public:
void typedRun(OperationContext* opCtx) {
{
- Lock::GlobalLock lk(opCtx, MODE_X);
if (request().getGlobal()) {
- GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
+ UserWritesRecoverableCriticalSectionService::get(opCtx)
+ ->acquireRecoverableCriticalSectionBlockingUserWrites(
+ opCtx,
+ UserWritesRecoverableCriticalSectionService::
+ kGlobalUserWritesNamespace);
} else {
- GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
+ UserWritesRecoverableCriticalSectionService::get(opCtx)
+ ->releaseRecoverableCriticalSection(
+ opCtx,
+ UserWritesRecoverableCriticalSectionService::
+ kGlobalUserWritesNamespace);
}
+
+ // Wait for the writes to the UserWritesRecoverableCriticalSection collection to be
+ // majority commited.
+ auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
+ WriteConcernResult writeConcernResult;
+ WriteConcernOptions majority(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kWriteConcernTimeoutUserCommand);
+ uassertStatusOK(waitForWriteConcern(
+ opCtx, replClient.getLastOp(), majority, &writeConcernResult));
}
}
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 40dce12ff7b..ec4d89e756f 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -148,6 +148,9 @@ const NamespaceString NamespaceString::kConfigImagesNamespace(NamespaceString::k
const NamespaceString NamespaceString::kConfigsvrCoordinatorsNamespace(
NamespaceString::kConfigDb, "sharding_configsvr_coordinators");
+const NamespaceString NamespaceString::kUserWritesCriticalSectionsNamespace(
+ NamespaceString::kConfigDb, "user_writes_critical_sections");
+
bool NamespaceString::isListCollectionsCursorNS() const {
return coll() == listCollectionsCursorCol;
}
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index d30348da0d5..c1d01550f90 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -207,6 +207,9 @@ public:
// Namespace used for persisting ConfigsvrCoordinator state documents.
static const NamespaceString kConfigsvrCoordinatorsNamespace;
+ // Namespace for storing user write blocking critical section documents
+ static const NamespaceString kUserWritesCriticalSectionsNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index e513a9c7a6d..e5ccb605749 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -165,6 +165,22 @@ env.Library(
)
env.Library(
+ target='user_writes_recoverable_critical_section',
+ source=[
+ 'user_writes_critical_section_document.idl',
+ 'user_writes_recoverable_critical_section_service.cpp',
+ ],
+ LIBDEPS=[
+ 'sharding_api_d',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/repl/replica_set_aware_service',
+ '$BUILD_DIR/mongo/db/rw_concern_d',
+ ]
+)
+
+env.Library(
target='transaction_coordinator',
source=[
'server_transaction_coordinators_metrics.cpp',
diff --git a/src/mongo/db/s/global_user_write_block_state.cpp b/src/mongo/db/s/global_user_write_block_state.cpp
index 2196206fe2d..9843abf90d6 100644
--- a/src/mongo/db/s/global_user_write_block_state.cpp
+++ b/src/mongo/db/s/global_user_write_block_state.cpp
@@ -48,12 +48,10 @@ GlobalUserWriteBlockState* GlobalUserWriteBlockState::get(OperationContext* opCt
}
void GlobalUserWriteBlockState::enableUserWriteBlocking(OperationContext* opCtx) {
- invariant(opCtx->lockState()->isLockHeldForMode(resourceIdGlobal, MODE_X));
_globalUserWritesBlocked = true;
}
void GlobalUserWriteBlockState::disableUserWriteBlocking(OperationContext* opCtx) {
- invariant(opCtx->lockState()->isLockHeldForMode(resourceIdGlobal, MODE_X));
_globalUserWritesBlocked = false;
}
diff --git a/src/mongo/db/s/global_user_write_block_state.h b/src/mongo/db/s/global_user_write_block_state.h
index 72d8ce957b6..8325b717b15 100644
--- a/src/mongo/db/s/global_user_write_block_state.h
+++ b/src/mongo/db/s/global_user_write_block_state.h
@@ -42,8 +42,7 @@ public:
static GlobalUserWriteBlockState* get(OperationContext* opCtx);
/**
- * Methods to control the global user write blocking state. Callers must be hold the GlobalLock
- * in MODE_X.
+ * Methods to control the global user write blocking state.
*/
void enableUserWriteBlocking(OperationContext* opCtx);
void disableUserWriteBlocking(OperationContext* opCtx);
@@ -55,8 +54,6 @@ public:
void checkUserWritesAllowed(OperationContext* opCtx, const NamespaceString& nss) const;
private:
- // Modifying the state below requires holding the GlobalLock in X mode; holding the DBLock in
- // any mode is acceptable for reading it.
bool _globalUserWritesBlocked{false};
};
diff --git a/src/mongo/db/s/user_writes_critical_section_document.idl b/src/mongo/db/s/user_writes_critical_section_document.idl
new file mode 100644
index 00000000000..10b642c5964
--- /dev/null
+++ b/src/mongo/db/s/user_writes_critical_section_document.idl
@@ -0,0 +1,56 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file defines the format of documents stored in config.user_writes_critical_sections.
+# Each document is used to represent that user writes are blocked, for a particular collection or
+# globally.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ userWriteBlockingCriticalSectionDocument:
+ description: "Represents an ongoing user-writes blocking critical section."
+ generate_comparison_operators: false
+ strict: false
+ fields:
+ _id:
+ type: namespacestring
+ description: "The namespace of the collection that is under the collection critical
+ section. Empty namespacestring indicates global write blocking to all
+ collections."
+ cpp_name: nss
+ blockNewUserShardedDDL:
+ type: bool
+ default: false
+ blockUserWrites:
+ type: bool
+ default: false
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
new file mode 100644
index 00000000000..6215032f7b0
--- /dev/null
+++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
@@ -0,0 +1,353 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/user_writes_recoverable_critical_section_service.h"
+
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/s/global_user_write_block_state.h"
+#include "mongo/db/s/user_writes_critical_section_document_gen.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+
+namespace mongo {
+
+namespace {
+const auto serviceDecorator =
+ ServiceContext::declareDecoration<UserWritesRecoverableCriticalSectionService>();
+
+BSONObj findRecoverableCriticalSectionDoc(OperationContext* opCtx, const NamespaceString& nss) {
+ DBDirectClient dbClient(opCtx);
+
+ const auto queryNss =
+ BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString());
+ FindCommandRequest findRequest{NamespaceString::kUserWritesCriticalSectionsNamespace};
+ findRequest.setFilter(queryNss);
+ return dbClient.findOne(std::move(findRequest));
+}
+
+void setBlockUserWritesDocumentField(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool blockUserWrites) {
+ PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store(
+ NamespaceString::kUserWritesCriticalSectionsNamespace);
+ store.update(
+ opCtx,
+ BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()),
+ BSON("$set" << BSON(UserWriteBlockingCriticalSectionDocument::kBlockUserWritesFieldName
+ << blockUserWrites)),
+ ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void acquireRecoverableCriticalSection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool blockShardedDDL,
+ bool blockUserWrites) {
+ LOGV2_DEBUG(6351900,
+ 3,
+ "Acquiring user writes recoverable critical section",
+ "namespace"_attr = nss,
+ "blockShardedDDL"_attr = blockShardedDDL,
+ "blockUserWrites"_attr = blockUserWrites);
+
+ invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace);
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ // If we intend to start blocking user writes, take the GlobalLock in MODE_X in order to
+ // ensure that any ongoing writes have completed.
+ Lock::GlobalLock globalLock(opCtx, blockUserWrites ? MODE_X : MODE_IX);
+
+ const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss);
+ if (!bsonObj.isEmpty()) {
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("AcquireUserWritesCS"), bsonObj);
+
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Cannot acquire user writes critical section with different "
+ "options than the already existing one. blockShardedDDL: "
+ << blockShardedDDL
+ << ", current: " << collCSDoc.getBlockNewUserShardedDDL(),
+ !blockShardedDDL || collCSDoc.getBlockNewUserShardedDDL() == blockShardedDDL);
+
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Cannot acquire user writes critical section with different "
+ "options than the already existing one. blockUserWrites: "
+ << blockUserWrites
+ << ", current: " << collCSDoc.getBlockNewUserShardedDDL(),
+ !blockUserWrites || collCSDoc.getBlockUserWrites() == blockUserWrites);
+
+ LOGV2_DEBUG(6351914,
+ 3,
+ "The user writes recoverable critical section was already acquired",
+ "namespace"_attr = nss);
+ return;
+ }
+
+ // Acquire the critical section by inserting the critical section document. The OpObserver
+ // will take the in-memory CS when reacting to the insert event.
+ UserWriteBlockingCriticalSectionDocument newDoc(nss);
+ newDoc.setBlockNewUserShardedDDL(blockShardedDDL);
+ newDoc.setBlockUserWrites(blockUserWrites);
+
+ PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store(
+ NamespaceString::kUserWritesCriticalSectionsNamespace);
+ store.add(opCtx, newDoc, ShardingCatalogClient::kLocalWriteConcern);
+ }
+
+ LOGV2_DEBUG(6351901,
+ 2,
+ "Acquired user writes recoverable critical section",
+ "namespace"_attr = nss,
+ "blockShardedDDL"_attr = blockShardedDDL,
+ "blockUserWrites"_attr = blockUserWrites);
+}
+} // namespace
+
+const NamespaceString UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace =
+ NamespaceString();
+
+UserWritesRecoverableCriticalSectionService* UserWritesRecoverableCriticalSectionService::get(
+ ServiceContext* serviceContext) {
+ return &serviceDecorator(serviceContext);
+}
+
+UserWritesRecoverableCriticalSectionService* UserWritesRecoverableCriticalSectionService::get(
+ OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+const ReplicaSetAwareServiceRegistry::Registerer<UserWritesRecoverableCriticalSectionService>
+ UserWritesRecoverableCriticalSectionServiceServiceRegisterer(
+ "UserWritesRecoverableCriticalSectionService");
+
+bool UserWritesRecoverableCriticalSectionService::shouldRegisterReplicaSetAwareService() const {
+ return serverGlobalParams.clusterRole != ClusterRole::ConfigServer;
+}
+
+void UserWritesRecoverableCriticalSectionService::
+ acquireRecoverableCriticalSectionBlockingUserWrites(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::None,
+ "Acquiring the user writes recoverable critical section directly to start blocking "
+ "writes is only allowed on non-sharded cluster.");
+
+ acquireRecoverableCriticalSection(
+ opCtx, nss, false /* blockShardedDDL */, true /* blockUserWrites */);
+}
+
+void UserWritesRecoverableCriticalSectionService::
+ acquireRecoverableCriticalSectionBlockNewShardedDDL(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(serverGlobalParams.clusterRole != ClusterRole::None,
+ "Acquiring the user writes recoverable critical section blocking only sharded DDL is "
+ "only allowed on sharded clusters");
+
+ // Take the user writes critical section blocking only ShardingDDLCoordinators.
+ acquireRecoverableCriticalSection(
+ opCtx, nss, true /* blockShardedDDL */, false /* blockUserWrites */);
+}
+
+void UserWritesRecoverableCriticalSectionService::
+ promoteRecoverableCriticalSectionToBlockUserWrites(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(serverGlobalParams.clusterRole != ClusterRole::None,
+ "Promoting the user writes recoverable critical section to also block user writes is "
+ "only allowed on sharded clusters");
+
+ LOGV2_DEBUG(6351902,
+ 3,
+ "Promoting user writes recoverable critical section to also block reads",
+ "namespace"_attr = nss);
+
+ invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace);
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ // Take the GlobalLock in MODE_X in order to ensure that any ongoing writes have completed
+ // before starting to block new writes.
+ Lock::GlobalLock globalLock(opCtx, MODE_X);
+
+ const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss);
+ uassert(ErrorCodes::IllegalOperation,
+ "Cannot promote user writes critical section to block user writes if critical "
+ "section document not persisted first.",
+ !bsonObj.isEmpty());
+
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("PromoteUserWritesCS"), bsonObj);
+
+ uassert(ErrorCodes::IllegalOperation,
+ "Cannot promote user writes critical section to block user writes if sharded DDL "
+ "operations have not been blocked first.",
+ collCSDoc.getBlockNewUserShardedDDL());
+
+ // If we are already blocking user writes, then we are done.
+ if (collCSDoc.getBlockUserWrites()) {
+ LOGV2_DEBUG(6351903,
+ 3,
+ "The user writes recoverable critical section was already promoted to also "
+ "block user "
+ "writes, do nothing",
+ "namespace"_attr = nss);
+ return;
+ }
+
+ // Promote the critical section to block also user writes by updating the critical section
+ // document. The OpObserver will promote the in-memory CS when reacting to the update event.
+ setBlockUserWritesDocumentField(opCtx, nss, true /* blockUserWrites */);
+ }
+
+ LOGV2_DEBUG(6351904,
+ 2,
+ "Promoted user writes recoverable critical section to also block user writes",
+ "namespace"_attr = nss);
+}
+
+void UserWritesRecoverableCriticalSectionService::
+ demoteRecoverableCriticalSectionToNoLongerBlockUserWrites(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(serverGlobalParams.clusterRole != ClusterRole::None,
+ "Demoting the user writes recoverable critical section to also block user writes is "
+ "only allowed on sharded clusters");
+
+ LOGV2_DEBUG(6351905,
+ 3,
+ "Demoting user writes recoverable critical section to no longer block user writes",
+ "namespace"_attr = nss);
+
+ invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace);
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
+
+ const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss);
+ // If the critical section is not taken, then we are done.
+ if (bsonObj.isEmpty()) {
+ LOGV2_DEBUG(
+ 6351906,
+ 3,
+ "The user writes recoverable critical section was not currently taken, do nothing",
+ "namespace"_attr = nss);
+ return;
+ }
+
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("DemoteUserWritesCS"), bsonObj);
+
+ // If we are not currently blocking user writes, then we are done.
+ if (!collCSDoc.getBlockUserWrites()) {
+ LOGV2_DEBUG(6351907,
+ 3,
+ "The user writes recoverable critical section was already not blocking "
+ "user writes, do nothing",
+ "namespace"_attr = nss);
+ return;
+ }
+
+ // Demote the critical section to block also user writes by updating the critical section
+ // document. The OpObserver will demote the in-memory CS when reacting to the update event.
+ setBlockUserWritesDocumentField(opCtx, nss, false /* blockUserWrites */);
+ }
+
+ LOGV2_DEBUG(6351908,
+ 2,
+ "Demoted user writes recoverable critical section to no longer block user writes",
+ "namespace"_attr = nss);
+}
+
+
+void UserWritesRecoverableCriticalSectionService::releaseRecoverableCriticalSection(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ LOGV2_DEBUG(
+ 6351909, 3, "Releasing user writes recoverable critical section", "namespace"_attr = nss);
+
+ invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace);
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
+
+ const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss);
+
+ // If there is no persisted document, then we are done.
+ if (bsonObj.isEmpty()) {
+ LOGV2_DEBUG(
+ 6351910,
+ 3,
+ "The user writes recoverable critical section was already released, do nothing",
+ "namespace"_attr = nss);
+ return;
+ }
+
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("ReleaseUserWritesCS"), bsonObj);
+
+ // Release the critical section by deleting the critical section document. The OpObserver
+ // will release the in-memory CS when reacting to the delete event.
+ PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store(
+ NamespaceString::kUserWritesCriticalSectionsNamespace);
+ store.remove(
+ opCtx,
+ BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()),
+ ShardingCatalogClient::kLocalWriteConcern);
+ }
+
+ LOGV2_DEBUG(
+ 6351911, 2, "Released user writes recoverable critical section", "namespace"_attr = nss);
+}
+
+void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSections(
+ OperationContext* opCtx) {
+ LOGV2_DEBUG(6351912, 2, "Recovering all user writes recoverable critical sections");
+
+ GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
+
+ // Read the persisted critical section documents and restore the state into memory.
+ PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store(
+ NamespaceString::kUserWritesCriticalSectionsNamespace);
+ store.forEach(opCtx, BSONObj{}, [&opCtx](const UserWriteBlockingCriticalSectionDocument& doc) {
+ invariant(doc.getNss().isEmpty());
+ if (doc.getBlockUserWrites()) {
+ GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
+ }
+
+ return true;
+ });
+
+ LOGV2_DEBUG(6351913, 2, "Recovered all user writes recoverable critical sections");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.h b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h
new file mode 100644
index 00000000000..a08756b4a93
--- /dev/null
+++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+
+/**
+ * Represents the 'user writes blocking' critical section. The critical section status is persisted
+ * on disk and it's in-memory representation is kept in sync with the persisted state through an
+ * OpObserver that reacts to the inserts/updates/deletes on the persisted document.
+ *
+ * On replicaSets, enable blocking is depicted by transition (1); and disable blocking is depicted
+ * by transition (2):
+ *
+ * (1) acquireRecoverableCriticalSectionBlockingUserWrites()
+ * ---------------------------------------------------------
+ * | |
+ * | v
+ * + --------------------+ + --------------------+
+ * | User writes allowed | | User writes blocked |
+ * + --------------------+ + --------------------+
+ * ^ ^
+ * | |
+ * ---------------------------------------------------------
+ * (2) releaseRecoverableCriticalSection()
+ *
+ * On sharded clusters, blocking/unblocking happens as a two-phase protocol. Enable blocking is
+ * depicted by transitions (1) and (2); and disable blocking is depicted by (3) and (4):
+ *
+ * (2) promoteRecoverableCriticalSectionToBlockUserWrites()
+ * ------------------------
+ * | |
+ * (1) acquireRecoverableCriticalSectionBlockNewShardedDDL() | |
+ * --------------------------------- | |
+ * | | | |
+ * | V | v
+ * + -------------------------+ + -------------------------+ + -------------------------+
+ * | User writes allowed, | | User writes allowed, | | User writes blocked |
+ * | User sharded DDL allowed | | User sharded DDL blocked | | User sharded DDL blocked |
+ * + -------------------------+ + -------------------------+ + -------------------------+
+ * ^ ^ |
+ * | | | |
+ * -------------------------------- | |
+ * (4) releaseRecoverableCriticalSection() | |
+ * | |
+ * -----------------------------
+ * (3) demoteRecoverableCriticalSectionToNoLongerBlockUserWrites()
+ *
+ */
+class UserWritesRecoverableCriticalSectionService
+ : public ReplicaSetAwareService<UserWritesRecoverableCriticalSectionService> {
+public:
+ static const NamespaceString kGlobalUserWritesNamespace;
+
+ UserWritesRecoverableCriticalSectionService() = default;
+
+ static UserWritesRecoverableCriticalSectionService* get(ServiceContext* serviceContext);
+ static UserWritesRecoverableCriticalSectionService* get(OperationContext* opCtx);
+
+ bool shouldRegisterReplicaSetAwareService() const override;
+
+ /**
+ * Acquires the user writes critical section blocking user writes. This should be used only on
+ * replica sets.
+ */
+ void acquireRecoverableCriticalSectionBlockingUserWrites(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+ /**
+ * Acquires the user writes critical section blocking only new sharded DDL operations, but not
+ * user writes nor local DDL. This is a 'prepare' state before user writes and local DDL can be
+ * blocked on sharded clusters.
+ */
+ void acquireRecoverableCriticalSectionBlockNewShardedDDL(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+ /**
+ * Promotes a user writes critical section that is in the 'prepare' state (i.e. only blocking
+ * sharded DDL) to start blocking also user writes. This should be run only after all shards in
+ * the cluster have entered the 'prepare' state.
+ */
+ void promoteRecoverableCriticalSectionToBlockUserWrites(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+ /**
+ * Demotes a user writes critical section that is blocking both sharded DDL and user writes to
+ * only block sharded DDL. This is a preparation step before allowing user writes again on
+ * sharded clusters.
+ */
+ void demoteRecoverableCriticalSectionToNoLongerBlockUserWrites(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+ /**
+ * Releases the user writes critical section, allowing user writes again. On sharded clusters,
+ * before this method is called all shards must have first demoted their critical sections to no
+ * longer block user writes.
+ */
+ void releaseRecoverableCriticalSection(OperationContext* opCtx, const NamespaceString& nss);
+
+ /**
+ * This method is called when we have to mirror the state on disk of the recoverable critical
+ * section to memory (on startUp or on rollback).
+ */
+ void recoverRecoverableCriticalSections(OperationContext* opCtx);
+
+private:
+ void onStartup(OperationContext* opCtx) override final {
+ recoverRecoverableCriticalSections(opCtx);
+ }
+
+ void onShutdown() override final {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
+ void onStepUpComplete(OperationContext* opCtx, long long term) override final {}
+ void onStepDown() override final {}
+ void onBecomeArbiter() override final {}
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/user_write_block_mode_op_observer.cpp b/src/mongo/db/user_write_block_mode_op_observer.cpp
index 42bd8e863d4..2d7c7de00f2 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.cpp
+++ b/src/mongo/db/user_write_block_mode_op_observer.cpp
@@ -32,8 +32,20 @@
#include "mongo/db/user_write_block_mode_op_observer.h"
#include "mongo/db/s/global_user_write_block_state.h"
+#include "mongo/db/s/user_writes_critical_section_document_gen.h"
+#include "mongo/db/s/user_writes_recoverable_critical_section_service.h"
namespace mongo {
+namespace {
+
+const auto documentIdDecoration = OperationContext::declareDecoration<BSONObj>();
+
+bool isStandaloneOrPrimary(OperationContext* opCtx) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ return replCoord->canAcceptWritesForDatabase(opCtx, NamespaceString::kAdminDb);
+}
+
+} // namespace
void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx,
const NamespaceString& nss,
@@ -42,11 +54,78 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
_checkWriteAllowed(opCtx, nss);
+
+ if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
+ for (auto it = first; it != last; ++it) {
+ const auto& insertedDoc = it->doc;
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled() ||
+ (!replCoord->getMemberState().recovering() &&
+ !replCoord->getMemberState().rollback())) {
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("UserWriteBlockOpObserver"), insertedDoc);
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx,
+ blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(),
+ blockWrites = collCSDoc.getBlockUserWrites(),
+ insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
+ invariant(insertedNss.isEmpty());
+ boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
+ }
+ if (blockWrites) {
+ GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
+ }
+ });
+ }
+ }
+ }
}
void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
_checkWriteAllowed(opCtx, args.nss);
+
+ if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled() ||
+ (!replCoord->getMemberState().recovering() &&
+ !replCoord->getMemberState().rollback())) {
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc);
+
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx,
+ updatedNss = collCSDoc.getNss(),
+ blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(),
+ blockWrites = collCSDoc.getBlockUserWrites(),
+ insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
+ invariant(updatedNss.isEmpty());
+ boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
+ }
+
+ if (blockWrites) {
+ GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
+ } else {
+ GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
+ }
+ });
+ }
+ }
+}
+
+void UserWriteBlockModeOpObserver::aboutToDelete(OperationContext* opCtx,
+ NamespaceString const& nss,
+ const UUID& uuid,
+ BSONObj const& doc) {
+
+ if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
+ documentIdDecoration(opCtx) = doc;
+ }
}
void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx,
@@ -55,11 +134,47 @@ void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx,
StmtId stmtId,
const OplogDeleteEntryArgs& args) {
_checkWriteAllowed(opCtx, nss);
+
+ if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
+ auto& documentId = documentIdDecoration(opCtx);
+ invariant(!documentId.isEmpty());
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled() ||
+ (!replCoord->getMemberState().recovering() &&
+ !replCoord->getMemberState().rollback())) {
+ const auto& deletedDoc = documentId;
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("UserWriteBlockOpObserver"), deletedDoc);
+
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, deletedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
+ invariant(deletedNss.isEmpty());
+ boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
+ }
+
+ GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
+ });
+ }
+ }
+}
+
+void UserWriteBlockModeOpObserver::_onReplicationRollback(OperationContext* opCtx,
+ const RollbackObserverInfo& rbInfo) {
+ if (rbInfo.rollbackNamespaces.find(NamespaceString::kUserWritesCriticalSectionsNamespace) !=
+ rbInfo.rollbackNamespaces.end()) {
+ UserWritesRecoverableCriticalSectionService::get(opCtx)->recoverRecoverableCriticalSections(
+ opCtx);
+ }
}
void UserWriteBlockModeOpObserver::_checkWriteAllowed(OperationContext* opCtx,
const NamespaceString& nss) {
- GlobalUserWriteBlockState::get(opCtx)->checkUserWritesAllowed(opCtx, nss);
+ if (isStandaloneOrPrimary(opCtx)) {
+ GlobalUserWriteBlockState::get(opCtx)->checkUserWritesAllowed(opCtx, nss);
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h
index 8da805bb96f..d5ad0a2ecaa 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.h
+++ b/src/mongo/db/user_write_block_mode_op_observer.h
@@ -100,7 +100,7 @@ public:
void aboutToDelete(OperationContext* opCtx,
const NamespaceString& nss,
const UUID& uuid,
- const BSONObj& doc) final {}
+ const BSONObj& doc) final;
void onInternalOpMessage(OperationContext* opCtx,
const NamespaceString& nss,
@@ -209,8 +209,7 @@ public:
const repl::OpTime& newCommitPoint) final {}
private:
- void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final {
- }
+ void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final;
// uasserts that a write to the given namespace is allowed under the current user write blocking
// setting.