diff options
author | Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> | 2022-03-08 08:28:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-08 09:25:08 +0000 |
commit | 4a1f01267e8723f4eba6aa264466fd6b27cb2ab9 (patch) | |
tree | 2c48e0debc899c23554a8162ee0f7da171483ce5 /src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp | |
parent | f70e73ccc422499080cfb4163efe42a64a4e59e1 (diff) | |
download | mongo-4a1f01267e8723f4eba6aa264466fd6b27cb2ab9.tar.gz |
SERVER-63519 Extend RecoverableCriticalSectionService to accommodate global user write blocking
Diffstat (limited to 'src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp')
-rw-r--r-- | src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp new file mode 100644 index 00000000000..6215032f7b0 --- /dev/null +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp @@ -0,0 +1,353 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/user_writes_recoverable_critical_section_service.h" + +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/global_user_write_block_state.h" +#include "mongo/db/s/user_writes_critical_section_document_gen.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client.h" + +namespace mongo { + +namespace { +const auto serviceDecorator = + ServiceContext::declareDecoration<UserWritesRecoverableCriticalSectionService>(); + +BSONObj findRecoverableCriticalSectionDoc(OperationContext* opCtx, const NamespaceString& nss) { + DBDirectClient dbClient(opCtx); + + const auto queryNss = + BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()); + FindCommandRequest findRequest{NamespaceString::kUserWritesCriticalSectionsNamespace}; + findRequest.setFilter(queryNss); + return dbClient.findOne(std::move(findRequest)); +} + +void setBlockUserWritesDocumentField(OperationContext* opCtx, + const NamespaceString& nss, + bool blockUserWrites) { + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.update( + opCtx, + BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()), + BSON("$set" << BSON(UserWriteBlockingCriticalSectionDocument::kBlockUserWritesFieldName + << blockUserWrites)), + ShardingCatalogClient::kLocalWriteConcern); +} + +void acquireRecoverableCriticalSection(OperationContext* opCtx, + const NamespaceString& nss, + bool blockShardedDDL, + bool blockUserWrites) { + LOGV2_DEBUG(6351900, + 3, + "Acquiring user writes recoverable critical section", + "namespace"_attr = nss, + "blockShardedDDL"_attr = blockShardedDDL, + "blockUserWrites"_attr = blockUserWrites); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + // If we intend to start blocking user writes, take the GlobalLock in MODE_X in order to + // ensure that any ongoing writes have completed. + Lock::GlobalLock globalLock(opCtx, blockUserWrites ? MODE_X : MODE_IX); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + if (!bsonObj.isEmpty()) { + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("AcquireUserWritesCS"), bsonObj); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot acquire user writes critical section with different " + "options than the already existing one. blockShardedDDL: " + << blockShardedDDL + << ", current: " << collCSDoc.getBlockNewUserShardedDDL(), + !blockShardedDDL || collCSDoc.getBlockNewUserShardedDDL() == blockShardedDDL); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot acquire user writes critical section with different " + "options than the already existing one. blockUserWrites: " + << blockUserWrites + << ", current: " << collCSDoc.getBlockNewUserShardedDDL(), + !blockUserWrites || collCSDoc.getBlockUserWrites() == blockUserWrites); + + LOGV2_DEBUG(6351914, + 3, + "The user writes recoverable critical section was already acquired", + "namespace"_attr = nss); + return; + } + + // Acquire the critical section by inserting the critical section document. The OpObserver + // will take the in-memory CS when reacting to the insert event. + UserWriteBlockingCriticalSectionDocument newDoc(nss); + newDoc.setBlockNewUserShardedDDL(blockShardedDDL); + newDoc.setBlockUserWrites(blockUserWrites); + + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.add(opCtx, newDoc, ShardingCatalogClient::kLocalWriteConcern); + } + + LOGV2_DEBUG(6351901, + 2, + "Acquired user writes recoverable critical section", + "namespace"_attr = nss, + "blockShardedDDL"_attr = blockShardedDDL, + "blockUserWrites"_attr = blockUserWrites); +} +} // namespace + +const NamespaceString UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace = + NamespaceString(); + +UserWritesRecoverableCriticalSectionService* UserWritesRecoverableCriticalSectionService::get( + ServiceContext* serviceContext) { + return &serviceDecorator(serviceContext); +} + +UserWritesRecoverableCriticalSectionService* UserWritesRecoverableCriticalSectionService::get( + OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +const ReplicaSetAwareServiceRegistry::Registerer<UserWritesRecoverableCriticalSectionService> + UserWritesRecoverableCriticalSectionServiceServiceRegisterer( + "UserWritesRecoverableCriticalSectionService"); + +bool UserWritesRecoverableCriticalSectionService::shouldRegisterReplicaSetAwareService() const { + return serverGlobalParams.clusterRole != ClusterRole::ConfigServer; +} + +void UserWritesRecoverableCriticalSectionService:: + acquireRecoverableCriticalSectionBlockingUserWrites(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole == ClusterRole::None, + "Acquiring the user writes recoverable critical section directly to start blocking " + "writes is only allowed on non-sharded cluster."); + + acquireRecoverableCriticalSection( + opCtx, nss, false /* blockShardedDDL */, true /* blockUserWrites */); +} + +void UserWritesRecoverableCriticalSectionService:: + acquireRecoverableCriticalSectionBlockNewShardedDDL(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole != ClusterRole::None, + "Acquiring the user writes recoverable critical section blocking only sharded DDL is " + "only allowed on sharded clusters"); + + // Take the user writes critical section blocking only ShardingDDLCoordinators. + acquireRecoverableCriticalSection( + opCtx, nss, true /* blockShardedDDL */, false /* blockUserWrites */); +} + +void UserWritesRecoverableCriticalSectionService:: + promoteRecoverableCriticalSectionToBlockUserWrites(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole != ClusterRole::None, + "Promoting the user writes recoverable critical section to also block user writes is " + "only allowed on sharded clusters"); + + LOGV2_DEBUG(6351902, + 3, + "Promoting user writes recoverable critical section to also block reads", + "namespace"_attr = nss); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + // Take the GlobalLock in MODE_X in order to ensure that any ongoing writes have completed + // before starting to block new writes. + Lock::GlobalLock globalLock(opCtx, MODE_X); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + uassert(ErrorCodes::IllegalOperation, + "Cannot promote user writes critical section to block user writes if critical " + "section document not persisted first.", + !bsonObj.isEmpty()); + + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("PromoteUserWritesCS"), bsonObj); + + uassert(ErrorCodes::IllegalOperation, + "Cannot promote user writes critical section to block user writes if sharded DDL " + "operations have not been blocked first.", + collCSDoc.getBlockNewUserShardedDDL()); + + // If we are already blocking user writes, then we are done. + if (collCSDoc.getBlockUserWrites()) { + LOGV2_DEBUG(6351903, + 3, + "The user writes recoverable critical section was already promoted to also " + "block user " + "writes, do nothing", + "namespace"_attr = nss); + return; + } + + // Promote the critical section to block also user writes by updating the critical section + // document. The OpObserver will promote the in-memory CS when reacting to the update event. + setBlockUserWritesDocumentField(opCtx, nss, true /* blockUserWrites */); + } + + LOGV2_DEBUG(6351904, + 2, + "Promoted user writes recoverable critical section to also block user writes", + "namespace"_attr = nss); +} + +void UserWritesRecoverableCriticalSectionService:: + demoteRecoverableCriticalSectionToNoLongerBlockUserWrites(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(serverGlobalParams.clusterRole != ClusterRole::None, + "Demoting the user writes recoverable critical section to also block user writes is " + "only allowed on sharded clusters"); + + LOGV2_DEBUG(6351905, + 3, + "Demoting user writes recoverable critical section to no longer block user writes", + "namespace"_attr = nss); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + Lock::GlobalLock globalLock(opCtx, MODE_IX); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + // If the critical section is not taken, then we are done. + if (bsonObj.isEmpty()) { + LOGV2_DEBUG( + 6351906, + 3, + "The user writes recoverable critical section was not currently taken, do nothing", + "namespace"_attr = nss); + return; + } + + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("DemoteUserWritesCS"), bsonObj); + + // If we are not currently blocking user writes, then we are done. + if (!collCSDoc.getBlockUserWrites()) { + LOGV2_DEBUG(6351907, + 3, + "The user writes recoverable critical section was already not blocking " + "user writes, do nothing", + "namespace"_attr = nss); + return; + } + + // Demote the critical section to block also user writes by updating the critical section + // document. The OpObserver will demote the in-memory CS when reacting to the update event. + setBlockUserWritesDocumentField(opCtx, nss, false /* blockUserWrites */); + } + + LOGV2_DEBUG(6351908, + 2, + "Demoted user writes recoverable critical section to no longer block user writes", + "namespace"_attr = nss); +} + + +void UserWritesRecoverableCriticalSectionService::releaseRecoverableCriticalSection( + OperationContext* opCtx, const NamespaceString& nss) { + LOGV2_DEBUG( + 6351909, 3, "Releasing user writes recoverable critical section", "namespace"_attr = nss); + + invariant(nss == UserWritesRecoverableCriticalSectionService::kGlobalUserWritesNamespace); + invariant(!opCtx->lockState()->isLocked()); + + { + Lock::GlobalLock globalLock(opCtx, MODE_IX); + + const auto bsonObj = findRecoverableCriticalSectionDoc(opCtx, nss); + + // If there is no persisted document, then we are done. + if (bsonObj.isEmpty()) { + LOGV2_DEBUG( + 6351910, + 3, + "The user writes recoverable critical section was already released, do nothing", + "namespace"_attr = nss); + return; + } + + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("ReleaseUserWritesCS"), bsonObj); + + // Release the critical section by deleting the critical section document. The OpObserver + // will release the in-memory CS when reacting to the delete event. + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.remove( + opCtx, + BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()), + ShardingCatalogClient::kLocalWriteConcern); + } + + LOGV2_DEBUG( + 6351911, 2, "Released user writes recoverable critical section", "namespace"_attr = nss); +} + +void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSections( + OperationContext* opCtx) { + LOGV2_DEBUG(6351912, 2, "Recovering all user writes recoverable critical sections"); + + GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + + // Read the persisted critical section documents and restore the state into memory. + PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( + NamespaceString::kUserWritesCriticalSectionsNamespace); + store.forEach(opCtx, BSONObj{}, [&opCtx](const UserWriteBlockingCriticalSectionDocument& doc) { + invariant(doc.getNss().isEmpty()); + if (doc.getBlockUserWrites()) { + GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); + } + + return true; + }); + + LOGV2_DEBUG(6351913, 2, "Recovered all user writes recoverable critical sections"); +} + +} // namespace mongo |