summary refs log tree commit diff
diff options
context:
space:
mode:
author Sergi Mateo Bellido <sergi.mateo-bellido@mongodb.com> 2021-05-06 10:17:54 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-12 03:58:06 +0000
commit ad4a810cd00939054d4609b807fcb296d8332cd9 (patch)
tree e3091e6a36bf35729cb9da2ff43dea5b8212abfe
parent a34847b4091f1c86a94df78acee403a973dc9329 (diff)
download mongo-ad4a810cd00939054d4609b807fcb296d8332cd9.tar.gz
SERVER-56040 Changing the behavior of recoverable critical sections on secondary nodes
* New implementation of recoverable critical sections based on ReplicasetAwareServices + fixing its usages
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp 3
-rw-r--r-- src/mongo/db/s/SConscript 1
-rw-r--r-- src/mongo/db/s/collection_sharding_runtime.cpp 4
-rw-r--r-- src/mongo/db/s/collection_sharding_runtime.h 5
-rw-r--r-- src/mongo/db/s/collection_sharding_state.cpp 15
-rw-r--r-- src/mongo/db/s/collection_sharding_state.h 7
-rw-r--r-- src/mongo/db/s/create_collection_coordinator.cpp 45
-rw-r--r-- src/mongo/db/s/create_collection_coordinator.h 2
-rw-r--r-- src/mongo/db/s/recoverable_critical_section_service.cpp 401
-rw-r--r-- src/mongo/db/s/recoverable_critical_section_service.h 112
-rw-r--r-- src/mongo/db/s/rename_collection_participant_service.cpp 30
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.cpp 41
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.cpp 52
-rw-r--r-- src/mongo/db/s/shard_server_op_observer.cpp 68
-rw-r--r-- src/mongo/db/s/shard_server_op_observer.h 3
-rw-r--r-- src/mongo/db/s/sharding_ddl_coordinator.cpp 2
-rw-r--r-- src/mongo/db/s/sharding_ddl_coordinator.h 4
-rw-r--r-- src/mongo/db/s/sharding_ddl_util.cpp 323
-rw-r--r-- src/mongo/db/s/sharding_ddl_util.h 53
-rw-r--r-- src/mongo/db/s/sharding_migration_critical_section.cpp 6
-rw-r--r-- src/mongo/db/s/sharding_migration_critical_section.h 5
21 files changed, 659 insertions, 523 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index cab39ffea0a..ae3e0167ea1 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -83,7 +83,6 @@
#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/periodic_sharded_index_consistency_checker.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
-#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/s/transaction_coordinator_service.h"
@@ -906,8 +905,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
migrationutil::resubmitRangeDeletionsOnStepUp(_service);
migrationutil::resumeMigrationCoordinationsOnStepUp(opCtx);
- sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx);
-
const bool scheduleAsyncRefresh = true;
resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);
} else { // unsharded
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 6a33106b528..ee7e5ee055c 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -60,6 +60,7 @@ env.Library(
'range_deletion_task.idl',
'range_deletion_util.cpp',
'read_only_catalog_cache_loader.cpp',
+ 'recoverable_critical_section_service.cpp',
'resharding/resharding_collection_cloner.cpp',
'resharding/resharding_coordinator_commit_monitor.cpp',
'resharding/resharding_coordinator_observer.cpp',
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 21d5550a775..8f5d04410ec 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -166,6 +166,10 @@ void CollectionShardingRuntime::enterCriticalSectionCommitPhase(const CSRLock&)
_critSec.enterCriticalSectionCommitPhase();
}
+void CollectionShardingRuntime::rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&) {
+ _critSec.rollbackCriticalSectionCommitPhaseToCatchUpPhase();
+}
+
void CollectionShardingRuntime::exitCriticalSection(const CSRLock&) {
_critSec.exitCriticalSection();
}
diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h
index decb27cd1b3..03622031c12 100644
--- a/src/mongo/db/s/collection_sharding_runtime.h
+++ b/src/mongo/db/s/collection_sharding_runtime.h
@@ -130,6 +130,11 @@ public:
void enterCriticalSectionCommitPhase(const CSRLock&);
/**
+ * Transitions the critical section from the commit phase back to the catch-up phase.
+ */
+ void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&);
+
+ /**
* Method to control the collection's critical secion. Method listed below must be called with
* the CSRLock in exclusive mode.
*
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 58055904265..3af06c388e7 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -100,6 +100,16 @@ public:
builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion);
}
+ std::vector<NamespaceString> getCollectionNames() {
+ stdx::lock_guard lg(_mutex);
+ std::vector<NamespaceString> result;
+ result.reserve(_collections.size());
+ for (const auto& [ns, _] : _collections) {
+ result.emplace_back(ns);
+ }
+ return result;
+ }
+
private:
using CollectionsMap = StringMap<std::shared_ptr<CollectionShardingState>>;
@@ -148,6 +158,11 @@ void CollectionShardingState::appendInfoForServerStatus(OperationContext* opCtx,
collectionsMap->appendInfoForServerStatus(builder);
}
+std::vector<NamespaceString> CollectionShardingState::getCollectionNames(OperationContext* opCtx) {
+ auto& collectionsMap = CollectionShardingStateMap::get(opCtx->getServiceContext());
+ return collectionsMap->getCollectionNames();
+}
+
void CollectionShardingStateFactory::set(ServiceContext* service,
std::unique_ptr<CollectionShardingStateFactory> factory) {
auto& collectionsMap = CollectionShardingStateMap::get(service);
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index d206a603b78..f4aeafba0c0 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -29,6 +29,8 @@
#pragma once
+#include <vector>
+
#include "mongo/db/logical_time.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/scoped_collection_metadata.h"
@@ -72,6 +74,11 @@ public:
static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss);
/**
+ * Returns the names of the collections that have a CollectionShardingState.
+ */
+ static std::vector<NamespaceString> getCollectionNames(OperationContext* opCtx);
+
+ /**
* Obtain a pointer to the CollectionShardingState that remains safe to access without holding
* a collection lock. Should be called instead of the regular get() if no collection lock is
* held. The returned CollectionShardingState instance should not be modified!
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 31e113d8af5..da209d46523 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/create_collection_coordinator.h"
+#include "mongo/db/s/recoverable_critical_section_service.h"
#include "mongo/db/s/shard_key_util.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_logging.h"
@@ -391,8 +392,12 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
_collectionVersion = _result->getCollectionVersion();
// The collection was already created and commited but there was a
// stepdown after the commit.
- sharding_ddl_util::releaseRecoverableCriticalSection(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx)
+ ->releaseRecoverableCriticalSection(
+ opCtx,
+ nss(),
+ _critSecReason,
+ ShardingCatalogClient::kMajorityWriteConcern);
return;
}
@@ -400,8 +405,9 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// calling this method, we need the coordinator document to be persisted (and hence
// the kCheck state), otherwise nothing will release the critical section in the
// presence of a stepdown.
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx)
+ ->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
if (_recoveredFromDisk) {
auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss());
@@ -430,14 +436,18 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// the collection on other shards, this way we prevent
// reads/writes that should be redirected to another
// shard.
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads(
- opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx)
+ ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx,
+ nss(),
+ _critSecReason,
+ ShardingCatalogClient::kMajorityWriteConcern);
_createCollectionOnNonPrimaryShards(opCtx);
_commit(opCtx);
}
- sharding_ddl_util::releaseRecoverableCriticalSection(
+ RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
if (!_splitPolicy->isOptimized()) {
@@ -469,32 +479,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
- sharding_ddl_util::releaseRecoverableCriticalSection(
+ RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
}
return status;
});
}
-void CreateCollectionCoordinator::_interrupt(Status status) noexcept {
- // Only free the in memory critical section if we reached the commit phase (in which we might've
- // acquired it).
- if (_doc.getPhase() >= Phase::kCommit &&
- (status.isA<ErrorCategory::NotPrimaryError>() ||
- status.isA<ErrorCategory::ShutdownError>())) {
- auto client = cc().getServiceContext()->makeClient("CreateCollectionCleanupClient");
- AlternativeClientRegion acr(client);
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(), nss());
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- csr->exitCriticalSection(csrLock);
- csr->clearFilteringMetadata(opCtx);
- }
-}
-
void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = nss());
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index cf5dcec69d6..fa5a69f0c14 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -69,8 +69,6 @@ private:
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
- void _interrupt(Status status) noexcept override;
-
template <typename Func>
auto _executePhase(const Phase& newPhase, Func&& func) {
return [=] {
diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp
new file mode 100644
index 00000000000..1a61edc3e5c
--- /dev/null
+++ b/src/mongo/db/s/recoverable_critical_section_service.cpp
@@ -0,0 +1,401 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include <set>
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/recoverable_critical_section_service.h"
+
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/collection_critical_section_document_gen.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/sharding_migration_critical_section.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+
+namespace mongo {
+
+namespace {
+const auto serviceDecorator =
+ ServiceContext::declareDecoration<RecoverableCriticalSectionService>();
+}
+
+RecoverableCriticalSectionService* RecoverableCriticalSectionService::get(
+ ServiceContext* serviceContext) {
+ return &serviceDecorator(serviceContext);
+}
+
+RecoverableCriticalSectionService* RecoverableCriticalSectionService::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+const ReplicaSetAwareServiceRegistry::Registerer<RecoverableCriticalSectionService>
+ recoverableCriticalSectionServiceServiceRegisterer("RecoverableCriticalSectionService");
+
+void RecoverableCriticalSectionService::acquireRecoverableCriticalSectionBlockWrites(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& reason,
+ const WriteConcernOptions& writeConcern,
+ const boost::optional<BSONObj>& additionalInfo) {
+ LOGV2_DEBUG(5656600,
+ 3,
+ "Acquiring recoverable critical section blocking writes",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ Lock::GlobalLock lk(opCtx, MODE_IX);
+ AutoGetCollection cCollLock(opCtx, nss, MODE_S);
+
+ DBDirectClient dbClient(opCtx);
+ auto cursor = dbClient.query(
+ NamespaceString::kCollectionCriticalSectionsNamespace,
+ BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()));
+
+ // If a doc with the same nss already exists, it must carry the same reason (else invariant).
+ if (cursor->more()) {
+ const auto bsonObj = cursor->next();
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("AcquireRecoverableCSBW"), bsonObj);
+
+ invariant(collCSDoc.getReason().woCompare(reason) == 0,
+ str::stream()
+ << "Trying to acquire a critical section blocking writes for namespace "
+ << nss << " and reason " << reason
+ << " but it is already taken by another operation with different reason "
+ << collCSDoc.getReason());
+
+ LOGV2_DEBUG(
+ 5656601,
+ 3,
+ "The recoverable critical section was already acquired to block writes, do nothing",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+
+ return;
+ }
+
+ // The collection critical section is not taken, try to acquire it.
+
+ // The following code will try to add a doc to kCollectionCriticalSectionsNamespace:
+ // - If everything goes well, the shard server op observer will acquire the in-memory CS.
+ // - Otherwise this call will fail and the CS won't be taken (neither persisted nor in-mem)
+ CollectionCriticalSectionDocument newDoc(nss, reason, false /* blockReads */);
+ newDoc.setAdditionalInfo(additionalInfo);
+
+ const auto commandResponse = dbClient.runCommand([&] {
+ write_ops::InsertCommandRequest insertOp(
+ NamespaceString::kCollectionCriticalSectionsNamespace);
+ insertOp.setDocuments({newDoc.toBSON()});
+ return insertOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ BatchedCommandResponse batchedResponse;
+ std::string unusedErrmsg;
+ batchedResponse.parseBSON(commandReply, &unusedErrmsg);
+ invariant(batchedResponse.getN() > 0,
+ str::stream() << "Insert did not add any doc to collection "
+ << NamespaceString::kCollectionCriticalSectionsNamespace
+ << " for namespace " << nss << " and reason " << reason);
+ }
+
+ WriteConcernResult ignoreResult;
+ const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult));
+
+ LOGV2_DEBUG(5656602,
+ 2,
+ "Acquired recoverable critical section blocking writes",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+}
+
+void RecoverableCriticalSectionService::promoteRecoverableCriticalSectionToBlockAlsoReads(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& reason,
+ const WriteConcernOptions& writeConcern) {
+ LOGV2_DEBUG(5656603,
+ 3,
+ "Promoting recoverable critical section to also block reads",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ AutoGetCollection cCollLock(opCtx, nss, MODE_X);
+
+ DBDirectClient dbClient(opCtx);
+ auto cursor = dbClient.query(
+ NamespaceString::kCollectionCriticalSectionsNamespace,
+ BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()));
+
+ invariant(
+ cursor->more(),
+ str::stream() << "Trying to acquire a critical section blocking reads for namespace "
+ << nss << " and reason " << reason
+ << " but the critical section wasn't acquired first blocking writers.");
+ BSONObj bsonObj = cursor->next();
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("AcquireRecoverableCSBR"), bsonObj);
+
+ invariant(
+ collCSDoc.getReason().woCompare(reason) == 0,
+ str::stream() << "Trying to acquire a critical section blocking reads for namespace "
+ << nss << " and reason " << reason
+ << " but it is already taken by another operation with different reason "
+ << collCSDoc.getReason());
+
+ // if there is a document with the same nss, reason and blocking reads -> do nothing, the CS
+ // is already taken!
+ if (collCSDoc.getBlockReads()) {
+ LOGV2_DEBUG(5656604,
+ 3,
+ "The recoverable critical section was already promoted to also block "
+ "reads, do nothing",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+ return;
+ }
+
+ // The CS is in the catch-up phase, try to advance it to the commit phase.
+
+ // The following code will try to update a doc in
+ // NamespaceString::kCollectionCriticalSectionsNamespace:
+ // - If everything goes well, the shard server op observer will advance the in-memory CS to
+ // the commit phase (blocking readers).
+ // - Otherwise this call will fail and the CS won't be advanced (neither persisted nor
+ // in-mem)
+ auto commandResponse = dbClient.runCommand([&] {
+ const auto query = BSON(
+ CollectionCriticalSectionDocument::kNssFieldName
+ << nss.toString() << CollectionCriticalSectionDocument::kReasonFieldName << reason);
+ const auto update = BSON(
+ "$set" << BSON(CollectionCriticalSectionDocument::kBlockReadsFieldName << true));
+
+ write_ops::UpdateCommandRequest updateOp(
+ NamespaceString::kCollectionCriticalSectionsNamespace);
+ auto updateModification = write_ops::UpdateModification::parseFromClassicUpdate(update);
+ write_ops::UpdateOpEntry updateEntry(query, updateModification);
+ updateOp.setUpdates({updateEntry});
+
+ return updateOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ BatchedCommandResponse batchedResponse;
+ std::string unusedErrmsg;
+ batchedResponse.parseBSON(commandReply, &unusedErrmsg);
+ invariant(batchedResponse.getNModified() > 0,
+ str::stream() << "Update did not modify any doc from collection "
+ << NamespaceString::kCollectionCriticalSectionsNamespace
+ << " for namespace " << nss << " and reason " << reason);
+ }
+
+ WriteConcernResult ignoreResult;
+ const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult));
+
+ LOGV2_DEBUG(5656605,
+ 2,
+ "Promoted recoverable critical section to also block reads",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+}
+
+void RecoverableCriticalSectionService::releaseRecoverableCriticalSection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& reason,
+ const WriteConcernOptions& writeConcern) {
+ LOGV2_DEBUG(5656606,
+ 3,
+ "Releasing recoverable critical section",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+
+ invariant(!opCtx->lockState()->isLocked());
+
+ {
+ AutoGetCollection collLock(opCtx, nss, MODE_X);
+
+ DBDirectClient dbClient(opCtx);
+
+ const auto queryNss =
+ BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString());
+ auto cursor =
+ dbClient.query(NamespaceString::kCollectionCriticalSectionsNamespace, queryNss);
+
+ // if there is no document with the same nss -> do nothing!
+ if (!cursor->more()) {
+ LOGV2_DEBUG(5656607,
+ 3,
+ "The recoverable critical section was already released, do nothing",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+ return;
+ }
+
+ BSONObj bsonObj = cursor->next();
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("ReleaseRecoverableCS"), bsonObj);
+
+ invariant(
+ collCSDoc.getReason().woCompare(reason) == 0,
+ str::stream() << "Trying to release a critical for namespace " << nss << " and reason "
+ << reason
+ << " but it is already taken by another operation with different reason "
+ << collCSDoc.getReason());
+
+
+ // The collection critical section is taken (in any phase), try to release it.
+
+ // The following code will try to remove a doc from kCollectionCriticalSectionsNamespace:
+ // - If everything goes well, the shard server op observer will release the in-memory CS
+ // - Otherwise this call will fail and the CS won't be released (neither persisted nor
+ // in-mem)
+
+ auto commandResponse = dbClient.runCommand([&] {
+ write_ops::DeleteCommandRequest deleteOp(
+ NamespaceString::kCollectionCriticalSectionsNamespace);
+
+ deleteOp.setDeletes({[&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(queryNss);
+ entry.setMulti(true);
+ return entry;
+ }()});
+
+ return deleteOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ BatchedCommandResponse batchedResponse;
+ std::string unusedErrmsg;
+ batchedResponse.parseBSON(commandReply, &unusedErrmsg);
+ invariant(batchedResponse.getN() > 0,
+ str::stream() << "Delete did not remove any doc from collection "
+ << NamespaceString::kCollectionCriticalSectionsNamespace
+ << " for namespace " << nss << " and reason " << reason);
+ }
+
+ WriteConcernResult ignoreResult;
+ const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult));
+
+ LOGV2_DEBUG(5656608,
+ 2,
+ "Released recoverable critical section",
+ "namespace"_attr = nss,
+ "reason"_attr = reason,
+ "writeConcern"_attr = writeConcern);
+}
+
+void RecoverableCriticalSectionService::recoverRecoverableCriticalSections(
+ OperationContext* opCtx) {
+ LOGV2_DEBUG(5604000, 2, "Recovering all recoverable critical sections");
+
+ std::set<NamespaceString> nssPresentOnDisk;
+ PersistentTaskStore<CollectionCriticalSectionDocument> store(
+ NamespaceString::kCollectionCriticalSectionsNamespace);
+ store.forEach(
+ opCtx, Query{}, [&opCtx, &nssPresentOnDisk](const CollectionCriticalSectionDocument& doc) {
+ const auto& nss = doc.getNss();
+ {
+ AutoGetCollection collLock(opCtx, nss, MODE_X);
+ auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+
+ const bool blockingWritesCS = (bool)csr->getCriticalSectionSignal(
+ opCtx, ShardingMigrationCriticalSection::Operation::kWrite);
+
+ const bool blockingReadsAndWritesCS = blockingWritesCS &&
+ csr->getCriticalSectionSignal(
+ opCtx, ShardingMigrationCriticalSection::Operation::kRead);
+
+ if (!doc.getBlockReads()) {
+ if (!blockingWritesCS)
+ csr->enterCriticalSectionCatchUpPhase(csrLock);
+ else if (blockingReadsAndWritesCS)
+ csr->rollbackCriticalSectionCommitPhaseToCatchUpPhase(csrLock);
+ } else {
+ if (!blockingWritesCS)
+ csr->enterCriticalSectionCatchUpPhase(csrLock);
+ if (!blockingReadsAndWritesCS)
+ csr->enterCriticalSectionCommitPhase(csrLock);
+ }
+
+ nssPresentOnDisk.insert(nss);
+
+ return true;
+ }
+ });
+
+ // Release in-memory CS that are not present on the disk
+ const auto collectionNames = CollectionShardingState::getCollectionNames(opCtx);
+ for (const auto& collName : collectionNames) {
+ if (nssPresentOnDisk.find(collName) != nssPresentOnDisk.end())
+ continue;
+
+ AutoGetCollection collLock(opCtx, collName, MODE_X);
+ auto* const csr = CollectionShardingRuntime::get(opCtx, collName);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+ csr->exitCriticalSection(csrLock);
+ }
+
+ LOGV2_DEBUG(5604001, 2, "Recovered all recoverable critical sections");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/recoverable_critical_section_service.h b/src/mongo/db/s/recoverable_critical_section_service.h
new file mode 100644
index 00000000000..f4d7f94820a
--- /dev/null
+++ b/src/mongo/db/s/recoverable_critical_section_service.h
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/write_concern_options.h"
+
+namespace mongo {
+
+class RecoverableCriticalSectionService
+ : public ReplicaSetAwareServiceShardSvr<RecoverableCriticalSectionService> {
+
+public:
+ RecoverableCriticalSectionService() = default;
+
+ static RecoverableCriticalSectionService* get(ServiceContext* serviceContext);
+ static RecoverableCriticalSectionService* get(OperationContext* opCtx);
+
+ /**
+ * Acquires the collection critical section in the catch-up phase (i.e. blocking writes) for the
+ * specified namespace and reason. It works even if the namespace's current metadata are
+ * UNKNOWN.
+ *
+ * It adds a doc to config.collectionCriticalSections with writeConcern write concern.
+ *
+ * Does nothing if the collection critical section is already taken for that nss and reason,
+ * and invariants if it is taken for that nss with a different reason, since it is the
+ * responsibility of the caller to ensure that only one thread is taking the critical section.
+ */
+ void acquireRecoverableCriticalSectionBlockWrites(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& reason,
+ const WriteConcernOptions& writeConcern,
+ const boost::optional<BSONObj>& additionalInfo = boost::none);
+
+ /**
+ * Advances the recoverable critical section from the catch-up phase (i.e. blocking writes) to
+ * the commit phase (i.e. blocking reads) for the specified nss and reason. The recoverable
+ * critical section must have been acquired first through
+ * 'acquireRecoverableCriticalSectionBlockWrites' function.
+ *
+ * It updates a doc from config.collectionCriticalSections with writeConcern write concern.
+ *
+ * Do nothing if the collection critical section is already taken in commit phase.
+ */
+ void promoteRecoverableCriticalSectionToBlockAlsoReads(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& reason,
+ const WriteConcernOptions& writeConcern);
+ /**
+ * Releases the recoverable critical section for the given nss and reason.
+ *
+ * It removes a doc from config.collectionCriticalSections with writeConcern write concern.
+ *
+ * Does nothing if no critical section is taken for that nss; invariants if the reason differs.
+ */
+ void releaseRecoverableCriticalSection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& reason,
+ const WriteConcernOptions& writeConcern);
+
+
+ /**
+ * This method is called when we have to mirror the state on disk of the recoverable critical
+ * section to memory (on startUp or on rollback).
+ */
+ void recoverRecoverableCriticalSections(OperationContext* opCtx);
+
+private:
+ void onStartup(OperationContext* opCtx) override final {
+ recoverRecoverableCriticalSections(opCtx);
+ }
+
+ void onShutdown() override final {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
+ void onStepUpComplete(OperationContext* opCtx, long long term) override final {}
+ void onStepDown() override final {}
+ void onBecomeArbiter() override final {}
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/rename_collection_participant_service.cpp b/src/mongo/db/s/rename_collection_participant_service.cpp
index c3ac3a04042..f987d7d9d90 100644
--- a/src/mongo/db/s/rename_collection_participant_service.cpp
+++ b/src/mongo/db/s/rename_collection_participant_service.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/range_deletion_util.h"
+#include "mongo/db/s/recoverable_critical_section_service.h"
#include "mongo/db/s/rename_collection_participant_service.h"
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_ddl_util.h"
@@ -230,13 +231,14 @@ SemiFuture<void> RenameParticipantInstance::run(
BSON("command"
<< "rename"
<< "from" << fromNss().toString() << "to" << toNss().toString());
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites(
+ auto service = RecoverableCriticalSectionService::get(opCtx);
+ service->acquireRecoverableCriticalSectionBlockWrites(
opCtx, fromNss(), reason, ShardingCatalogClient::kLocalWriteConcern);
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads(
+ service->promoteRecoverableCriticalSectionToBlockAlsoReads(
opCtx, fromNss(), reason, ShardingCatalogClient::kLocalWriteConcern);
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites(
+ service->acquireRecoverableCriticalSectionBlockWrites(
opCtx, toNss(), reason, ShardingCatalogClient::kLocalWriteConcern);
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads(
+ service->promoteRecoverableCriticalSectionToBlockAlsoReads(
opCtx, toNss(), reason, ShardingCatalogClient::kLocalWriteConcern);
snapshotRangeDeletionsForRename(opCtx, fromNss(), toNss());
@@ -290,9 +292,10 @@ SemiFuture<void> RenameParticipantInstance::run(
BSON("command"
<< "rename"
<< "from" << fromNss().toString() << "to" << toNss().toString());
- sharding_ddl_util::releaseRecoverableCriticalSection(
+ auto service = RecoverableCriticalSectionService::get(opCtx);
+ service->releaseRecoverableCriticalSection(
opCtx, fromNss(), reason, ShardingCatalogClient::kLocalWriteConcern);
- sharding_ddl_util::releaseRecoverableCriticalSection(
+ service->releaseRecoverableCriticalSection(
opCtx, toNss(), reason, ShardingCatalogClient::kMajorityWriteConcern);
Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(fromNss());
@@ -347,24 +350,9 @@ void RenameParticipantInstance::interrupt(Status status) noexcept {
"toNs"_attr = toNss(),
"error"_attr = redact(status));
- auto releaseInMemoryCritSec = [](const NamespaceString& nss) {
- auto client = cc().getServiceContext()->makeClient("RenameParticipantCleanupClient");
- AlternativeClientRegion acr(client);
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
-
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(), nss);
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- csr->exitCriticalSection(csrLock);
- csr->clearFilteringMetadata(opCtx);
- };
-
invariant(status.isA<ErrorCategory::NotPrimaryError>() ||
status.isA<ErrorCategory::ShutdownError>());
- releaseInMemoryCritSec(fromNss());
- releaseInMemoryCritSec(toNss());
_invalidateFutures(status);
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index d303ad6adb5..606407c81b6 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -46,11 +46,11 @@
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/recoverable_critical_section_service.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
-#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
@@ -331,22 +331,6 @@ void ReshardingDonorService::DonorStateMachine::_runMandatoryCleanup(Status stat
_completionPromise.setError(status);
}
}
-
- // TODO SERVER-56040: this if-stmt must be removed.
- if (status.isA<ErrorCategory::NotPrimaryError>() ||
- status.isA<ErrorCategory::ShutdownError>()) {
- auto client = cc().getServiceContext()->makeClient("ReshardingDonorCleanupClient");
- AlternativeClientRegion acr(client);
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(),
- _metadata.getSourceNss());
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- csr->exitCriticalSection(csrLock);
- csr->clearFilteringMetadata(opCtx);
- }
}
SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
@@ -431,7 +415,7 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
}
if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) {
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites(
+ RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
opCtx,
_metadata.getSourceNss(),
_critSecReason,
@@ -441,11 +425,12 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
}
if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) {
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockReads(
- opCtx,
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kLocalWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx)
+ ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx,
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kLocalWriteConcern);
ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
}
@@ -680,11 +665,11 @@ void ReshardingDonorService::DonorStateMachine::_transitionToDone() {
{
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- sharding_ddl_util::releaseRecoverableCriticalSection(
- opCtx.get(),
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kLocalWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx.get())
+ ->releaseRecoverableCriticalSection(opCtx.get(),
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kLocalWriteConcern);
}
_transitionState(DonorStateEnum::kDone);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 434bb644388..50e2abe232d 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/db/s/recoverable_critical_section_service.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
@@ -51,7 +52,6 @@
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/shard_key_util.h"
-#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -282,22 +282,6 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
if (!_completionPromise.getFuture().isReady()) {
_completionPromise.setError(status);
}
-
- // TODO SERVER-56040: this if-stmt must be removed.
- if (status.isA<ErrorCategory::NotPrimaryError>() ||
- status.isA<ErrorCategory::ShutdownError>()) {
- auto client = cc().getServiceContext()->makeClient("ReshardingRecipientCleanupClient");
- AlternativeClientRegion acr(client);
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- auto* const csr = CollectionShardingRuntime::get_UNSAFE(opCtx->getServiceContext(),
- _metadata.getSourceNss());
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- csr->exitCriticalSection(csrLock);
- csr->clearFilteringMetadata(opCtx);
- }
}
boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp(
@@ -326,7 +310,7 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
_onAbortOrStepdown(lk, status);
if (!_isAlsoDonor) {
- sharding_ddl_util::releaseRecoverableCriticalSection(
+ RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
opCtx,
_metadata.getSourceNss(),
_critSecReason,
@@ -533,11 +517,12 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
.then([this] {
if (!_isAlsoDonor) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites(
- opCtx.get(),
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kMajorityWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx.get())
+ ->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx.get(),
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kMajorityWriteConcern);
}
_transitionState(RecipientStateEnum::kStrictConsistency);
@@ -566,11 +551,12 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi
if (!_isAlsoDonor) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- sharding_ddl_util::acquireRecoverableCriticalSectionBlockWrites(
- opCtx.get(),
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kLocalWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx.get())
+ ->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx.get(),
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kLocalWriteConcern);
RenameCollectionOptions options;
options.dropTarget = true;
@@ -578,11 +564,11 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi
uassertStatusOK(renameCollection(
opCtx.get(), _metadata.getTempReshardingNss(), _metadata.getSourceNss(), options));
- sharding_ddl_util::releaseRecoverableCriticalSection(
- opCtx.get(),
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kLocalWriteConcern);
+ RecoverableCriticalSectionService::get(opCtx.get())
+ ->releaseRecoverableCriticalSection(opCtx.get(),
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kLocalWriteConcern);
}
_cleanupReshardingCollections();
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 99b25f4e82d..73a842da95c 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/range_deletion_task_gen.h"
+#include "mongo/db/s/recoverable_critical_section_service.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state.h"
@@ -275,15 +276,19 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
const auto collCSDoc = CollectionCriticalSectionDocument::parse(
IDLParserErrorContext("ShardServerOpObserver"), insertedDoc);
- if (isStandaloneOrPrimary(opCtx)) {
- opCtx->recoveryUnit()->onCommit([opCtx, insertedNss = collCSDoc.getNss()](
- boost::optional<Timestamp>) {
+
+
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx))
+ lockCollectionIfNotPrimary.emplace(opCtx, insertedNss, MODE_IX);
+
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss);
auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
csr->enterCriticalSectionCatchUpPhase(csrLock);
});
- }
}
if (metadata && metadata->isSharded()) {
@@ -405,21 +410,17 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
const auto collCSDoc = CollectionCriticalSectionDocument::parse(
IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc);
- const auto& updatedNss = collCSDoc.getNss();
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, updatedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx))
+ lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX);
- if (isStandaloneOrPrimary(opCtx)) {
- opCtx->recoveryUnit()->onCommit([opCtx, updatedNss](boost::optional<Timestamp>) {
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss);
auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
csr->enterCriticalSectionCommitPhase(csrLock);
});
- } else {
- // Force subsequent uses of the namespace to refresh the filtering metadata so they
- // can synchronize with any work happening on the primary (e.g., migration critical
- // section).
- CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx);
- }
}
auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss);
@@ -488,21 +489,24 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx,
}
if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
- if (isStandaloneOrPrimary(opCtx)) {
- const auto deletedNss([&] {
- std::string coll;
- fassert(5514801,
- bsonExtractStringField(
- documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll));
- return NamespaceString(coll);
- }());
- opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss);
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- csr->exitCriticalSection(csrLock);
- });
- }
+ const auto deletedNss([&] {
+ std::string coll;
+ fassert(5514801,
+ bsonExtractStringField(
+ documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll));
+ return NamespaceString(coll);
+ }());
+
+ opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx))
+ lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX);
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+ csr->exitCriticalSection(csrLock);
+ });
}
}
@@ -600,5 +604,13 @@ void ShardServerOpObserver::onCollMod(OperationContext* opCtx,
abortOngoingMigrationIfNeeded(opCtx, nss);
};
+/**
+ * Rollback hook. When the set of rolled-back namespaces contains the collection that stores the
+ * recoverable critical section documents, asks the RecoverableCriticalSectionService to recover
+ * the critical sections; otherwise does nothing.
+ */
+void ShardServerOpObserver::onReplicationRollback(OperationContext* opCtx,
+                                                  const RollbackObserverInfo& rbInfo) {
+    const auto& rolledBackNamespaces = rbInfo.rollbackNamespaces;
+    const auto critSecEntry =
+        rolledBackNamespaces.find(NamespaceString::kCollectionCriticalSectionsNamespace);
+    if (critSecEntry == rolledBackNamespaces.end()) {
+        // The critical sections collection was not part of the rollback.
+        return;
+    }
+
+    RecoverableCriticalSectionService::get(opCtx)->recoverRecoverableCriticalSections(opCtx);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 29b213aebcb..eff1924ffd6 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -194,7 +194,8 @@ public:
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
- void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) {}
+ // Recovers the recoverable critical sections when their collection was rolled back.
+ // Marked 'override' like the sibling observer hooks so signature drift from the base class
+ // is caught at compile time.
+ void onReplicationRollback(OperationContext* opCtx,
+ const RollbackObserverInfo& rbInfo) override;
+
void onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) override {}
};
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index a8179143790..87051eb4385 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -109,8 +109,6 @@ void ShardingDDLCoordinator::interrupt(Status status) {
"coordinatorId"_attr = _coorMetadata.getId(),
"reason"_attr = redact(status));
- _interrupt(status);
-
// Resolve any unresolved promises to avoid hanging.
stdx::lock_guard<Latch> lg(_mutex);
if (!_constructionCompletionPromise.getFuture().isReady()) {
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index e28746ff561..852a19f63ac 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -106,10 +106,6 @@ private:
virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept = 0;
- // TODO SERVER-56040: remove once we have critical section handling and replication on
- // secondaries.
- virtual void _interrupt(Status status) noexcept {}
-
void interrupt(Status status) override final;
bool _removeDocument(OperationContext* opCtx);
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index fcf5258a65c..f1c0dce4b78 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -35,10 +35,6 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/persistent_task_store.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/s/collection_critical_section_document_gen.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_util.h"
@@ -49,8 +45,6 @@
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/set_allow_migrations_gen.h"
-#include "mongo/s/write_ops/batched_command_request.h"
-#include "mongo/s/write_ops/batched_command_response.h"
namespace mongo {
@@ -297,323 +291,6 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
return response;
}
-void acquireRecoverableCriticalSectionBlockWrites(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& reason,
- const WriteConcernOptions& writeConcern,
- const boost::optional<BSONObj>& additionalInfo) {
- LOGV2_DEBUG(5656600,
- 3,
- "Acquiring recoverable critical section blocking writes",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
-
- invariant(!opCtx->lockState()->isLocked());
-
- {
- Lock::GlobalLock lk(opCtx, MODE_IX);
- AutoGetCollection cCollLock(opCtx, nss, MODE_S);
-
- DBDirectClient dbClient(opCtx);
- auto cursor = dbClient.query(
- NamespaceString::kCollectionCriticalSectionsNamespace,
- BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()));
-
- // if there is a doc with the same nss -> in order to not fail it must have the same reason
- if (cursor->more()) {
- const auto bsonObj = cursor->next();
- const auto collCSDoc = CollectionCriticalSectionDocument::parse(
- IDLParserErrorContext("AcquireRecoverableCSBW"), bsonObj);
-
- invariant(collCSDoc.getReason().woCompare(reason) == 0,
- str::stream()
- << "Trying to acquire a critical section blocking writes for namespace "
- << nss << " and reason " << reason
- << " but it is already taken by another operation with different reason "
- << collCSDoc.getReason());
-
- LOGV2_DEBUG(
- 5656601,
- 3,
- "The recoverable critical section was already acquired to block writes, do nothing",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
- return;
- }
-
- // The collection critical section is not taken, try to acquire it.
-
- // The following code will try to add a doc to config.criticalCollectionSections:
- // - If everything goes well, the shard server op observer will acquire the in-memory CS.
- // - Otherwise this call will fail and the CS won't be taken (neither persisted nor in-mem)
- CollectionCriticalSectionDocument newDoc(nss, reason, false /* blockReads */);
- newDoc.setAdditionalInfo(additionalInfo);
-
- const auto commandResponse = dbClient.runCommand([&] {
- write_ops::InsertCommandRequest insertOp(
- NamespaceString::kCollectionCriticalSectionsNamespace);
- insertOp.setDocuments({newDoc.toBSON()});
- return insertOp.serialize({});
- }());
-
- const auto commandReply = commandResponse->getCommandReply();
- uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
-
- BatchedCommandResponse batchedResponse;
- std::string unusedErrmsg;
- batchedResponse.parseBSON(commandReply, &unusedErrmsg);
- invariant(batchedResponse.getN() > 0,
- str::stream() << "Insert did not add any doc to collection "
- << NamespaceString::kCollectionCriticalSectionsNamespace
- << " for namespace " << nss << " and reason " << reason);
- }
-
- WriteConcernResult ignoreResult;
- const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult));
- LOGV2_DEBUG(5656602,
- 2,
- "Acquired recoverable critical section blocking writes",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
-}
-
-void acquireRecoverableCriticalSectionBlockReads(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& reason,
- const WriteConcernOptions& writeConcern) {
- LOGV2_DEBUG(5656603,
- 3,
- "Promoting recoverable critical section to also block reads",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
-
- invariant(!opCtx->lockState()->isLocked());
-
- {
- AutoGetCollection cCollLock(opCtx, nss, MODE_X);
-
- DBDirectClient dbClient(opCtx);
- auto cursor = dbClient.query(
- NamespaceString::kCollectionCriticalSectionsNamespace,
- BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString()));
-
- invariant(
- cursor->more(),
- str::stream() << "Trying to acquire a critical section blocking reads for namespace "
- << nss << " and reason " << reason
- << " but the critical section wasn't acquired first blocking writers.");
- BSONObj bsonObj = cursor->next();
- const auto collCSDoc = CollectionCriticalSectionDocument::parse(
- IDLParserErrorContext("AcquireRecoverableCSBR"), bsonObj);
-
- invariant(
- collCSDoc.getReason().woCompare(reason) == 0,
- str::stream() << "Trying to acquire a critical section blocking reads for namespace "
- << nss << " and reason " << reason
- << " but it is already taken by another operation with different reason "
- << collCSDoc.getReason());
-
- // if there is a document with the same nss, reason and blocking reads -> do nothing, the CS
- // is already taken!
- if (collCSDoc.getBlockReads()) {
- LOGV2_DEBUG(5656604,
- 3,
- "The recoverable critical section was already promoted to also block "
- "reads, do nothing",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
- return;
- }
-
- // The CS is in the catch-up phase, try to advance it to the commit phase.
-
- // The following code will try to update a doc from config.criticalCollectionSections:
- // - If everything goes well, the shard server op observer will advance the in-memory CS to
- // the
- // commit phase (blocking readers).
- // - Otherwise this call will fail and the CS won't be advanced (neither persisted nor
- // in-mem)
- auto commandResponse = dbClient.runCommand([&] {
- const auto query = BSON(
- CollectionCriticalSectionDocument::kNssFieldName
- << nss.toString() << CollectionCriticalSectionDocument::kReasonFieldName << reason);
- const auto update = BSON(
- "$set" << BSON(CollectionCriticalSectionDocument::kBlockReadsFieldName << true));
-
- write_ops::UpdateCommandRequest updateOp(
- NamespaceString::kCollectionCriticalSectionsNamespace);
- auto updateModification = write_ops::UpdateModification::parseFromClassicUpdate(update);
- write_ops::UpdateOpEntry updateEntry(query, updateModification);
- updateOp.setUpdates({updateEntry});
-
- return updateOp.serialize({});
- }());
-
- const auto commandReply = commandResponse->getCommandReply();
- uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
-
- BatchedCommandResponse batchedResponse;
- std::string unusedErrmsg;
- batchedResponse.parseBSON(commandReply, &unusedErrmsg);
- invariant(batchedResponse.getNModified() > 0,
- str::stream() << "Update did not modify any doc from collection "
- << NamespaceString::kCollectionCriticalSectionsNamespace
- << " for namespace " << nss << " and reason " << reason);
- }
-
- WriteConcernResult ignoreResult;
- const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult));
- LOGV2_DEBUG(5656605,
- 2,
- "Promoted recoverable critical section to also block reads",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
-}
-
-void releaseRecoverableCriticalSection(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& reason,
- const WriteConcernOptions& writeConcern) {
- LOGV2_DEBUG(5656606,
- 3,
- "Releasing recoverable critical section",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
-
- invariant(!opCtx->lockState()->isLocked());
-
- {
- AutoGetCollection collLock(opCtx, nss, MODE_X);
-
- DBDirectClient dbClient(opCtx);
-
- const auto queryNss =
- BSON(CollectionCriticalSectionDocument::kNssFieldName << nss.toString());
- auto cursor =
- dbClient.query(NamespaceString::kCollectionCriticalSectionsNamespace, queryNss);
-
- // if there is no document with the same nss -> do nothing!
- if (!cursor->more()) {
- LOGV2_DEBUG(5656607,
- 3,
- "The recoverable critical section was already released, do nothing",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
- return;
- }
-
- BSONObj bsonObj = cursor->next();
- const auto collCSDoc = CollectionCriticalSectionDocument::parse(
- IDLParserErrorContext("ReleaseRecoverableCS"), bsonObj);
-
- invariant(
- collCSDoc.getReason().woCompare(reason) == 0,
- str::stream() << "Trying to release a critical for namespace " << nss << " and reason "
- << reason
- << " but it is already taken by another operation with different reason "
- << collCSDoc.getReason());
-
-
- // The collection critical section is taken (in any phase), try to release it.
-
- // The following code will try to remove a doc from config.criticalCollectionSections:
- // - If everything goes well, the shard server op observer will release the in-memory CS
- // - Otherwise this call will fail and the CS won't be released (neither persisted nor
- // in-mem)
-
- auto commandResponse = dbClient.runCommand([&] {
- write_ops::DeleteCommandRequest deleteOp(
- NamespaceString::kCollectionCriticalSectionsNamespace);
-
- deleteOp.setDeletes({[&] {
- write_ops::DeleteOpEntry entry;
- entry.setQ(queryNss);
- entry.setMulti(true);
- return entry;
- }()});
-
- return deleteOp.serialize({});
- }());
-
- const auto commandReply = commandResponse->getCommandReply();
- uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
-
- BatchedCommandResponse batchedResponse;
- std::string unusedErrmsg;
- batchedResponse.parseBSON(commandReply, &unusedErrmsg);
- invariant(batchedResponse.getN() > 0,
- str::stream() << "Delete did not remove any doc from collection "
- << NamespaceString::kCollectionCriticalSectionsNamespace
- << " for namespace " << nss << " and reason " << reason);
- }
-
- WriteConcernResult ignoreResult;
- const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, writeConcern, &ignoreResult));
-
- LOGV2_DEBUG(5656608,
- 2,
- "Released recoverable critical section",
- "namespace"_attr = nss,
- "reason"_attr = reason,
- "writeConcern"_attr = writeConcern);
-}
-
-void retakeInMemoryRecoverableCriticalSections(OperationContext* opCtx) {
-
- LOGV2_DEBUG(5549400, 2, "Starting re-acquisition of recoverable critical sections");
-
- PersistentTaskStore<CollectionCriticalSectionDocument> store(
- NamespaceString::kCollectionCriticalSectionsNamespace);
- store.forEach(opCtx, Query{}, [&opCtx](const CollectionCriticalSectionDocument& doc) {
- const auto& nss = doc.getNss();
- {
- // Entering into the catch-up phase: blocking writes
- Lock::GlobalLock lk(opCtx, MODE_IX);
- AutoGetCollection cCollLock(opCtx, nss, MODE_S);
- auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
-
- // It may happen that the ReplWriterWorker enters the critical section before drain mode
- // upon committing a recoverable critical section oplog entry (SERVER-56104)
- if (!csr->getCriticalSectionSignal(
- opCtx, ShardingMigrationCriticalSection::Operation::kWrite)) {
- csr->enterCriticalSectionCatchUpPhase(csrLock);
- }
- }
-
- if (doc.getBlockReads()) {
- // Entering into the commit phase: blocking reads
- AutoGetCollection cCollLock(opCtx, nss, MODE_X);
- auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
-
- // It may happen that the ReplWriterWorker enters the critical section before drain mode
- // upon committing a recoverable critical section oplog entry (SERVER-56104)
- if (!csr->getCriticalSectionSignal(
- opCtx, ShardingMigrationCriticalSection::Operation::kRead)) {
- csr->enterCriticalSectionCommitPhase(csrLock);
- }
-
- CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx);
- }
-
- return true;
- });
-
- LOGV2_DEBUG(5549401, 2, "Finished re-acquisition of recoverable critical sections");
-}
-
void stopMigrations(OperationContext* opCtx, const NamespaceString& nss) {
const ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss,
false /* allowMigrations */);
diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h
index d7871a1b544..3eb90293551 100644
--- a/src/mongo/db/s/sharding_ddl_util.h
+++ b/src/mongo/db/s/sharding_ddl_util.h
@@ -27,10 +27,11 @@
* it in the license file.
*/
+#pragma once
+
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
@@ -112,56 +113,6 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
bool unique);
/**
- * Acquires the collection critical section in the catch-up phase (i.e. blocking writes) for the
- * specified namespace and reason. It works even if the namespace's current metadata are UNKNOWN.
- *
- * It adds a doc to config.collectionCriticalSections with with writeConcern write concern.
- *
- * Do nothing if the collection critical section is taken for that nss and reason, and will
- * invariant otherwise since it is the responsibility of the caller to ensure that only one thread
- * is taking the critical section.
- */
-void acquireRecoverableCriticalSectionBlockWrites(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& reason,
- const WriteConcernOptions& writeConcern,
- const boost::optional<BSONObj>& additionalInfo = boost::none);
-
-/**
- * Advances the recoverable critical section from the catch-up phase (i.e. blocking writes) to the
- * commit phase (i.e. blocking reads) for the specified nss and reason. The recoverable critical
- * section must have been acquired first through 'acquireRecoverableCriticalSectionBlockWrites'
- * function.
- *
- * It updates a doc from config.collectionCriticalSections with writeConcern write concern.
- *
- * Do nothing if the collection critical section is already taken in commit phase.
- */
-void acquireRecoverableCriticalSectionBlockReads(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& reason,
- const WriteConcernOptions& writeConcern);
-
-/**
- * Releases the recoverable critical section for the given nss and reason.
- *
- * It removes a doc from config.collectionCriticalSections with writeConcern write concern.
- *
- * Do nothing if the collection critical section is not taken for that nss and reason.
- */
-void releaseRecoverableCriticalSection(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& reason,
- const WriteConcernOptions& writeConcern);
-
-/**
- * Retakes the in-memory collection critical section for each recoverable critical section
- * persisted on config.collectionCriticalSections. It also clears the filtering metadata.
- */
-void retakeInMemoryRecoverableCriticalSections(OperationContext* opCtx);
-
-/**
* Stops ongoing migrations and prevents future ones to start for the given nss.
*/
void stopMigrations(OperationContext* opCtx, const NamespaceString& nss);
diff --git a/src/mongo/db/s/sharding_migration_critical_section.cpp b/src/mongo/db/s/sharding_migration_critical_section.cpp
index 10b60790ffd..6f29221e33c 100644
--- a/src/mongo/db/s/sharding_migration_critical_section.cpp
+++ b/src/mongo/db/s/sharding_migration_critical_section.cpp
@@ -57,6 +57,12 @@ void ShardingMigrationCriticalSection::exitCriticalSection() {
}
}
+void ShardingMigrationCriticalSection::rollbackCriticalSectionCommitPhaseToCatchUpPhase() {
+ invariant(_critSecSignal);
+ invariant(_readsShouldWaitOnCritSec);
+ _readsShouldWaitOnCritSec = false;
+}
+
boost::optional<SharedSemiFuture<void>> ShardingMigrationCriticalSection::getSignal(
Operation op) const {
if (!_critSecSignal)
diff --git a/src/mongo/db/s/sharding_migration_critical_section.h b/src/mongo/db/s/sharding_migration_critical_section.h
index 21a5ed91d9b..b0f87477607 100644
--- a/src/mongo/db/s/sharding_migration_critical_section.h
+++ b/src/mongo/db/s/sharding_migration_critical_section.h
@@ -70,6 +70,11 @@ public:
void exitCriticalSection();
/**
+ * Sets the critical section back to the catch-up phase, in which reads are no longer blocked.
+ */
+ void rollbackCriticalSectionCommitPhaseToCatchUpPhase();
+
+ /**
* Retrieves a critical section future to wait on. Will return boost::none if the migration is
* not yet in the critical section or if the caller is a reader and the migration is not yet in
* the commit phase.