diff options
author | Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> | 2022-03-09 08:46:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-09 12:46:36 +0000 |
commit | 2a4fcbbc9c063cb6a872e494e3870dec86d58540 (patch) | |
tree | 74f8d3ea5f275596945e719635b781f67f84e01a | |
parent | 1574ce6570aff2ff6cc5e86f17ca77e0c01e9ee3 (diff) | |
download | mongo-2a4fcbbc9c063cb6a872e494e3870dec86d58540.tar.gz |
SERVER-63519 Extend RecoverableCriticalSectionService to accommodate global user write blocking
This reverts commit 5a3ec65dd434f7700ba2f2b173c35a7c4b022437.
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/db/commands/set_user_write_block_mode_command.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/db/s/global_user_write_block_state.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/global_user_write_block_state.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/user_writes_critical_section_document.idl | 56 | ||||
-rw-r--r-- | src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp | 353 | ||||
-rw-r--r-- | src/mongo/db/s/user_writes_recoverable_critical_section_service.h | 150 | ||||
-rw-r--r-- | src/mongo/db/user_write_block_mode_op_observer.cpp | 117 | ||||
-rw-r--r-- | src/mongo/db/user_write_block_mode_op_observer.h | 5 |
13 files changed, 738 insertions, 17 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9f4b6b79cff..3f16237f99d 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1038,6 +1038,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/s/sharding_api_d', + '$BUILD_DIR/mongo/db/s/user_writes_recoverable_critical_section', ], ) @@ -2384,6 +2385,7 @@ env.Library( # mongod_initializers. '$BUILD_DIR/mongo/client/clientdriver_minimal', '$BUILD_DIR/mongo/db/change_stream_options_manager', + '$BUILD_DIR/mongo/db/commands/set_user_write_block_mode_command', '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover', '$BUILD_DIR/mongo/idl/cluster_server_parameter', '$BUILD_DIR/mongo/idl/cluster_server_parameter_op_observer', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index eb1eb949525..8e6d0105adb 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -463,6 +463,20 @@ env.Library( ) env.Library( + target='set_user_write_block_mode_command', + source=[ + 'set_user_write_block_mode_command.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/commands', + '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/db/s/user_writes_recoverable_critical_section', + 'set_user_write_block_mode_idl', + ], +) + +env.Library( target='set_user_write_block_mode_idl', source=[ 'set_user_write_block_mode.idl', @@ -528,7 +542,6 @@ env.Library( 'rwc_defaults_commands.cpp', "set_feature_compatibility_version_command.cpp", "set_index_commit_quorum_command.cpp", - 'set_user_write_block_mode_command.cpp', "shutdown_d.cpp", "snapshot_management.cpp", "tenant_migration_donor_cmds.cpp", @@ -597,7 +610,6 @@ env.Library( 'servers', 'set_feature_compatibility_version_idl', 'set_index_commit_quorum_idl', - 'set_user_write_block_mode_idl', 'shell_protocol', 'shutdown_idl', 'standalone', diff --git a/src/mongo/db/commands/set_user_write_block_mode_command.cpp b/src/mongo/db/commands/set_user_write_block_mode_command.cpp index b216597ef3e..3ecf129935a 100644 --- a/src/mongo/db/commands/set_user_write_block_mode_command.cpp +++ b/src/mongo/db/commands/set_user_write_block_mode_command.cpp @@ -32,10 +32,10 @@ #include "mongo/platform/basic.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" #include "mongo/db/commands/set_user_write_block_mode_gen.h" -#include "mongo/db/s/global_user_write_block_state.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/user_writes_recoverable_critical_section_service.h" #include "mongo/logv2/log.h" @@ -62,12 +62,29 @@ public: void typedRun(OperationContext* opCtx) { { - Lock::GlobalLock lk(opCtx, MODE_X); if (request().getGlobal()) { - GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); + UserWritesRecoverableCriticalSectionService::get(opCtx) + ->acquireRecoverableCriticalSectionBlockingUserWrites( + opCtx, + UserWritesRecoverableCriticalSectionService:: + kGlobalUserWritesNamespace); } else { - GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + UserWritesRecoverableCriticalSectionService::get(opCtx) + ->releaseRecoverableCriticalSection( + opCtx, + UserWritesRecoverableCriticalSectionService:: + kGlobalUserWritesNamespace); } + + // Wait for the writes to the UserWritesRecoverableCriticalSection collection to be + // majority commited. + auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); + WriteConcernResult writeConcernResult; + WriteConcernOptions majority(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kWriteConcernTimeoutUserCommand); + uassertStatusOK(waitForWriteConcern( + opCtx, replClient.getLastOp(), majority, &writeConcernResult)); } } diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 40dce12ff7b..ec4d89e756f 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -148,6 +148,9 @@ const NamespaceString NamespaceString::kConfigImagesNamespace(NamespaceString::k const NamespaceString NamespaceString::kConfigsvrCoordinatorsNamespace( NamespaceString::kConfigDb, "sharding_configsvr_coordinators"); +const NamespaceString NamespaceString::kUserWritesCriticalSectionsNamespace( + NamespaceString::kConfigDb, "user_writes_critical_sections"); + bool NamespaceString::isListCollectionsCursorNS() const { return coll() == listCollectionsCursorCol; } diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index d30348da0d5..c1d01550f90 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -207,6 +207,9 @@ public: // Namespace used for persisting ConfigsvrCoordinator state documents. static const NamespaceString kConfigsvrCoordinatorsNamespace; + // Namespace for storing user write blocking critical section documents + static const NamespaceString kUserWritesCriticalSectionsNamespace; + /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 531dbc57111..567b232862b 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -167,6 +167,22 @@ env.Library( ) env.Library( + target='user_writes_recoverable_critical_section', + source=[ + 'user_writes_critical_section_document.idl', + 'user_writes_recoverable_critical_section_service.cpp', + ], + LIBDEPS=[ + 'sharding_api_d', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/repl/replica_set_aware_service', + '$BUILD_DIR/mongo/db/rw_concern_d', + ] +) + +env.Library( target='transaction_coordinator', source=[ 'server_transaction_coordinators_metrics.cpp', diff --git a/src/mongo/db/s/global_user_write_block_state.cpp b/src/mongo/db/s/global_user_write_block_state.cpp index 2196206fe2d..9843abf90d6 100644 --- a/src/mongo/db/s/global_user_write_block_state.cpp +++ b/src/mongo/db/s/global_user_write_block_state.cpp @@ -48,12 +48,10 @@ GlobalUserWriteBlockState* GlobalUserWriteBlockState::get(OperationContext* opCt } void GlobalUserWriteBlockState::enableUserWriteBlocking(OperationContext* opCtx) { - invariant(opCtx->lockState()->isLockHeldForMode(resourceIdGlobal, MODE_X)); _globalUserWritesBlocked = true; } void GlobalUserWriteBlockState::disableUserWriteBlocking(OperationContext* opCtx) { - invariant(opCtx->lockState()->isLockHeldForMode(resourceIdGlobal, MODE_X)); _globalUserWritesBlocked = false; } diff --git a/src/mongo/db/s/global_user_write_block_state.h b/src/mongo/db/s/global_user_write_block_state.h index 72d8ce957b6..8325b717b15 100644 --- a/src/mongo/db/s/global_user_write_block_state.h +++ b/src/mongo/db/s/global_user_write_block_state.h @@ -42,8 +42,7 @@ public: static GlobalUserWriteBlockState* get(OperationContext* opCtx); /** - * Methods to control the global user write blocking state. Callers must be hold the GlobalLock - * in MODE_X. + * Methods to control the global user write blocking state. */ void enableUserWriteBlocking(OperationContext* opCtx); void disableUserWriteBlocking(OperationContext* opCtx); @@ -55,8 +54,6 @@ public: void checkUserWritesAllowed(OperationContext* opCtx, const NamespaceString& nss) const; private: - // Modifying the state below requires holding the GlobalLock in X mode; holding the DBLock in - // any mode is acceptable for reading it. bool _globalUserWritesBlocked{false}; }; diff --git a/src/mongo/db/s/user_writes_critical_section_document.idl b/src/mongo/db/s/user_writes_critical_section_document.idl new file mode 100644 index 00000000000..10b642c5964 --- /dev/null +++ b/src/mongo/db/s/user_writes_critical_section_document.idl @@ -0,0 +1,56 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file defines the format of documents stored in config.user_writes_critical_sections. +# Each document is used to represent that user writes are blocked, for a particular collection or +# globally. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + userWriteBlockingCriticalSectionDocument: + description: "Represents an ongoing user-writes blocking critical section." + generate_comparison_operators: false + strict: false + fields: + _id: + type: namespacestring + description: "The namespace of the collection that is under the collection critical + section. Empty namespacestring indicates global write blocking to all + collections." + cpp_name: nss + blockNewUserShardedDDL: + type: bool + default: false + blockUserWrites: + type: bool + default: false diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp new file mode 100644 index 00000000000..6215032f7b0 --- /dev/null +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp @@ -0,0 +1,353 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/user_writes_recoverable_critical_section_service.h" + +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/global_user_write_block_state.h" +#include "mongo/db/s/user_writes_critical_section_document_gen.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client.h" + +namespace mongo { + +namespace { +const auto serviceDecorator = + ServiceContext::declareDecoration<UserWritesRecoverableCriticalSectionService>(); + +BSONObj findRecoverableCriticalSectionDoc(OperationContext* opCtx, const NamespaceString& nss) { + DBDirectClient dbClient(opCtx); + + const auto queryNss = + BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()); + FindCommandRequest findRequest{NamespaceString::kUserWritesCriticalSectionsNamespace}; + findRequest.setFilter(queryNss); + return dbClient.findOne(std::move(findRequest)); +} + +void setBlockUserWritesDocumentField(OperationContext* opCtx, + const NamespaceString& nss, + bool blockUserWrites) { + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.update( + opCtx, + BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()), + BSON("$set" << BSON(UserWriteBlockingCriticalSectionDocument::kBlockUserWritesFieldName + << blockUserWrites)), + ShardingCatalogClient::kLocalWriteConcern); +} + +void acquireRecoverableCriticalSection(OperationContext* opCtx, + const NamespaceString& nss, + bool blockShardedDDL, + bool blockUserWrites) { + LOGV2_DEBUG(6351900, + 3, + "Acquiring user writes recoverable critical section", + "namespace"_attr = nss, + "blockShardedDDL"_attr = blockShardedDDL, + "blockUserWrites"_attr = blockUserWrites); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + // If we intend to start blocking user writes, take the GlobalLock in MODE_X in order to + // ensure that any ongoing writes have completed. + Lock::GlobalLock globalLock(opCtx, blockUserWrites ? MODE_X : MODE_IX); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + if (!bsonObj.isEmpty()) { + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("AcquireUserWritesCS"), bsonObj); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot acquire user writes critical section with different " + "options than the already existing one. blockShardedDDL: " + << blockShardedDDL + << ", current: " << collCSDoc.getBlockNewUserShardedDDL(), + !blockShardedDDL || collCSDoc.getBlockNewUserShardedDDL() == blockShardedDDL); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot acquire user writes critical section with different " + "options than the already existing one. blockUserWrites: " + << blockUserWrites + << ", current: " << collCSDoc.getBlockNewUserShardedDDL(), + !blockUserWrites || collCSDoc.getBlockUserWrites() == blockUserWrites); + + LOGV2_DEBUG(6351914, + 3, + "The user writes recoverable critical section was already acquired", + "namespace"_attr = nss); + return; + } + + // Acquire the critical section by inserting the critical section document. The OpObserver + // will take the in-memory CS when reacting to the insert event. + UserWriteBlockingCriticalSectionDocument newDoc(nss); + newDoc.setBlockNewUserShardedDDL(blockShardedDDL); + newDoc.setBlockUserWrites(blockUserWrites); + + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.add(opCtx, newDoc, ShardingCatalogClient::kLocalWriteConcern); + } + + LOGV2_DEBUG(6351901, + 2, + "Acquired user writes recoverable critical section", + "namespace"_attr = nss, + "blockShardedDDL"_attr = blockShardedDDL, + "blockUserWrites"_attr = blockUserWrites); +} +} // namespace + +const NamespaceString UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace = + NamespaceString(); + +UserWritesRecoverableCriticalSectionService* UserWritesRecoverableCriticalSectionService::get( + ServiceContext* serviceContext) { + return &serviceDecorator(serviceContext); +} + +UserWritesRecoverableCriticalSectionService* UserWritesRecoverableCriticalSectionService::get( + OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +const ReplicaSetAwareServiceRegistry::Registerer<UserWritesRecoverableCriticalSectionService> + UserWritesRecoverableCriticalSectionServiceServiceRegisterer( + "UserWritesRecoverableCriticalSectionService"); + +bool UserWritesRecoverableCriticalSectionService::shouldRegisterReplicaSetAwareService() const { + return serverGlobalParams.clusterRole != ClusterRole::ConfigServer; +} + +void UserWritesRecoverableCriticalSectionService:: + acquireRecoverableCriticalSectionBlockingUserWrites(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole == ClusterRole::None, + "Acquiring the user writes recoverable critical section directly to start blocking " + "writes is only allowed on non-sharded cluster."); + + acquireRecoverableCriticalSection( + opCtx, nss, false /* blockShardedDDL */, true /* blockUserWrites */); +} + +void UserWritesRecoverableCriticalSectionService:: + acquireRecoverableCriticalSectionBlockNewShardedDDL(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole != ClusterRole::None, + "Acquiring the user writes recoverable critical section blocking only sharded DDL is " + "only allowed on sharded clusters"); + + // Take the user writes critical section blocking only ShardingDDLCoordinators. + acquireRecoverableCriticalSection( + opCtx, nss, true /* blockShardedDDL */, false /* blockUserWrites */); +} + +void UserWritesRecoverableCriticalSectionService:: + promoteRecoverableCriticalSectionToBlockUserWrites(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole != ClusterRole::None, + "Promoting the user writes recoverable critical section to also block user writes is " + "only allowed on sharded clusters"); + + LOGV2_DEBUG(6351902, + 3, + "Promoting user writes recoverable critical section to also block reads", + "namespace"_attr = nss); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + // Take the GlobalLock in MODE_X in order to ensure that any ongoing writes have completed + // before starting to block new writes. + Lock::GlobalLock globalLock(opCtx, MODE_X); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + uassert(ErrorCodes::IllegalOperation, + "Cannot promote user writes critical section to block user writes if critical " + "section document not persisted first.", + !bsonObj.isEmpty()); + + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("PromoteUserWritesCS"), bsonObj); + + uassert(ErrorCodes::IllegalOperation, + "Cannot promote user writes critical section to block user writes if sharded DDL " + "operations have not been blocked first.", + collCSDoc.getBlockNewUserShardedDDL()); + + // If we are already blocking user writes, then we are done. + if (collCSDoc.getBlockUserWrites()) { + LOGV2_DEBUG(6351903, + 3, + "The user writes recoverable critical section was already promoted to also " + "block user " + "writes, do nothing", + "namespace"_attr = nss); + return; + } + + // Promote the critical section to block also user writes by updating the critical section + // document. The OpObserver will promote the in-memory CS when reacting to the update event. + setBlockUserWritesDocumentField(opCtx, nss, true /* blockUserWrites */); + } + + LOGV2_DEBUG(6351904, + 2, + "Promoted user writes recoverable critical section to also block user writes", + "namespace"_attr = nss); +} + +void UserWritesRecoverableCriticalSectionService:: + demoteRecoverableCriticalSectionToNoLongerBlockUserWrites(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole != ClusterRole::None, + "Demoting the user writes recoverable critical section to also block user writes is " + "only allowed on sharded clusters"); + + LOGV2_DEBUG(6351905, + 3, + "Demoting user writes recoverable critical section to no longer block user writes", + "namespace"_attr = nss); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + Lock::GlobalLock globalLock(opCtx, MODE_IX); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + // If the critical section is not taken, then we are done. + if (bsonObj.isEmpty()) { + LOGV2_DEBUG( + 6351906, + 3, + "The user writes recoverable critical section was not currently taken, do nothing", + "namespace"_attr = nss); + return; + } + + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("DemoteUserWritesCS"), bsonObj); + + // If we are not currently blocking user writes, then we are done. + if (!collCSDoc.getBlockUserWrites()) { + LOGV2_DEBUG(6351907, + 3, + "The user writes recoverable critical section was already not blocking " + "user writes, do nothing", + "namespace"_attr = nss); + return; + } + + // Demote the critical section to block also user writes by updating the critical section + // document. The OpObserver will demote the in-memory CS when reacting to the update event. + setBlockUserWritesDocumentField(opCtx, nss, false /* blockUserWrites */); + } + + LOGV2_DEBUG(6351908, + 2, + "Demoted user writes recoverable critical section to no longer block user writes", + "namespace"_attr = nss); +} + + +void UserWritesRecoverableCriticalSectionService::releaseRecoverableCriticalSection( + OperationContext* opCtx, const NamespaceString& nss) { + LOGV2_DEBUG( + 6351909, 3, "Releasing user writes recoverable critical section", "namespace"_attr = nss); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + Lock::GlobalLock globalLock(opCtx, MODE_IX); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + + // If there is no persisted document, then we are done. + if (bsonObj.isEmpty()) { + LOGV2_DEBUG( + 6351910, + 3, + "The user writes recoverable critical section was already released, do nothing", + "namespace"_attr = nss); + return; + } + + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("ReleaseUserWritesCS"), bsonObj); + + // Release the critical section by deleting the critical section document. The OpObserver + // will release the in-memory CS when reacting to the delete event. + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.remove( + opCtx, + BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()), + ShardingCatalogClient::kLocalWriteConcern); + } + + LOGV2_DEBUG( + 6351911, 2, "Released user writes recoverable critical section", "namespace"_attr = nss); +} + +void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSections( + OperationContext* opCtx) { + LOGV2_DEBUG(6351912, 2, "Recovering all user writes recoverable critical sections"); + + GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + + // Read the persisted critical section documents and restore the state into memory. + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.forEach(opCtx, BSONObj{}, [&opCtx](const UserWriteBlockingCriticalSectionDocument& doc) { + invariant(doc.getNss().isEmpty()); + if (doc.getBlockUserWrites()) { + GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); + } + + return true; + }); + + LOGV2_DEBUG(6351913, 2, "Recovered all user writes recoverable critical sections"); +} + +} // namespace mongo diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.h b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h new file mode 100644 index 00000000000..a08756b4a93 --- /dev/null +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/db/service_context.h" + +namespace mongo { + +/** + * Represents the 'user writes blocking' critical section. The critical section status is persisted + * on disk and it's in-memory representation is kept in sync with the persisted state through an + * OpObserver that reacts to the inserts/updates/deletes on the persisted document. + * + * On replicaSets, enable blocking is depicted by transition (1); and disable blocking is depicted + * by transition (2): + * + * (1) acquireRecoverableCriticalSectionBlockingUserWrites() + * --------------------------------------------------------- + * | | + * | v + * + --------------------+ + --------------------+ + * | User writes allowed | | User writes blocked | + * + --------------------+ + --------------------+ + * ^ ^ + * | | + * --------------------------------------------------------- + * (2) releaseRecoverableCriticalSection() + * + * On sharded clusters, blocking/unblocking happens as a two-phase protocol. Enable blocking is + * depicted by transitions (1) and (2); and disable blocking is depicted by (3) and (4): + * + * (2) promoteRecoverableCriticalSectionToBlockUserWrites() + * ------------------------ + * | | + * (1) acquireRecoverableCriticalSectionBlockNewShardedDDL() | | + * --------------------------------- | | + * | | | | + * | V | v + * + -------------------------+ + -------------------------+ + -------------------------+ + * | User writes allowed, | | User writes allowed, | | User writes blocked | + * | User sharded DDL allowed | | User sharded DDL blocked | | User sharded DDL blocked | + * + -------------------------+ + -------------------------+ + -------------------------+ + * ^ ^ | + * | | | | + * -------------------------------- | | + * (4) releaseRecoverableCriticalSection() | | + * | | + * ----------------------------- + * (3) demoteRecoverableCriticalSectionToNoLongerBlockUserWrites() + * + */ +class UserWritesRecoverableCriticalSectionService + : public ReplicaSetAwareService<UserWritesRecoverableCriticalSectionService> { +public: + static const NamespaceString kGlobalUserWritesNamespace; + + UserWritesRecoverableCriticalSectionService() = default; + + static UserWritesRecoverableCriticalSectionService* get(ServiceContext* serviceContext); + static UserWritesRecoverableCriticalSectionService* get(OperationContext* opCtx); + + bool shouldRegisterReplicaSetAwareService() const override; + + /** + * Acquires the user writes critical section blocking user writes. This should be used only on + * replica sets. + */ + void acquireRecoverableCriticalSectionBlockingUserWrites(OperationContext* opCtx, + const NamespaceString& nss); + + /** + * Acquires the user writes critical section blocking only new sharded DDL operations, but not + * user writes nor local DDL. This is a 'prepare' state before user writes and local DDL can be + * blocked on sharded clusters. + */ + void acquireRecoverableCriticalSectionBlockNewShardedDDL(OperationContext* opCtx, + const NamespaceString& nss); + + /** + * Promotes a user writes critical section that is in the 'prepare' state (i.e. only blocking + * sharded DDL) to start blocking also user writes. This should be run only after all shards in + * the cluster have entered the 'prepare' state. + */ + void promoteRecoverableCriticalSectionToBlockUserWrites(OperationContext* opCtx, + const NamespaceString& nss); + + /** + * Demotes a user writes critical section that is blocking both sharded DDL and user writes to + * only block sharded DDL. This is a preparation step before allowing user writes again on + * sharded clusters. + */ + void demoteRecoverableCriticalSectionToNoLongerBlockUserWrites(OperationContext* opCtx, + const NamespaceString& nss); + + /** + * Releases the user writes critical section, allowing user writes again. On sharded clusters, + * before this method is called all shards must have first demoted their critical sections to no + * longer block user writes. + */ + void releaseRecoverableCriticalSection(OperationContext* opCtx, const NamespaceString& nss); + + /** + * This method is called when we have to mirror the state on disk of the recoverable critical + * section to memory (on startUp or on rollback). + */ + void recoverRecoverableCriticalSections(OperationContext* opCtx); + +private: + void onStartup(OperationContext* opCtx) override final { + recoverRecoverableCriticalSections(opCtx); + } + + void onShutdown() override final {} + void onStepUpBegin(OperationContext* opCtx, long long term) override final {} + void onStepUpComplete(OperationContext* opCtx, long long term) override final {} + void onStepDown() override final {} + void onBecomeArbiter() override final {} +}; + +} // namespace mongo diff --git a/src/mongo/db/user_write_block_mode_op_observer.cpp b/src/mongo/db/user_write_block_mode_op_observer.cpp index 42bd8e863d4..2d7c7de00f2 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.cpp +++ b/src/mongo/db/user_write_block_mode_op_observer.cpp @@ -32,8 +32,20 @@ #include "mongo/db/user_write_block_mode_op_observer.h" #include "mongo/db/s/global_user_write_block_state.h" +#include "mongo/db/s/user_writes_critical_section_document_gen.h" +#include "mongo/db/s/user_writes_recoverable_critical_section_service.h" namespace mongo { +namespace { + +const auto documentIdDecoration = OperationContext::declareDecoration<BSONObj>(); + +bool isStandaloneOrPrimary(OperationContext* opCtx) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + return replCoord->canAcceptWritesForDatabase(opCtx, NamespaceString::kAdminDb); +} + +} // namespace void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx, const NamespaceString& nss, @@ -42,11 +54,78 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { _checkWriteAllowed(opCtx, nss); + + if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { + for (auto it = first; it != last; ++it) { + const auto& insertedDoc = it->doc; + + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled() || + (!replCoord->getMemberState().recovering() && + !replCoord->getMemberState().rollback())) { + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("UserWriteBlockOpObserver"), insertedDoc); + opCtx->recoveryUnit()->onCommit( + [opCtx, + blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(), + blockWrites = collCSDoc.getBlockUserWrites(), + insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + invariant(insertedNss.isEmpty()); + boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + globalLockIfNotPrimary.emplace(opCtx, MODE_IX); + } + if (blockWrites) { + GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); + } + }); + } + } + } } void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { _checkWriteAllowed(opCtx, args.nss); + + if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled() || + (!replCoord->getMemberState().recovering() && + !replCoord->getMemberState().rollback())) { + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, + updatedNss = collCSDoc.getNss(), + blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(), + blockWrites = collCSDoc.getBlockUserWrites(), + insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + invariant(updatedNss.isEmpty()); + boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + globalLockIfNotPrimary.emplace(opCtx, MODE_IX); + } + + if (blockWrites) { + GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); + } else { + GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + } + }); + } + } +} + +void UserWriteBlockModeOpObserver::aboutToDelete(OperationContext* opCtx, + NamespaceString const& nss, + const UUID& uuid, + BSONObj const& doc) { + + if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { + documentIdDecoration(opCtx) = doc; + } } void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx, @@ -55,11 +134,47 @@ void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx, StmtId stmtId, const OplogDeleteEntryArgs& args) { _checkWriteAllowed(opCtx, nss); + + if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { + auto& documentId = documentIdDecoration(opCtx); + invariant(!documentId.isEmpty()); + + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled() || + (!replCoord->getMemberState().recovering() && + !replCoord->getMemberState().rollback())) { + const auto& deletedDoc = documentId; + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("UserWriteBlockOpObserver"), deletedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, deletedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + invariant(deletedNss.isEmpty()); + boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + globalLockIfNotPrimary.emplace(opCtx, MODE_IX); + } + + GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + }); + } + } +} + +void UserWriteBlockModeOpObserver::_onReplicationRollback(OperationContext* opCtx, + const RollbackObserverInfo& rbInfo) { + if (rbInfo.rollbackNamespaces.find(NamespaceString::kUserWritesCriticalSectionsNamespace) != + rbInfo.rollbackNamespaces.end()) { + UserWritesRecoverableCriticalSectionService::get(opCtx)->recoverRecoverableCriticalSections( + opCtx); + } } void UserWriteBlockModeOpObserver::_checkWriteAllowed(OperationContext* opCtx, const NamespaceString& nss) { - GlobalUserWriteBlockState::get(opCtx)->checkUserWritesAllowed(opCtx, nss); + if (isStandaloneOrPrimary(opCtx)) { + GlobalUserWriteBlockState::get(opCtx)->checkUserWritesAllowed(opCtx, nss); + } } } // namespace mongo diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h index 8da805bb96f..d5ad0a2ecaa 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.h +++ b/src/mongo/db/user_write_block_mode_op_observer.h @@ -100,7 +100,7 @@ public: void aboutToDelete(OperationContext* opCtx, const NamespaceString& nss, const UUID& uuid, - const BSONObj& doc) final {} + const BSONObj& doc) final; void onInternalOpMessage(OperationContext* opCtx, const NamespaceString& nss, @@ -209,8 +209,7 @@ public: const repl::OpTime& newCommitPoint) final {} private: - void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final { - } + void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; // uasserts that a write to the given namespace is allowed under the current user write blocking // setting. |