diff options
author | Sergi Mateo Bellido <sergi.mateo-bellido@mongodb.com> | 2021-05-06 10:17:54 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-12 03:58:06 +0000 |
commit | ad4a810cd00939054d4609b807fcb296d8332cd9 (patch) | |
tree | e3091e6a36bf35729cb9da2ff43dea5b8212abfe | |
parent | a34847b4091f1c86a94df78acee403a973dc9329 (diff) | |
download | mongo-ad4a810cd00939054d4609b807fcb296d8332cd9.tar.gz |
SERVER-56040 Changing the behavior of recoverable critical sections on secondary nodes
* New implementation of recoverable critical sections based on
ReplicaSetAwareService, plus fixes to its usages
21 files changed, 659 insertions, 523 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index cab39ffea0a..ae3e0167ea1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -83,7 +83,6 @@ #include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/periodic_sharded_index_consistency_checker.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" -#include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/s/transaction_coordinator_service.h" @@ -906,8 +905,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook migrationutil::resubmitRangeDeletionsOnStepUp(_service); migrationutil::resumeMigrationCoordinationsOnStepUp(opCtx); - sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx); - const bool scheduleAsyncRefresh = true; resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh); } else { // unsharded diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 6a33106b528..ee7e5ee055c 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -60,6 +60,7 @@ env.Library( 'range_deletion_task.idl', 'range_deletion_util.cpp', 'read_only_catalog_cache_loader.cpp', + 'recoverable_critical_section_service.cpp', 'resharding/resharding_collection_cloner.cpp', 'resharding/resharding_coordinator_commit_monitor.cpp', 'resharding/resharding_coordinator_observer.cpp', diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index 21d5550a775..8f5d04410ec 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -166,6 +166,10 @@ void CollectionShardingRuntime::enterCriticalSectionCommitPhase(const 
CSRLock&) _critSec.enterCriticalSectionCommitPhase(); } +void CollectionShardingRuntime::rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&) { + _critSec.rollbackCriticalSectionCommitPhaseToCatchUpPhase(); +} + void CollectionShardingRuntime::exitCriticalSection(const CSRLock&) { _critSec.exitCriticalSection(); } diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index decb27cd1b3..03622031c12 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ b/src/mongo/db/s/collection_sharding_runtime.h @@ -130,6 +130,11 @@ public: void enterCriticalSectionCommitPhase(const CSRLock&); /** + * It transitions the critical section back to the catch up phase. + */ + void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&); + + /** * Method to control the collection's critical secion. Method listed below must be called with * the CSRLock in exclusive mode. * diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 58055904265..3af06c388e7 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -100,6 +100,16 @@ public: builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion); } + std::vector<NamespaceString> getCollectionNames() { + stdx::lock_guard lg(_mutex); + std::vector<NamespaceString> result; + result.reserve(_collections.size()); + for (const auto& [ns, _] : _collections) { + result.emplace_back(ns); + } + return result; + } + private: using CollectionsMap = StringMap<std::shared_ptr<CollectionShardingState>>; @@ -148,6 +158,11 @@ void CollectionShardingState::appendInfoForServerStatus(OperationContext* opCtx, collectionsMap->appendInfoForServerStatus(builder); } +std::vector<NamespaceString> CollectionShardingState::getCollectionNames(OperationContext* opCtx) { + auto& collectionsMap = CollectionShardingStateMap::get(opCtx->getServiceContext()); + 
return collectionsMap->getCollectionNames(); +} + void CollectionShardingStateFactory::set(ServiceContext* service, std::unique_ptr<CollectionShardingStateFactory> factory) { auto& collectionsMap = CollectionShardingStateMap::get(service); diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index d206a603b78..f4aeafba0c0 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -29,6 +29,8 @@ #pragma once +#include <vector> + #include "mongo/db/logical_time.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/scoped_collection_metadata.h" @@ -72,6 +74,11 @@ public: static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss); /** + * Returns the names of the collections that have a CollectionShardingState. + */ + static std::vector<NamespaceString> getCollectionNames(OperationContext* opCtx); + + /** * Obtain a pointer to the CollectionShardingState that remains safe to access without holding * a collection lock. Should be called instead of the regular get() if no collection lock is * held. The returned CollectionShardingState instance should not be modified! 
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index 31e113d8af5..da209d46523 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -39,6 +39,7 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/create_collection_coordinator.h" +#include "mongo/db/s/recoverable_critical_section_service.h" #include "mongo/db/s/shard_key_util.h" #include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_logging.h" @@ -391,8 +392,12 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( _collectionVersion = _result->getCollectionVersion(); // The collection was already created and commited but there was a // stepdown after the commit. - sharding_ddl_util::releaseRecoverableCriticalSection( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + RecoverableCriticalSectionService::get(opCtx) + ->releaseRecoverableCriticalSection( + opCtx, + nss(), + _critSecReason, + ShardingCatalogClient::kMajorityWriteConcern); return; } @@ -400,8 +405,9 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // calling this method, we need the coordinator document to be persisted (and hence // the kCheck state), otherwise nothing will release the critical section in the // presence of a stepdown. 
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + RecoverableCriticalSectionService::get(opCtx) + ->acquireRecoverableCriticalSectionBlockWrites( + opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); if (_recoveredFromDisk) { auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss()); @@ -430,14 +436,18 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // the collection on other shards, this way we prevent // reads/writes that should be redirected to another // shard. - sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads( - opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); + RecoverableCriticalSectionService::get(opCtx) + ->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx, + nss(), + _critSecReason, + ShardingCatalogClient::kMajorityWriteConcern); _createCollectionOnNonPrimaryShards(opCtx); _commit(opCtx); } - sharding_ddl_util::releaseRecoverableCriticalSection( + RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection( opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); if (!_splitPolicy->isOptimized()) { @@ -469,32 +479,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); - sharding_ddl_util::releaseRecoverableCriticalSection( + RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection( opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); } return status; }); } -void CreateCollectionCoordinator::_interrupt(Status status) noexcept { - // Only free the in memory critical section if we reached the commit phase (in which we might've - // acquired it). 
- if (_doc.getPhase() >= Phase::kCommit && - (status.isA<ErrorCategory::NotPrimaryError>() || - status.isA<ErrorCategory::ShutdownError>())) { - auto client = cc().getServiceContext()->makeClient("CreateCollectionCleanupClient"); - AlternativeClientRegion acr(client); - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(), nss()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - csr->clearFilteringMetadata(opCtx); - } -} - void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) { LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss()); diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h index cf5dcec69d6..fa5a69f0c14 100644 --- a/src/mongo/db/s/create_collection_coordinator.h +++ b/src/mongo/db/s/create_collection_coordinator.h @@ -69,8 +69,6 @@ private: ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; - void _interrupt(Status status) noexcept override; - template <typename Func> auto _executePhase(const Phase& newPhase, Func&& func) { return [=] { diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp new file mode 100644 index 00000000000..1a61edc3e5c --- /dev/null +++ b/src/mongo/db/s/recoverable_critical_section_service.cpp @@ -0,0 +1,401 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include <set> + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/recoverable_critical_section_service.h" + +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/collection_critical_section_document_gen.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/sharding_migration_critical_section.h" +#include "mongo/logv2/log.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" + +namespace mongo { + +namespace { +const auto serviceDecorator = + ServiceContext::declareDecoration<RecoverableCriticalSectionService>(); +} + +RecoverableCriticalSectionService* RecoverableCriticalSectionService::get( + ServiceContext* serviceContext) { + return &serviceDecorator(serviceContext); +} + +RecoverableCriticalSectionService* RecoverableCriticalSectionService::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +const ReplicaSetAwareServiceRegistry::Registerer<RecoverableCriticalSectionService> + recoverableCriticalSectionServiceServiceRegisterer("RecoverableCriticalSectionService"); + +void RecoverableCriticalSectionService::acquireRecoverableCriticalSectionBlockWrites( + OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& reason, + const WriteConcernOptions& writeConcern, + const boost::optional<BSONObj>& additionalInfo) { + LOGV2_DEBUG(5656600, + 3, + "Acquiring recoverable critical section blocking writes", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); + + invariant(!opCtx->lockState()->isLocked()); + + { + Lock::GlobalLock lk(opCtx, MODE_IX); + AutoGetCollection cCollLock(opCtx, nss, MODE_S); + + DBDirectClient dbClient(opCtx); + auto cursor = dbClient.query( + 
NamespaceString::kCollectionCriticalSectionsNamespace, + BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString())); + + // if there is a doc with the same nss -> in order to not fail it must have the same reason + if (cursor->more()) { + const auto bsonObj = cursor->next(); + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("AcquireRecoverableCSBW"), bsonObj); + + invariant(collCSDoc.getReason().woCompare(reason) == 0, + str::stream() + << "Trying to acquire a critical section blocking writes for namespace " + << nss << " and reason " << reason + << " but it is already taken by another operation with different reason " + << collCSDoc.getReason()); + + LOGV2_DEBUG( + 5656601, + 3, + "The recoverable critical section was already acquired to block writes, do nothing", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); + + return; + } + + // The collection critical section is not taken, try to acquire it. + + // The following code will try to add a doc to config.criticalCollectionSections: + // - If everything goes well, the shard server op observer will acquire the in-memory CS. 
+ // - Otherwise this call will fail and the CS won't be taken (neither persisted nor in-mem) + CollectionCriticalSectionDocument newDoc(nss, reason, false /* blockReads */); + newDoc.setAdditionalInfo(additionalInfo); + + const auto commandResponse = dbClient.runCommand([&] { + write_ops::InsertCommandRequest insertOp( + NamespaceString::kCollectionCriticalSectionsNamespace); + insertOp.setDocuments({newDoc.toBSON()}); + return insertOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + BatchedCommandResponse batchedResponse; + std::string unusedErrmsg; + batchedResponse.parseBSON(commandReply, &unusedErrmsg); + invariant(batchedResponse.getN() > 0, + str::stream() << "Insert did not add any doc to collection " + << NamespaceString::kCollectionCriticalSectionsNamespace + << " for namespace " << nss << " and reason " << reason); + } + + WriteConcernResult ignoreResult; + const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult)); + + LOGV2_DEBUG(5656602, + 2, + "Acquired recoverable critical section blocking writes", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); +} + +void RecoverableCriticalSectionService::promoteRecoverableCriticalSectionToBlockAlsoReads( + OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& reason, + const WriteConcernOptions& writeConcern) { + LOGV2_DEBUG(5656603, + 3, + "Promoting recoverable critical section to also block reads", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); + + invariant(!opCtx->lockState()->isLocked()); + + { + AutoGetCollection cCollLock(opCtx, nss, MODE_X); + + DBDirectClient dbClient(opCtx); + auto cursor = dbClient.query( + NamespaceString::kCollectionCriticalSectionsNamespace, + 
BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString())); + + invariant( + cursor->more(), + str::stream() << "Trying to acquire a critical section blocking reads for namespace " + << nss << " and reason " << reason + << " but the critical section wasn't acquired first blocking writers."); + BSONObj bsonObj = cursor->next(); + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("AcquireRecoverableCSBR"), bsonObj); + + invariant( + collCSDoc.getReason().woCompare(reason) == 0, + str::stream() << "Trying to acquire a critical section blocking reads for namespace " + << nss << " and reason " << reason + << " but it is already taken by another operation with different reason " + << collCSDoc.getReason()); + + // if there is a document with the same nss, reason and blocking reads -> do nothing, the CS + // is already taken! + if (collCSDoc.getBlockReads()) { + LOGV2_DEBUG(5656604, + 3, + "The recoverable critical section was already promoted to also block " + "reads, do nothing", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); + return; + } + + // The CS is in the catch-up phase, try to advance it to the commit phase. + + // The following code will try to update a doc from config.criticalCollectionSections: + // - If everything goes well, the shard server op observer will advance the in-memory CS to + // the + // commit phase (blocking readers). 
+ // - Otherwise this call will fail and the CS won't be advanced (neither persisted nor + // in-mem) + auto commandResponse = dbClient.runCommand([&] { + const auto query = BSON( + CollectionCriticalSectionDocument::kNssFieldName + << nss.toString() << CollectionCriticalSectionDocument::kReasonFieldName << reason); + const auto update = BSON( + "$set" << BSON(CollectionCriticalSectionDocument::kBlockReadsFieldName << true)); + + write_ops::UpdateCommandRequest updateOp( + NamespaceString::kCollectionCriticalSectionsNamespace); + auto updateModification = write_ops::UpdateModification::parseFromClassicUpdate(update); + write_ops::UpdateOpEntry updateEntry(query, updateModification); + updateOp.setUpdates({updateEntry}); + + return updateOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + BatchedCommandResponse batchedResponse; + std::string unusedErrmsg; + batchedResponse.parseBSON(commandReply, &unusedErrmsg); + invariant(batchedResponse.getNModified() > 0, + str::stream() << "Update did not modify any doc from collection " + << NamespaceString::kCollectionCriticalSectionsNamespace + << " for namespace " << nss << " and reason " << reason); + } + + WriteConcernResult ignoreResult; + const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult)); + + LOGV2_DEBUG(5656605, + 2, + "Promoted recoverable critical section to also block reads", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); +} + +void RecoverableCriticalSectionService::releaseRecoverableCriticalSection( + OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& reason, + const WriteConcernOptions& writeConcern) { + LOGV2_DEBUG(5656606, + 3, + "Releasing recoverable critical section", + "namespace"_attr = nss, + "reason"_attr 
= reason, + "writeConcern"_attr = writeConcern); + + invariant(!opCtx->lockState()->isLocked()); + + { + AutoGetCollection collLock(opCtx, nss, MODE_X); + + DBDirectClient dbClient(opCtx); + + const auto queryNss = + BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()); + auto cursor = + dbClient.query(NamespaceString::kCollectionCriticalSectionsNamespace, queryNss); + + // if there is no document with the same nss -> do nothing! + if (!cursor->more()) { + LOGV2_DEBUG(5656607, + 3, + "The recoverable critical section was already released, do nothing", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); + return; + } + + BSONObj bsonObj = cursor->next(); + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ReleaseRecoverableCS"), bsonObj); + + invariant( + collCSDoc.getReason().woCompare(reason) == 0, + str::stream() << "Trying to release a critical for namespace " << nss << " and reason " + << reason + << " but it is already taken by another operation with different reason " + << collCSDoc.getReason()); + + + // The collection critical section is taken (in any phase), try to release it. 
+ + // The following code will try to remove a doc from config.criticalCollectionSections: + // - If everything goes well, the shard server op observer will release the in-memory CS + // - Otherwise this call will fail and the CS won't be released (neither persisted nor + // in-mem) + + auto commandResponse = dbClient.runCommand([&] { + write_ops::DeleteCommandRequest deleteOp( + NamespaceString::kCollectionCriticalSectionsNamespace); + + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(queryNss); + entry.setMulti(true); + return entry; + }()}); + + return deleteOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + BatchedCommandResponse batchedResponse; + std::string unusedErrmsg; + batchedResponse.parseBSON(commandReply, &unusedErrmsg); + invariant(batchedResponse.getN() > 0, + str::stream() << "Delete did not remove any doc from collection " + << NamespaceString::kCollectionCriticalSectionsNamespace + << " for namespace " << nss << " and reason " << reason); + } + + WriteConcernResult ignoreResult; + const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult)); + + LOGV2_DEBUG(5656608, + 2, + "Released recoverable critical section", + "namespace"_attr = nss, + "reason"_attr = reason, + "writeConcern"_attr = writeConcern); +} + +void RecoverableCriticalSectionService::recoverRecoverableCriticalSections( + OperationContext* opCtx) { + LOGV2_DEBUG(5604000, 2, "Recovering all recoverable critical sections"); + + std::set<NamespaceString> nssPresentOnDisk; + PersistentTaskStore<CollectionCriticalSectionDocument> store( + NamespaceString::kCollectionCriticalSectionsNamespace); + store.forEach( + opCtx, Query{}, [&opCtx, &nssPresentOnDisk](const CollectionCriticalSectionDocument& doc) { + const auto& nss = doc.getNss(); + 
{ + AutoGetCollection collLock(opCtx, nss, MODE_X); + auto* const csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + + const bool blockingWritesCS = (bool)csr->getCriticalSectionSignal( + opCtx, ShardingMigrationCriticalSection::Operation::kWrite); + + const bool blockingReadsAndWritesCS = blockingWritesCS && + csr->getCriticalSectionSignal( + opCtx, ShardingMigrationCriticalSection::Operation::kRead); + + if (!doc.getBlockReads()) { + if (!blockingWritesCS) + csr->enterCriticalSectionCatchUpPhase(csrLock); + else if (blockingReadsAndWritesCS) + csr->rollbackCriticalSectionCommitPhaseToCatchUpPhase(csrLock); + } else { + if (!blockingWritesCS) + csr->enterCriticalSectionCatchUpPhase(csrLock); + if (!blockingReadsAndWritesCS) + csr->enterCriticalSectionCommitPhase(csrLock); + } + + nssPresentOnDisk.insert(nss); + + return true; + } + }); + + // Release in-memory CS that are not present on the disk + const auto collectionNames = CollectionShardingState::getCollectionNames(opCtx); + for (const auto& collName : collectionNames) { + if (nssPresentOnDisk.find(collName) != nssPresentOnDisk.end()) + continue; + + AutoGetCollection collLock(opCtx, collName, MODE_X); + auto* const csr = CollectionShardingRuntime::get(opCtx, collName); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + csr->exitCriticalSection(csrLock); + } + + LOGV2_DEBUG(5604001, 2, "Recovered all recoverable critical sections"); +} + +} // namespace mongo diff --git a/src/mongo/db/s/recoverable_critical_section_service.h b/src/mongo/db/s/recoverable_critical_section_service.h new file mode 100644 index 00000000000..f4d7f94820a --- /dev/null +++ b/src/mongo/db/s/recoverable_critical_section_service.h @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/db/service_context.h" +#include "mongo/db/write_concern_options.h" + +namespace mongo { + +class RecoverableCriticalSectionService + : public ReplicaSetAwareServiceShardSvr<RecoverableCriticalSectionService> { + +public: + RecoverableCriticalSectionService() = default; + + static RecoverableCriticalSectionService* get(ServiceContext* serviceContext); + static RecoverableCriticalSectionService* get(OperationContext* opCtx); + + /** + * Acquires the collection critical section in the catch-up phase (i.e. blocking writes) for the + * specified namespace and reason. It works even if the namespace's current metadata are + * UNKNOWN. + * + * It adds a doc to config.collectionCriticalSections with with writeConcern write concern. + * + * Do nothing if the collection critical section is taken for that nss and reason, and will + * invariant otherwise since it is the responsibility of the caller to ensure that only one + * thread is taking the critical section. + */ + void acquireRecoverableCriticalSectionBlockWrites( + OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& reason, + const WriteConcernOptions& writeConcern, + const boost::optional<BSONObj>& additionalInfo = boost::none); + + /** + * Advances the recoverable critical section from the catch-up phase (i.e. blocking writes) to + * the commit phase (i.e. blocking reads) for the specified nss and reason. The recoverable + * critical section must have been acquired first through + * 'acquireRecoverableCriticalSectionBlockWrites' function. + * + * It updates a doc from config.collectionCriticalSections with writeConcern write concern. + * + * Do nothing if the collection critical section is already taken in commit phase. 
+ */ + void promoteRecoverableCriticalSectionToBlockAlsoReads(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& reason, + const WriteConcernOptions& writeConcern); + /** + * Releases the recoverable critical section for the given nss and reason. + * + * It removes a doc from config.collectionCriticalSections with writeConcern write concern. + * + * Do nothing if the collection critical section is not taken for that nss and reason. + */ + void releaseRecoverableCriticalSection(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& reason, + const WriteConcernOptions& writeConcern); + + + /** + * This method is called when we have to mirror the state on disk of the recoverable critical + * section to memory (on startUp or on rollback). + */ + void recoverRecoverableCriticalSections(OperationContext* opCtx); + +private: + void onStartup(OperationContext* opCtx) override final { + recoverRecoverableCriticalSections(opCtx); + } + + void onShutdown() override final {} + void onStepUpBegin(OperationContext* opCtx, long long term) override final {} + void onStepUpComplete(OperationContext* opCtx, long long term) override final {} + void onStepDown() override final {} + void onBecomeArbiter() override final {} +}; + +} // namespace mongo diff --git a/src/mongo/db/s/rename_collection_participant_service.cpp b/src/mongo/db/s/rename_collection_participant_service.cpp index c3ac3a04042..f987d7d9d90 100644 --- a/src/mongo/db/s/rename_collection_participant_service.cpp +++ b/src/mongo/db/s/rename_collection_participant_service.cpp @@ -39,6 +39,7 @@ #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/range_deletion_util.h" +#include "mongo/db/s/recoverable_critical_section_service.h" #include "mongo/db/s/rename_collection_participant_service.h" #include "mongo/db/s/shard_metadata_util.h" #include "mongo/db/s/sharding_ddl_util.h" @@ -230,13 +231,14 @@ SemiFuture<void> 
RenameParticipantInstance::run( BSON("command" << "rename" << "from" << fromNss().toString() << "to" << toNss().toString()); - sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites( + auto service = RecoverableCriticalSectionService::get(opCtx); + service->acquireRecoverableCriticalSectionBlockWrites( opCtx, fromNss(), reason, ShardingCatalogClient::kLocalWriteConcern); - sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads( + service->promoteRecoverableCriticalSectionToBlockAlsoReads( opCtx, fromNss(), reason, ShardingCatalogClient::kLocalWriteConcern); - sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites( + service->acquireRecoverableCriticalSectionBlockWrites( opCtx, toNss(), reason, ShardingCatalogClient::kLocalWriteConcern); - sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads( + service->promoteRecoverableCriticalSectionToBlockAlsoReads( opCtx, toNss(), reason, ShardingCatalogClient::kLocalWriteConcern); snapshotRangeDeletionsForRename(opCtx, fromNss(), toNss()); @@ -290,9 +292,10 @@ SemiFuture<void> RenameParticipantInstance::run( BSON("command" << "rename" << "from" << fromNss().toString() << "to" << toNss().toString()); - sharding_ddl_util::releaseRecoverableCriticalSection( + auto service = RecoverableCriticalSectionService::get(opCtx); + service->releaseRecoverableCriticalSection( opCtx, fromNss(), reason, ShardingCatalogClient::kLocalWriteConcern); - sharding_ddl_util::releaseRecoverableCriticalSection( + service->releaseRecoverableCriticalSection( opCtx, toNss(), reason, ShardingCatalogClient::kMajorityWriteConcern); Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(fromNss()); @@ -347,24 +350,9 @@ void RenameParticipantInstance::interrupt(Status status) noexcept { "toNs"_attr = toNss(), "error"_attr = redact(status)); - auto releaseInMemoryCritSec = [](const NamespaceString& nss) { - auto client = cc().getServiceContext()->makeClient("RenameParticipantCleanupClient"); - 
AlternativeClientRegion acr(client); - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(), nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - csr->clearFilteringMetadata(opCtx); - }; - invariant(status.isA<ErrorCategory::NotPrimaryError>() || status.isA<ErrorCategory::ShutdownError>()); - releaseInMemoryCritSec(fromNss()); - releaseInMemoryCritSec(toNss()); _invalidateFutures(status); } diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index d303ad6adb5..606407c81b6 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -46,11 +46,11 @@ #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/recoverable_critical_section_service.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" -#include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/sharding_catalog_client.h" @@ -331,22 +331,6 @@ void ReshardingDonorService::DonorStateMachine::_runMandatoryCleanup(Status stat _completionPromise.setError(status); } } - - // TODO SERVER-56040: this if-stmt must be removed. 
- if (status.isA<ErrorCategory::NotPrimaryError>() || - status.isA<ErrorCategory::ShutdownError>()) { - auto client = cc().getServiceContext()->makeClient("ReshardingDonorCleanupClient"); - AlternativeClientRegion acr(client); - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(), - _metadata.getSourceNss()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - csr->clearFilteringMetadata(opCtx); - } } SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( @@ -431,7 +415,7 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( } if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) { - sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites( + RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( opCtx, _metadata.getSourceNss(), _critSecReason, @@ -441,11 +425,12 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( } if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) { - sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads( - opCtx, - _metadata.getSourceNss(), - _critSecReason, - ShardingCatalogClient::kLocalWriteConcern); + RecoverableCriticalSectionService::get(opCtx) + ->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx, + _metadata.getSourceNss(), + _critSecReason, + ShardingCatalogClient::kLocalWriteConcern); ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted); } @@ -680,11 +665,11 @@ void ReshardingDonorService::DonorStateMachine::_transitionToDone() { { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - sharding_ddl_util::releaseRecoverableCriticalSection( - opCtx.get(), - _metadata.getSourceNss(), - _critSecReason, - 
ShardingCatalogClient::kLocalWriteConcern); + RecoverableCriticalSectionService::get(opCtx.get()) + ->releaseRecoverableCriticalSection(opCtx.get(), + _metadata.getSourceNss(), + _critSecReason, + ShardingCatalogClient::kLocalWriteConcern); } _transitionState(DonorStateEnum::kDone); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 434bb644388..50e2abe232d 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_destination_manager.h" +#include "mongo/db/s/recoverable_critical_section_service.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" @@ -51,7 +52,6 @@ #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/shard_key_util.h" -#include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_state.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -282,22 +282,6 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) if (!_completionPromise.getFuture().isReady()) { _completionPromise.setError(status); } - - // TODO SERVER-56040: this if-stmt must be removed. 
- if (status.isA<ErrorCategory::NotPrimaryError>() || - status.isA<ErrorCategory::ShutdownError>()) { - auto client = cc().getServiceContext()->makeClient("ReshardingRecipientCleanupClient"); - AlternativeClientRegion acr(client); - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(), - _metadata.getSourceNss()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - csr->clearFilteringMetadata(opCtx); - } } boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp( @@ -326,7 +310,7 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange _onAbortOrStepdown(lk, status); if (!_isAlsoDonor) { - sharding_ddl_util::releaseRecoverableCriticalSection( + RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection( opCtx, _metadata.getSourceNss(), _critSecReason, @@ -533,11 +517,12 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: .then([this] { if (!_isAlsoDonor) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites( - opCtx.get(), - _metadata.getSourceNss(), - _critSecReason, - ShardingCatalogClient::kMajorityWriteConcern); + RecoverableCriticalSectionService::get(opCtx.get()) + ->acquireRecoverableCriticalSectionBlockWrites( + opCtx.get(), + _metadata.getSourceNss(), + _critSecReason, + ShardingCatalogClient::kMajorityWriteConcern); } _transitionState(RecipientStateEnum::kStrictConsistency); @@ -566,11 +551,12 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi if (!_isAlsoDonor) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites( 
- opCtx.get(), - _metadata.getSourceNss(), - _critSecReason, - ShardingCatalogClient::kLocalWriteConcern); + RecoverableCriticalSectionService::get(opCtx.get()) + ->promoteRecoverableCriticalSectionToBlockAlsoReads( + opCtx.get(), + _metadata.getSourceNss(), + _critSecReason, + ShardingCatalogClient::kLocalWriteConcern); RenameCollectionOptions options; options.dropTarget = true; @@ -578,11 +564,11 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi uassertStatusOK(renameCollection( opCtx.get(), _metadata.getTempReshardingNss(), _metadata.getSourceNss(), options)); - sharding_ddl_util::releaseRecoverableCriticalSection( - opCtx.get(), - _metadata.getSourceNss(), - _critSecReason, - ShardingCatalogClient::kLocalWriteConcern); + RecoverableCriticalSectionService::get(opCtx.get()) + ->releaseRecoverableCriticalSection(opCtx.get(), + _metadata.getSourceNss(), + _critSecReason, + ShardingCatalogClient::kLocalWriteConcern); } _cleanupReshardingCollections(); diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 99b25f4e82d..73a842da95c 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -44,6 +44,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/range_deletion_task_gen.h" +#include "mongo/db/s/recoverable_critical_section_service.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state.h" @@ -275,15 +276,19 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserErrorContext("ShardServerOpObserver"), insertedDoc); - if (isStandaloneOrPrimary(opCtx)) { - opCtx->recoveryUnit()->onCommit([opCtx, insertedNss = 
collCSDoc.getNss()]( - boost::optional<Timestamp>) { + + + opCtx->recoveryUnit()->onCommit( + [opCtx, insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, insertedNss, MODE_IX); + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss); auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); csr->enterCriticalSectionCatchUpPhase(csrLock); }); - } } if (metadata && metadata->isSharded()) { @@ -405,21 +410,17 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc); - const auto& updatedNss = collCSDoc.getNss(); + opCtx->recoveryUnit()->onCommit( + [opCtx, updatedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX); - if (isStandaloneOrPrimary(opCtx)) { - opCtx->recoveryUnit()->onCommit([opCtx, updatedNss](boost::optional<Timestamp>) { UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); csr->enterCriticalSectionCommitPhase(csrLock); }); - } else { - // Force subsequent uses of the namespace to refresh the filtering metadata so they - // can synchronize with any work happening on the primary (e.g., migration critical - // section). 
- CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx); - } } auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss); @@ -488,21 +489,24 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, } if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { - if (isStandaloneOrPrimary(opCtx)) { - const auto deletedNss([&] { - std::string coll; - fassert(5514801, - bsonExtractStringField( - documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll)); - return NamespaceString(coll); - }()); - opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - }); - } + const auto deletedNss([&] { + std::string coll; + fassert(5514801, + bsonExtractStringField( + documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll)); + return NamespaceString(coll); + }()); + + opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + csr->exitCriticalSection(csrLock); + }); } } @@ -600,5 +604,13 @@ void ShardServerOpObserver::onCollMod(OperationContext* opCtx, abortOngoingMigrationIfNeeded(opCtx, nss); }; +void ShardServerOpObserver::onReplicationRollback(OperationContext* opCtx, + const RollbackObserverInfo& rbInfo) { + if (rbInfo.rollbackNamespaces.find(NamespaceString::kCollectionCriticalSectionsNamespace) != + rbInfo.rollbackNamespaces.end()) { 
+ RecoverableCriticalSectionService::get(opCtx)->recoverRecoverableCriticalSections(opCtx); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 29b213aebcb..eff1924ffd6 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -194,7 +194,8 @@ public: void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} - void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) {} + void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo); + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) override {} }; diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index a8179143790..87051eb4385 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -109,8 +109,6 @@ void ShardingDDLCoordinator::interrupt(Status status) { "coordinatorId"_attr = _coorMetadata.getId(), "reason"_attr = redact(status)); - _interrupt(status); - // Resolve any unresolved promises to avoid hanging. stdx::lock_guard<Latch> lg(_mutex); if (!_constructionCompletionPromise.getFuture().isReady()) { diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index e28746ff561..852a19f63ac 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -106,10 +106,6 @@ private: virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept = 0; - // TODO SERVER-56040: remove once we have critical section handling and replication on - // secondaries. 
- virtual void _interrupt(Status status) noexcept {} - void interrupt(Status status) override final; bool _removeDocument(OperationContext* opCtx); diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index fcf5258a65c..f1c0dce4b78 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -35,10 +35,6 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/persistent_task_store.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/collection_critical_section_document_gen.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_util.h" @@ -49,8 +45,6 @@ #include "mongo/s/catalog/type_tags.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/set_allow_migrations_gen.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" namespace mongo { @@ -297,323 +291,6 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded( return response; } -void acquireRecoverableCriticalSectionBlockWrites(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& reason, - const WriteConcernOptions& writeConcern, - const boost::optional<BSONObj>& additionalInfo) { - LOGV2_DEBUG(5656600, - 3, - "Acquiring recoverable critical section blocking writes", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); - - invariant(!opCtx->lockState()->isLocked()); - - { - Lock::GlobalLock lk(opCtx, MODE_IX); - AutoGetCollection cCollLock(opCtx, nss, MODE_S); - - DBDirectClient dbClient(opCtx); - auto cursor = dbClient.query( - NamespaceString::kCollectionCriticalSectionsNamespace, - BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString())); - - // if there is a doc with the same nss -> in order to 
not fail it must have the same reason - if (cursor->more()) { - const auto bsonObj = cursor->next(); - const auto collCSDoc = CollectionCriticalSectionDocument::parse( - IDLParserErrorContext("AcquireRecoverableCSBW"), bsonObj); - - invariant(collCSDoc.getReason().woCompare(reason) == 0, - str::stream() - << "Trying to acquire a critical section blocking writes for namespace " - << nss << " and reason " << reason - << " but it is already taken by another operation with different reason " - << collCSDoc.getReason()); - - LOGV2_DEBUG( - 5656601, - 3, - "The recoverable critical section was already acquired to block writes, do nothing", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); - return; - } - - // The collection critical section is not taken, try to acquire it. - - // The following code will try to add a doc to config.criticalCollectionSections: - // - If everything goes well, the shard server op observer will acquire the in-memory CS. - // - Otherwise this call will fail and the CS won't be taken (neither persisted nor in-mem) - CollectionCriticalSectionDocument newDoc(nss, reason, false /* blockReads */); - newDoc.setAdditionalInfo(additionalInfo); - - const auto commandResponse = dbClient.runCommand([&] { - write_ops::InsertCommandRequest insertOp( - NamespaceString::kCollectionCriticalSectionsNamespace); - insertOp.setDocuments({newDoc.toBSON()}); - return insertOp.serialize({}); - }()); - - const auto commandReply = commandResponse->getCommandReply(); - uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); - - BatchedCommandResponse batchedResponse; - std::string unusedErrmsg; - batchedResponse.parseBSON(commandReply, &unusedErrmsg); - invariant(batchedResponse.getN() > 0, - str::stream() << "Insert did not add any doc to collection " - << NamespaceString::kCollectionCriticalSectionsNamespace - << " for namespace " << nss << " and reason " << reason); - } - - WriteConcernResult ignoreResult; - const auto 
latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult)); - LOGV2_DEBUG(5656602, - 2, - "Acquired recoverable critical section blocking writes", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); -} - -void acquireRecoverableCriticalSectionBlockReads(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& reason, - const WriteConcernOptions& writeConcern) { - LOGV2_DEBUG(5656603, - 3, - "Promoting recoverable critical section to also block reads", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); - - invariant(!opCtx->lockState()->isLocked()); - - { - AutoGetCollection cCollLock(opCtx, nss, MODE_X); - - DBDirectClient dbClient(opCtx); - auto cursor = dbClient.query( - NamespaceString::kCollectionCriticalSectionsNamespace, - BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString())); - - invariant( - cursor->more(), - str::stream() << "Trying to acquire a critical section blocking reads for namespace " - << nss << " and reason " << reason - << " but the critical section wasn't acquired first blocking writers."); - BSONObj bsonObj = cursor->next(); - const auto collCSDoc = CollectionCriticalSectionDocument::parse( - IDLParserErrorContext("AcquireRecoverableCSBR"), bsonObj); - - invariant( - collCSDoc.getReason().woCompare(reason) == 0, - str::stream() << "Trying to acquire a critical section blocking reads for namespace " - << nss << " and reason " << reason - << " but it is already taken by another operation with different reason " - << collCSDoc.getReason()); - - // if there is a document with the same nss, reason and blocking reads -> do nothing, the CS - // is already taken! 
- if (collCSDoc.getBlockReads()) { - LOGV2_DEBUG(5656604, - 3, - "The recoverable critical section was already promoted to also block " - "reads, do nothing", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); - return; - } - - // The CS is in the catch-up phase, try to advance it to the commit phase. - - // The following code will try to update a doc from config.criticalCollectionSections: - // - If everything goes well, the shard server op observer will advance the in-memory CS to - // the - // commit phase (blocking readers). - // - Otherwise this call will fail and the CS won't be advanced (neither persisted nor - // in-mem) - auto commandResponse = dbClient.runCommand([&] { - const auto query = BSON( - CollectionCriticalSectionDocument::kNssFieldName - << nss.toString() << CollectionCriticalSectionDocument::kReasonFieldName << reason); - const auto update = BSON( - "$set" << BSON(CollectionCriticalSectionDocument::kBlockReadsFieldName << true)); - - write_ops::UpdateCommandRequest updateOp( - NamespaceString::kCollectionCriticalSectionsNamespace); - auto updateModification = write_ops::UpdateModification::parseFromClassicUpdate(update); - write_ops::UpdateOpEntry updateEntry(query, updateModification); - updateOp.setUpdates({updateEntry}); - - return updateOp.serialize({}); - }()); - - const auto commandReply = commandResponse->getCommandReply(); - uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); - - BatchedCommandResponse batchedResponse; - std::string unusedErrmsg; - batchedResponse.parseBSON(commandReply, &unusedErrmsg); - invariant(batchedResponse.getNModified() > 0, - str::stream() << "Update did not modify any doc from collection " - << NamespaceString::kCollectionCriticalSectionsNamespace - << " for namespace " << nss << " and reason " << reason); - } - - WriteConcernResult ignoreResult; - const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - 
uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult)); - LOGV2_DEBUG(5656605, - 2, - "Promoted recoverable critical section to also block reads", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); -} - -void releaseRecoverableCriticalSection(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& reason, - const WriteConcernOptions& writeConcern) { - LOGV2_DEBUG(5656606, - 3, - "Releasing recoverable critical section", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); - - invariant(!opCtx->lockState()->isLocked()); - - { - AutoGetCollection collLock(opCtx, nss, MODE_X); - - DBDirectClient dbClient(opCtx); - - const auto queryNss = - BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()); - auto cursor = - dbClient.query(NamespaceString::kCollectionCriticalSectionsNamespace, queryNss); - - // if there is no document with the same nss -> do nothing! - if (!cursor->more()) { - LOGV2_DEBUG(5656607, - 3, - "The recoverable critical section was already released, do nothing", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); - return; - } - - BSONObj bsonObj = cursor->next(); - const auto collCSDoc = CollectionCriticalSectionDocument::parse( - IDLParserErrorContext("ReleaseRecoverableCS"), bsonObj); - - invariant( - collCSDoc.getReason().woCompare(reason) == 0, - str::stream() << "Trying to release a critical for namespace " << nss << " and reason " - << reason - << " but it is already taken by another operation with different reason " - << collCSDoc.getReason()); - - - // The collection critical section is taken (in any phase), try to release it. 
- - // The following code will try to remove a doc from config.criticalCollectionSections: - // - If everything goes well, the shard server op observer will release the in-memory CS - // - Otherwise this call will fail and the CS won't be released (neither persisted nor - // in-mem) - - auto commandResponse = dbClient.runCommand([&] { - write_ops::DeleteCommandRequest deleteOp( - NamespaceString::kCollectionCriticalSectionsNamespace); - - deleteOp.setDeletes({[&] { - write_ops::DeleteOpEntry entry; - entry.setQ(queryNss); - entry.setMulti(true); - return entry; - }()}); - - return deleteOp.serialize({}); - }()); - - const auto commandReply = commandResponse->getCommandReply(); - uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); - - BatchedCommandResponse batchedResponse; - std::string unusedErrmsg; - batchedResponse.parseBSON(commandReply, &unusedErrmsg); - invariant(batchedResponse.getN() > 0, - str::stream() << "Delete did not remove any doc from collection " - << NamespaceString::kCollectionCriticalSectionsNamespace - << " for namespace " << nss << " and reason " << reason); - } - - WriteConcernResult ignoreResult; - const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult)); - - LOGV2_DEBUG(5656608, - 2, - "Released recoverable critical section", - "namespace"_attr = nss, - "reason"_attr = reason, - "writeConcern"_attr = writeConcern); -} - -void retakeInMemoryRecoverableCriticalSections(OperationContext* opCtx) { - - LOGV2_DEBUG(5549400, 2, "Starting re-acquisition of recoverable critical sections"); - - PersistentTaskStore<CollectionCriticalSectionDocument> store( - NamespaceString::kCollectionCriticalSectionsNamespace); - store.forEach(opCtx, Query{}, [&opCtx](const CollectionCriticalSectionDocument& doc) { - const auto& nss = doc.getNss(); - { - // Entering into the catch-up phase: blocking writes - Lock::GlobalLock lk(opCtx, 
MODE_IX); - AutoGetCollection cCollLock(opCtx, nss, MODE_S); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - - // It may happen that the ReplWriterWorker enters the critical section before drain mode - // upon committing a recoverable critical section oplog entry (SERVER-56104) - if (!csr->getCriticalSectionSignal( - opCtx, ShardingMigrationCriticalSection::Operation::kWrite)) { - csr->enterCriticalSectionCatchUpPhase(csrLock); - } - } - - if (doc.getBlockReads()) { - // Entering into the commit phase: blocking reads - AutoGetCollection cCollLock(opCtx, nss, MODE_X); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - - // It may happen that the ReplWriterWorker enters the critical section before drain mode - // upon committing a recoverable critical section oplog entry (SERVER-56104) - if (!csr->getCriticalSectionSignal( - opCtx, ShardingMigrationCriticalSection::Operation::kRead)) { - csr->enterCriticalSectionCommitPhase(csrLock); - } - - CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); - } - - return true; - }); - - LOGV2_DEBUG(5549401, 2, "Finished re-acquisition of recoverable critical sections"); -} - void stopMigrations(OperationContext* opCtx, const NamespaceString& nss) { const ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, false /* allowMigrations */); diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h index d7871a1b544..3eb90293551 100644 --- a/src/mongo/db/s/sharding_ddl_util.h +++ b/src/mongo/db/s/sharding_ddl_util.h @@ -27,10 +27,11 @@ * it in the license file. 
 */ +#pragma once + #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/write_concern_options.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/request_types/sharded_ddl_commands_gen.h" @@ -112,56 +113,6 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded( bool unique); /** - * Acquires the collection critical section in the catch-up phase (i.e. blocking writes) for the - * specified namespace and reason. It works even if the namespace's current metadata are UNKNOWN. - * - * It adds a doc to config.collectionCriticalSections with writeConcern write concern. - * - * Do nothing if the collection critical section is taken for that nss and reason, and will - * invariant otherwise since it is the responsibility of the caller to ensure that only one thread - * is taking the critical section. - */ -void acquireRecoverableCriticalSectionBlockWrites( - OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& reason, - const WriteConcernOptions& writeConcern, - const boost::optional<BSONObj>& additionalInfo = boost::none); - -/** - * Advances the recoverable critical section from the catch-up phase (i.e. blocking writes) to the - * commit phase (i.e. blocking reads) for the specified nss and reason. The recoverable critical - * section must have been acquired first through 'acquireRecoverableCriticalSectionBlockWrites' - * function. - * - * It updates a doc from config.collectionCriticalSections with writeConcern write concern. - * - * Do nothing if the collection critical section is already taken in commit phase. - */ -void acquireRecoverableCriticalSectionBlockReads(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& reason, - const WriteConcernOptions& writeConcern); - -/** - * Releases the recoverable critical section for the given nss and reason.
- * - * It removes a doc from config.collectionCriticalSections with writeConcern write concern. - * - * Do nothing if the collection critical section is not taken for that nss and reason. - */ -void releaseRecoverableCriticalSection(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& reason, - const WriteConcernOptions& writeConcern); - -/** - * Retakes the in-memory collection critical section for each recoverable critical section - * persisted on config.collectionCriticalSections. It also clears the filtering metadata. - */ -void retakeInMemoryRecoverableCriticalSections(OperationContext* opCtx); - -/** * Stops ongoing migrations and prevents future ones to start for the given nss. */ void stopMigrations(OperationContext* opCtx, const NamespaceString& nss); diff --git a/src/mongo/db/s/sharding_migration_critical_section.cpp b/src/mongo/db/s/sharding_migration_critical_section.cpp index 10b60790ffd..6f29221e33c 100644 --- a/src/mongo/db/s/sharding_migration_critical_section.cpp +++ b/src/mongo/db/s/sharding_migration_critical_section.cpp @@ -57,6 +57,12 @@ void ShardingMigrationCriticalSection::exitCriticalSection() { } } +void ShardingMigrationCriticalSection::rollbackCriticalSectionCommitPhaseToCatchUpPhase() { + invariant(_critSecSignal); + invariant(_readsShouldWaitOnCritSec); + _readsShouldWaitOnCritSec = false; +} + boost::optional<SharedSemiFuture<void>> ShardingMigrationCriticalSection::getSignal( Operation op) const { if (!_critSecSignal) diff --git a/src/mongo/db/s/sharding_migration_critical_section.h b/src/mongo/db/s/sharding_migration_critical_section.h index 21a5ed91d9b..b0f87477607 100644 --- a/src/mongo/db/s/sharding_migration_critical_section.h +++ b/src/mongo/db/s/sharding_migration_critical_section.h @@ -70,6 +70,11 @@ public: void exitCriticalSection(); /** + * Sets the critical section back to the catch up phase, which allows reads again (only writes remain blocked).
+ */ + void rollbackCriticalSectionCommitPhaseToCatchUpPhase(); + + /** * Retrieves a critical section future to wait on. Will return boost::none if the migration is * not yet in the critical section or if the caller is a reader and the migration is not yet in * the commit phase. |