summary refs log tree commit diff
diff options
context:
space:
mode:
author Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> 2022-03-16 18:48:49 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-16 20:14:51 +0000
commit ce0bbc5ec1728e443c5ff893a78693b24570b80d (patch)
tree abe97ebc0eb4db7d53918cb170c438929055a929
parent f801cb26949e709161b9673eb5fa9e39d641b17d (diff)
download mongo-ce0bbc5ec1728e443c5ff893a78693b24570b80d.tar.gz
SERVER-64517 Recover RecoverableCriticalSection after initialSync and startupRecovery have completed
-rw-r--r-- src/mongo/db/repl/SConscript | 1
-rw-r--r-- src/mongo/db/repl/initial_syncer.cpp | 3
-rw-r--r-- src/mongo/db/repl/primary_only_service.h | 2
-rw-r--r-- src/mongo/db/repl/replica_set_aware_service.cpp | 12
-rw-r--r-- src/mongo/db/repl/replica_set_aware_service.h | 12
-rw-r--r-- src/mongo/db/repl/replica_set_aware_service_test.cpp | 30
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 2
-rw-r--r-- src/mongo/db/repl/tenant_file_importer_service.h | 3
-rw-r--r-- src/mongo/db/s/balancer/balancer.h | 2
-rw-r--r-- src/mongo/db/s/dist_lock_manager_replset.cpp | 2
-rw-r--r-- src/mongo/db/s/recoverable_critical_section_service.cpp | 14
-rw-r--r-- src/mongo/db/s/recoverable_critical_section_service.h | 13
-rw-r--r-- src/mongo/db/s/shard_server_op_observer.cpp | 110
-rw-r--r-- src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp | 14
-rw-r--r-- src/mongo/db/s/user_writes_recoverable_critical_section_service.h | 13
-rw-r--r-- src/mongo/db/user_write_block_mode_op_observer.cpp | 127
-rw-r--r-- src/mongo/db/vector_clock_mongod.cpp | 2
17 files changed, 229 insertions, 133 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 499f5de261a..707f73c8f8a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1213,6 +1213,7 @@ env.Library(
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/executor/scoped_task_executor',
'repl_server_parameters',
+ 'replica_set_aware_service',
]
)
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 6e098bdf01c..127f382f0c5 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/repl/replication_consistency_markers.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
@@ -576,6 +577,8 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
+ ReplicaSetAwareServiceRegistry::get(opCtx->getServiceContext()).onInitialSyncComplete(opCtx);
+
_replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
// We set the initial data timestamp before clearing the initial sync flag. See comments in
diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h
index d0d8fe01db3..7e6fc1baf2e 100644
--- a/src/mongo/db/repl/primary_only_service.h
+++ b/src/mongo/db/repl/primary_only_service.h
@@ -566,6 +566,8 @@ public:
std::vector<BSONObj>* ops) noexcept;
void onStartup(OperationContext*) final;
+ void onStartupRecoveryComplete(OperationContext* opCtx) final {}
+ void onInitialSyncComplete(OperationContext* opCtx) final {}
void onShutdown() final;
void onStepUpBegin(OperationContext*, long long term) final {}
void onBecomeArbiter() final {}
diff --git a/src/mongo/db/repl/replica_set_aware_service.cpp b/src/mongo/db/repl/replica_set_aware_service.cpp
index cb925e58ced..fdf09046af2 100644
--- a/src/mongo/db/repl/replica_set_aware_service.cpp
+++ b/src/mongo/db/repl/replica_set_aware_service.cpp
@@ -56,6 +56,18 @@ void ReplicaSetAwareServiceRegistry::onStartup(OperationContext* opCtx) {
});
}
+void ReplicaSetAwareServiceRegistry::onStartupRecoveryComplete(OperationContext* opCtx) {  // Forwards the startup-recovery-complete notification to every registered service.
+ std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface* service) {  // Services are visited in the iteration order of _services.
+ service->onStartupRecoveryComplete(opCtx);  // Each service receives the same opCtx.
+ });
+}
+
+void ReplicaSetAwareServiceRegistry::onInitialSyncComplete(OperationContext* opCtx) {  // Forwards the initial-sync-complete notification to every registered service.
+ std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface* service) {  // Services are visited in the iteration order of _services.
+ service->onInitialSyncComplete(opCtx);  // Each service receives the same opCtx.
+ });
+}
+
void ReplicaSetAwareServiceRegistry::onShutdown() {
std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface* service) {
service->onShutdown();
diff --git a/src/mongo/db/repl/replica_set_aware_service.h b/src/mongo/db/repl/replica_set_aware_service.h
index eb38cac0252..f7ea8c3a71e 100644
--- a/src/mongo/db/repl/replica_set_aware_service.h
+++ b/src/mongo/db/repl/replica_set_aware_service.h
@@ -118,6 +118,16 @@ public:
virtual void onStartup(OperationContext* opCtx) = 0;
/**
+ * Called after startup recovery has completed.
+ */
+ virtual void onStartupRecoveryComplete(OperationContext* opCtx) = 0;
+
+ /**
+ * Called after initial sync has completed.
+ */
+ virtual void onInitialSyncComplete(OperationContext* opCtx) = 0;
+
+ /**
* Called as part of ReplicationCoordinator shutdown.
*/
virtual void onShutdown() = 0;
@@ -190,6 +200,8 @@ public:
static ReplicaSetAwareServiceRegistry& get(ServiceContext* serviceContext);
void onStartup(OperationContext* opCtx) final;
+ void onStartupRecoveryComplete(OperationContext* opCtx) final;
+ void onInitialSyncComplete(OperationContext* opCtx) final;
void onShutdown() final;
void onStepUpBegin(OperationContext* opCtx, long long term) final;
void onStepUpComplete(OperationContext* opCtx, long long term) final;
diff --git a/src/mongo/db/repl/replica_set_aware_service_test.cpp b/src/mongo/db/repl/replica_set_aware_service_test.cpp
index cd4e6697835..f2e841581b0 100644
--- a/src/mongo/db/repl/replica_set_aware_service_test.cpp
+++ b/src/mongo/db/repl/replica_set_aware_service_test.cpp
@@ -43,6 +43,8 @@ template <class ActualService>
class TestService : public ReplicaSetAwareService<ActualService> {
public:
int numCallsOnStartup{0};
+ int numCallsOnStartupRecoveryComplete{0};
+ int numCallsOnInitialSyncComplete{0};
int numCallsOnStepUpBegin{0};
int numCallsOnStepUpComplete{0};
int numCallsOnStepDown{0};
@@ -53,6 +55,14 @@ protected:
numCallsOnStartup++;
}
+ void onStartupRecoveryComplete(OperationContext* opCtx) override {  // Test hook: only records that the callback was delivered.
+ numCallsOnStartupRecoveryComplete++;  // Counter is read back by the test's ASSERT_EQ checks.
+ }
+
+ void onInitialSyncComplete(OperationContext* opCtx) override {  // Test hook: only records that the callback was delivered.
+ numCallsOnInitialSyncComplete++;  // Counter is read back by the test's ASSERT_EQ checks.
+ }
+
void onStepUpBegin(OperationContext* opCtx, long long term) override {
numCallsOnStepUpBegin++;
}
@@ -135,6 +145,12 @@ private:
TestService::onStartup(opCtx);
}
+ void onStartupRecoveryComplete(OperationContext* opCtx) final {  // Asserts on ServiceB's call count relative to this service's, then counts this call.
+ ASSERT_EQ(numCallsOnStartupRecoveryComplete,
+ ServiceB::get(getServiceContext())->numCallsOnStartupRecoveryComplete - 1);  // ServiceB's counter must be exactly one ahead of this service's counter here.
+ TestService::onStartupRecoveryComplete(opCtx);  // Base implementation increments this service's counter.
+ }
+
void onStepUpBegin(OperationContext* opCtx, long long term) final {
ASSERT_EQ(numCallsOnStepUpBegin,
ServiceB::get(getServiceContext())->numCallsOnStepUpBegin - 1);
@@ -199,24 +215,32 @@ TEST_F(ReplicaSetAwareServiceTest, ReplicaSetAwareService) {
auto c = ServiceC::get(sc);
ASSERT_EQ(0, a->numCallsOnStartup);
+ ASSERT_EQ(0, a->numCallsOnStartupRecoveryComplete);
+ ASSERT_EQ(0, a->numCallsOnInitialSyncComplete);
ASSERT_EQ(0, a->numCallsOnStepUpBegin);
ASSERT_EQ(0, a->numCallsOnStepUpComplete);
ASSERT_EQ(0, a->numCallsOnStepDown);
ASSERT_EQ(0, a->numCallsOnBecomeArbiter);
ASSERT_EQ(0, b->numCallsOnStartup);
+ ASSERT_EQ(0, b->numCallsOnStartupRecoveryComplete);
+ ASSERT_EQ(0, b->numCallsOnInitialSyncComplete);
ASSERT_EQ(0, b->numCallsOnStepUpBegin);
ASSERT_EQ(0, b->numCallsOnStepUpComplete);
ASSERT_EQ(0, b->numCallsOnStepDown);
ASSERT_EQ(0, b->numCallsOnBecomeArbiter);
ASSERT_EQ(0, c->numCallsOnStartup);
+ ASSERT_EQ(0, c->numCallsOnStartupRecoveryComplete);
+ ASSERT_EQ(0, c->numCallsOnInitialSyncComplete);
ASSERT_EQ(0, c->numCallsOnStepUpBegin);
ASSERT_EQ(0, c->numCallsOnStepUpComplete);
ASSERT_EQ(0, c->numCallsOnStepDown);
ASSERT_EQ(0, c->numCallsOnBecomeArbiter);
ReplicaSetAwareServiceRegistry::get(sc).onStartup(opCtx);
+ ReplicaSetAwareServiceRegistry::get(sc).onStartupRecoveryComplete(opCtx);
+ ReplicaSetAwareServiceRegistry::get(sc).onInitialSyncComplete(opCtx);
ReplicaSetAwareServiceRegistry::get(sc).onStepUpBegin(opCtx, _term);
ReplicaSetAwareServiceRegistry::get(sc).onStepUpBegin(opCtx, _term);
ReplicaSetAwareServiceRegistry::get(sc).onStepUpBegin(opCtx, _term);
@@ -226,18 +250,24 @@ TEST_F(ReplicaSetAwareServiceTest, ReplicaSetAwareService) {
ReplicaSetAwareServiceRegistry::get(sc).onBecomeArbiter();
ASSERT_EQ(0, a->numCallsOnStartup);
+ ASSERT_EQ(0, a->numCallsOnStartupRecoveryComplete);
+ ASSERT_EQ(0, a->numCallsOnInitialSyncComplete);
ASSERT_EQ(0, a->numCallsOnStepUpBegin);
ASSERT_EQ(0, a->numCallsOnStepUpComplete);
ASSERT_EQ(0, a->numCallsOnStepDown);
ASSERT_EQ(0, a->numCallsOnBecomeArbiter);
ASSERT_EQ(1, b->numCallsOnStartup);
+ ASSERT_EQ(1, b->numCallsOnStartupRecoveryComplete);
+ ASSERT_EQ(1, b->numCallsOnInitialSyncComplete);
ASSERT_EQ(3, b->numCallsOnStepUpBegin);
ASSERT_EQ(2, b->numCallsOnStepUpComplete);
ASSERT_EQ(1, b->numCallsOnStepDown);
ASSERT_EQ(1, b->numCallsOnBecomeArbiter);
ASSERT_EQ(1, c->numCallsOnStartup);
+ ASSERT_EQ(1, c->numCallsOnStartupRecoveryComplete);
+ ASSERT_EQ(1, c->numCallsOnInitialSyncComplete);
ASSERT_EQ(3, c->numCallsOnStepUpBegin);
ASSERT_EQ(2, c->numCallsOnStepUpComplete);
ASSERT_EQ(1, c->numCallsOnStepDown);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index da5fbd375e9..89fcbd5e291 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -525,6 +525,8 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(
LOGV2(4280506, "Reconstructing prepared transactions");
reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
+ ReplicaSetAwareServiceRegistry::get(_service).onStartupRecoveryComplete(opCtx);
+
const auto lastOpTimeAndWallTimeResult = _externalState->loadLastOpTimeAndWallTime(opCtx);
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h
index 6c6ed972a2a..8145bc37775 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.h
+++ b/src/mongo/db/repl/tenant_file_importer_service.h
@@ -54,6 +54,9 @@ public:
private:
void onStartup(OperationContext* opCtx) final;
+ void onStartupRecoveryComplete(OperationContext* opCtx) final {}
+ void onInitialSyncComplete(OperationContext* opCtx) final {}
+
void onShutdown() final {
stdx::lock_guard lk(_mutex);
_reset(lk);
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 54ac2fce63f..ab1ad167c89 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -219,6 +219,8 @@ private:
* ReplicaSetAwareService entry points.
*/
void onStartup(OperationContext* opCtx) final {}
+ void onStartupRecoveryComplete(OperationContext* opCtx) final {}
+ void onInitialSyncComplete(OperationContext* opCtx) final {}
void onShutdown() final {}
void onStepUpBegin(OperationContext* opCtx, long long term) final;
void onStepUpComplete(OperationContext* opCtx, long long term) final;
diff --git a/src/mongo/db/s/dist_lock_manager_replset.cpp b/src/mongo/db/s/dist_lock_manager_replset.cpp
index a41839870fe..6f491dbff67 100644
--- a/src/mongo/db/s/dist_lock_manager_replset.cpp
+++ b/src/mongo/db/s/dist_lock_manager_replset.cpp
@@ -65,6 +65,8 @@ public:
static DistLockManagerService* get(OperationContext* opCtx);
void onStartup(OperationContext* opCtx) override {}
+ void onStartupRecoveryComplete(OperationContext* opCtx) override {}
+ void onInitialSyncComplete(OperationContext* opCtx) override {}
void onShutdown() override {}
void onStepUpBegin(OperationContext* opCtx, long long term) override {
auto distLockManager = DistLockManager::get(opCtx);
diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp
index dbfcd529035..073c1be01ca 100644
--- a/src/mongo/db/s/recoverable_critical_section_service.cpp
+++ b/src/mongo/db/s/recoverable_critical_section_service.cpp
@@ -48,6 +48,20 @@
namespace mongo {
+namespace recoverable_critical_section_util {
+
+bool inRecoveryMode(OperationContext* opCtx) {  // Returns true only when replication is enabled and the member state reports startup(), startup2() or rollback().
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled()) {  // With replication disabled the node is never treated as being in recovery.
+ return false;
+ }
+
+ const auto memberState = replCoord->getMemberState();  // NOTE(review): same body as user_writes_recoverable_critical_section_util::inRecoveryMode; a shared helper may be preferable.
+ return memberState.startup() || memberState.startup2() || memberState.rollback();  // NOTE(review): recovering() is not checked here, unlike the inline checks this patch removes from the op observers.
+}
+
+} // namespace recoverable_critical_section_util
+
namespace {
const auto serviceDecorator =
ServiceContext::declareDecoration<RecoverableCriticalSectionService>();
diff --git a/src/mongo/db/s/recoverable_critical_section_service.h b/src/mongo/db/s/recoverable_critical_section_service.h
index f4d7f94820a..3452cb0ede7 100644
--- a/src/mongo/db/s/recoverable_critical_section_service.h
+++ b/src/mongo/db/s/recoverable_critical_section_service.h
@@ -37,6 +37,12 @@
namespace mongo {
+namespace recoverable_critical_section_util {
+
+bool inRecoveryMode(OperationContext* opCtx);
+
+}
+
class RecoverableCriticalSectionService
: public ReplicaSetAwareServiceShardSvr<RecoverableCriticalSectionService> {
@@ -98,10 +104,15 @@ public:
void recoverRecoverableCriticalSections(OperationContext* opCtx);
private:
- void onStartup(OperationContext* opCtx) override final {
+ void onStartupRecoveryComplete(OperationContext* opCtx) override final {  // Runs critical-section recovery when the startup-recovery-complete hook fires.
+ recoverRecoverableCriticalSections(opCtx);  // Same recovery entry point is also used by onInitialSyncComplete.
+ }
+
+ void onInitialSyncComplete(OperationContext* opCtx) override final {  // Runs critical-section recovery when the initial-sync-complete hook fires.
 recoverRecoverableCriticalSections(opCtx);
 }
+ void onStartup(OperationContext* opCtx) override final {}
void onShutdown() override final {}
void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
void onStepUpComplete(OperationContext* opCtx, long long term) override final {}
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 8d7826d23c0..eed070743ee 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -270,17 +270,14 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
}
}
- if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() ||
- (!replCoord->getMemberState().recovering() &&
- !replCoord->getMemberState().rollback())) {
- const auto collCSDoc = CollectionCriticalSectionDocument::parse(
- IDLParserErrorContext("ShardServerOpObserver"), insertedDoc);
- opCtx->recoveryUnit()->onCommit([opCtx,
- insertedNss = collCSDoc.getNss(),
- reason = collCSDoc.getReason().getOwned()](
- boost::optional<Timestamp>) {
+ if (nss == NamespaceString::kCollectionCriticalSectionsNamespace &&
+ !recoverable_critical_section_util::inRecoveryMode(opCtx)) {
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("ShardServerOpObserver"), insertedDoc);
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx,
+ insertedNss = collCSDoc.getNss(),
+ reason = collCSDoc.getReason().getOwned()](boost::optional<Timestamp>) {
boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
if (!isStandaloneOrPrimary(opCtx))
lockCollectionIfNotPrimary.emplace(opCtx, insertedNss, MODE_IX);
@@ -290,7 +287,6 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
csr->enterCriticalSectionCatchUpPhase(csrLock, reason);
});
- }
}
if (metadata && metadata->isSharded()) {
@@ -412,27 +408,23 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
}
}
- if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() ||
- (!replCoord->getMemberState().recovering() &&
- !replCoord->getMemberState().rollback())) {
- const auto collCSDoc = CollectionCriticalSectionDocument::parse(
- IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs->updatedDoc);
-
- opCtx->recoveryUnit()->onCommit(
- [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()](
- boost::optional<Timestamp>) {
- boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
- if (!isStandaloneOrPrimary(opCtx))
- lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX);
-
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss);
- auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
- csr->enterCriticalSectionCommitPhase(csrLock, reason);
- });
- }
+ if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace &&
+ !recoverable_critical_section_util::inRecoveryMode(opCtx)) {
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs->updatedDoc);
+
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()](
+ boost::optional<Timestamp>) {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx))
+ lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX);
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss);
+ auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
+ csr->enterCriticalSectionCommitPhase(csrLock, reason);
+ });
}
auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss);
@@ -508,34 +500,30 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx,
}
}
- if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() ||
- (!replCoord->getMemberState().recovering() &&
- !replCoord->getMemberState().rollback())) {
- const auto& deletedDoc = documentId;
- const auto collCSDoc = CollectionCriticalSectionDocument::parse(
- IDLParserErrorContext("ShardServerOpObserver"), deletedDoc);
-
- opCtx->recoveryUnit()->onCommit(
- [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()](
- boost::optional<Timestamp>) {
- boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
- if (!isStandaloneOrPrimary(opCtx))
- lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX);
-
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss);
-
- // Secondary nodes must clear the filtering metadata before releasing the
- // in-memory critical section
- if (!isStandaloneOrPrimary(opCtx))
- csr->clearFilteringMetadata(opCtx);
-
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
- csr->exitCriticalSection(csrLock, reason);
- });
- }
+ if (nss == NamespaceString::kCollectionCriticalSectionsNamespace &&
+ !recoverable_critical_section_util::inRecoveryMode(opCtx)) {
+ const auto& deletedDoc = documentId;
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("ShardServerOpObserver"), deletedDoc);
+
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()](
+ boost::optional<Timestamp>) {
+ boost::optional<AutoGetCollection> lockCollectionIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx))
+ lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX);
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss);
+
+ // Secondary nodes must clear the filtering metadata before releasing the
+ // in-memory critical section
+ if (!isStandaloneOrPrimary(opCtx))
+ csr->clearFilteringMetadata(opCtx);
+
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+ csr->exitCriticalSection(csrLock, reason);
+ });
}
}
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
index c1457176934..2a0635591b1 100644
--- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
+++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
@@ -44,6 +44,20 @@
namespace mongo {
+namespace user_writes_recoverable_critical_section_util {
+
+bool inRecoveryMode(OperationContext* opCtx) {  // Returns true only when replication is enabled and the member state reports startup(), startup2() or rollback().
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled()) {  // With replication disabled the node is never treated as being in recovery.
+ return false;
+ }
+
+ const auto memberState = replCoord->getMemberState();  // NOTE(review): same body as recoverable_critical_section_util::inRecoveryMode; a shared helper may be preferable.
+ return memberState.startup() || memberState.startup2() || memberState.rollback();  // NOTE(review): recovering() is not checked here, unlike the inline checks this patch removes from the op observer.
+}
+
+} // namespace user_writes_recoverable_critical_section_util
+
namespace {
const auto serviceDecorator =
ServiceContext::declareDecoration<UserWritesRecoverableCriticalSectionService>();
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.h b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h
index a08756b4a93..d361c81f86e 100644
--- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.h
+++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h
@@ -36,6 +36,12 @@
namespace mongo {
+namespace user_writes_recoverable_critical_section_util {
+
+bool inRecoveryMode(OperationContext* opCtx);
+
+}
+
/**
* Represents the 'user writes blocking' critical section. The critical section status is persisted
* on disk and it's in-memory representation is kept in sync with the persisted state through an
@@ -136,10 +142,15 @@ public:
void recoverRecoverableCriticalSections(OperationContext* opCtx);
private:
- void onStartup(OperationContext* opCtx) override final {
+ void onStartupRecoveryComplete(OperationContext* opCtx) override final {  // Runs user-writes critical-section recovery when the startup-recovery-complete hook fires.
+ recoverRecoverableCriticalSections(opCtx);  // Same recovery entry point is also used by onInitialSyncComplete.
+ }
+
+ void onInitialSyncComplete(OperationContext* opCtx) override final {  // Runs user-writes critical-section recovery when the initial-sync-complete hook fires.
 recoverRecoverableCriticalSections(opCtx);
 }
+ void onStartup(OperationContext* opCtx) override final {}
void onShutdown() override final {}
void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
void onStepUpComplete(OperationContext* opCtx, long long term) override final {}
diff --git a/src/mongo/db/user_write_block_mode_op_observer.cpp b/src/mongo/db/user_write_block_mode_op_observer.cpp
index c024231909d..ab043ce7f9e 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.cpp
+++ b/src/mongo/db/user_write_block_mode_op_observer.cpp
@@ -55,60 +55,19 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx,
bool fromMigrate) {
_checkWriteAllowed(opCtx, nss);
- if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
+ if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace &&
+ !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) {
for (auto it = first; it != last; ++it) {
const auto& insertedDoc = it->doc;
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() ||
- (!replCoord->getMemberState().recovering() &&
- !replCoord->getMemberState().rollback())) {
- const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
- IDLParserErrorContext("UserWriteBlockOpObserver"), insertedDoc);
- opCtx->recoveryUnit()->onCommit(
- [opCtx,
- blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(),
- blockWrites = collCSDoc.getBlockUserWrites(),
- insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
- invariant(insertedNss.isEmpty());
- boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
- if (!isStandaloneOrPrimary(opCtx)) {
- globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
- }
-
- if (blockShardedDDL) {
- GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(
- opCtx);
- }
-
- if (blockWrites) {
- GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
- }
- });
- }
- }
- }
-}
-
-void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx,
- const OplogUpdateEntryArgs& args) {
- _checkWriteAllowed(opCtx, args.nss);
-
- if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() ||
- (!replCoord->getMemberState().recovering() &&
- !replCoord->getMemberState().rollback())) {
const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
- IDLParserErrorContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc);
-
+ IDLParserErrorContext("UserWriteBlockOpObserver"), insertedDoc);
opCtx->recoveryUnit()->onCommit(
[opCtx,
- updatedNss = collCSDoc.getNss(),
blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(),
blockWrites = collCSDoc.getBlockUserWrites(),
insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
- invariant(updatedNss.isEmpty());
+ invariant(insertedNss.isEmpty());
boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
if (!isStandaloneOrPrimary(opCtx)) {
globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
@@ -116,20 +75,52 @@ void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx,
if (blockShardedDDL) {
GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx);
- } else {
- GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
}
if (blockWrites) {
GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
- } else {
- GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
}
});
}
}
}
+// Observes an update and, when it targets the user-writes critical-sections namespace, mirrors
+// the updated document's blocking flags into GlobalUserWriteBlockState once the write commits.
+//
+// The write is first validated with _checkWriteAllowed(). The in-memory state is left untouched
+// while inRecoveryMode() reports true; the service's recovery hooks rebuild it instead.
+void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx,
+                                            const OplogUpdateEntryArgs& args) {
+    _checkWriteAllowed(opCtx, args.nss);
+
+    if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace &&
+        !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) {
+        const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+            IDLParserErrorContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc);
+
+        // Capture plain values only: the parsed document does not outlive this function, while
+        // the callback runs later, at commit time. The previously captured `insertedNss` copy was
+        // never read inside the callback and has been dropped.
+        opCtx->recoveryUnit()->onCommit(
+            [opCtx,
+             updatedNss = collCSDoc.getNss(),
+             blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(),
+             blockWrites = collCSDoc.getBlockUserWrites()](boost::optional<Timestamp>) {
+                invariant(updatedNss.isEmpty());
+                // Only non-primary (and non-standalone) nodes take the global IX lock here.
+                boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
+                if (!isStandaloneOrPrimary(opCtx)) {
+                    globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
+                }
+
+                // Unlike an insert, an update can clear a flag, so both directions are applied.
+                if (blockShardedDDL) {
+                    GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx);
+                } else {
+                    GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
+                }
+
+                if (blockWrites) {
+                    GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
+                } else {
+                    GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
+                }
+            });
+    }
+}
+
void UserWriteBlockModeOpObserver::aboutToDelete(OperationContext* opCtx,
NamespaceString const& nss,
const UUID& uuid,
@@ -147,30 +138,26 @@ void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx,
const OplogDeleteEntryArgs& args) {
_checkWriteAllowed(opCtx, nss);
- if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) {
+ if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace &&
+ !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) {
auto& documentId = documentIdDecoration(opCtx);
invariant(!documentId.isEmpty());
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->isReplEnabled() ||
- (!replCoord->getMemberState().recovering() &&
- !replCoord->getMemberState().rollback())) {
- const auto& deletedDoc = documentId;
- const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
- IDLParserErrorContext("UserWriteBlockOpObserver"), deletedDoc);
-
- opCtx->recoveryUnit()->onCommit(
- [opCtx, deletedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
- invariant(deletedNss.isEmpty());
- boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
- if (!isStandaloneOrPrimary(opCtx)) {
- globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
- }
-
- GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
- GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
- });
- }
+ const auto& deletedDoc = documentId;
+ const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
+ IDLParserErrorContext("UserWriteBlockOpObserver"), deletedDoc);
+
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, deletedNss = collCSDoc.getNss()](boost::optional<Timestamp>) {
+ invariant(deletedNss.isEmpty());
+ boost::optional<Lock::GlobalLock> globalLockIfNotPrimary;
+ if (!isStandaloneOrPrimary(opCtx)) {
+ globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
+ }
+
+ GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
+ GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
+ });
}
}
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp
index 93db5aa123c..226d762e2a4 100644
--- a/src/mongo/db/vector_clock_mongod.cpp
+++ b/src/mongo/db/vector_clock_mongod.cpp
@@ -92,6 +92,8 @@ private:
// ReplicaSetAwareService methods implementation
void onStartup(OperationContext* opCtx) override {}
+ void onStartupRecoveryComplete(OperationContext* opCtx) override {}
+ void onInitialSyncComplete(OperationContext* opCtx) override {}
void onShutdown() override {}
void onStepUpBegin(OperationContext* opCtx, long long term) override;
void onStepUpComplete(OperationContext* opCtx, long long term) override {}