diff options
author | Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> | 2022-03-16 18:48:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-16 20:14:51 +0000 |
commit | ce0bbc5ec1728e443c5ff893a78693b24570b80d (patch) | |
tree | abe97ebc0eb4db7d53918cb170c438929055a929 | |
parent | f801cb26949e709161b9673eb5fa9e39d641b17d (diff) | |
download | mongo-ce0bbc5ec1728e443c5ff893a78693b24570b80d.tar.gz |
SERVER-64517 Recover RecoverableCriticalSection after initialSync and startupRecovery have completed
17 files changed, 229 insertions, 133 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 499f5de261a..707f73c8f8a 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1213,6 +1213,7 @@ env.Library( '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/executor/scoped_task_executor', 'repl_server_parameters', + 'replica_set_aware_service', ] ) diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 6e098bdf01c..127f382f0c5 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -58,6 +58,7 @@ #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/storage_interface.h" @@ -576,6 +577,8 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync); + ReplicaSetAwareServiceRegistry::get(opCtx->getServiceContext()).onInitialSyncComplete(opCtx); + _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); // We set the initial data timestamp before clearing the initial sync flag. See comments in diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h index d0d8fe01db3..7e6fc1baf2e 100644 --- a/src/mongo/db/repl/primary_only_service.h +++ b/src/mongo/db/repl/primary_only_service.h @@ -566,6 +566,8 @@ public: std::vector<BSONObj>* ops) noexcept; void onStartup(OperationContext*) final; + void onStartupRecoveryComplete(OperationContext* opCtx) final {} + void onInitialSyncComplete(OperationContext* opCtx) final {} void onShutdown() final; void onStepUpBegin(OperationContext*, long long term) final {} void onBecomeArbiter() final {} diff --git a/src/mongo/db/repl/replica_set_aware_service.cpp b/src/mongo/db/repl/replica_set_aware_service.cpp index cb925e58ced..fdf09046af2 100644 --- a/src/mongo/db/repl/replica_set_aware_service.cpp +++ b/src/mongo/db/repl/replica_set_aware_service.cpp @@ -56,6 +56,18 @@ void ReplicaSetAwareServiceRegistry::onStartup(OperationContext* opCtx) { }); } +void ReplicaSetAwareServiceRegistry::onStartupRecoveryComplete(OperationContext* opCtx) { + std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface* service) { + service->onStartupRecoveryComplete(opCtx); + }); +} + +void ReplicaSetAwareServiceRegistry::onInitialSyncComplete(OperationContext* opCtx) { + std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface* service) { + service->onInitialSyncComplete(opCtx); + }); +} + void ReplicaSetAwareServiceRegistry::onShutdown() { std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface* service) { service->onShutdown(); diff --git a/src/mongo/db/repl/replica_set_aware_service.h b/src/mongo/db/repl/replica_set_aware_service.h index eb38cac0252..f7ea8c3a71e 100644 --- a/src/mongo/db/repl/replica_set_aware_service.h +++ b/src/mongo/db/repl/replica_set_aware_service.h @@ -118,6 +118,16 @@ public: virtual void onStartup(OperationContext* opCtx) = 0; /** + * Called after startup recovery has completed. + */ + virtual void onStartupRecoveryComplete(OperationContext* opCtx) = 0; + + /** + * Called after initial sync has completed. + */ + virtual void onInitialSyncComplete(OperationContext* opCtx) = 0; + + /** * Called as part of ReplicationCoordinator shutdown. */ virtual void onShutdown() = 0; @@ -190,6 +200,8 @@ public: static ReplicaSetAwareServiceRegistry& get(ServiceContext* serviceContext); void onStartup(OperationContext* opCtx) final; + void onStartupRecoveryComplete(OperationContext* opCtx) final; + void onInitialSyncComplete(OperationContext* opCtx) final; void onShutdown() final; void onStepUpBegin(OperationContext* opCtx, long long term) final; void onStepUpComplete(OperationContext* opCtx, long long term) final; diff --git a/src/mongo/db/repl/replica_set_aware_service_test.cpp b/src/mongo/db/repl/replica_set_aware_service_test.cpp index cd4e6697835..f2e841581b0 100644 --- a/src/mongo/db/repl/replica_set_aware_service_test.cpp +++ b/src/mongo/db/repl/replica_set_aware_service_test.cpp @@ -43,6 +43,8 @@ template <class ActualService> class TestService : public ReplicaSetAwareService<ActualService> { public: int numCallsOnStartup{0}; + int numCallsOnStartupRecoveryComplete{0}; + int numCallsOnInitialSyncComplete{0}; int numCallsOnStepUpBegin{0}; int numCallsOnStepUpComplete{0}; int numCallsOnStepDown{0}; @@ -53,6 +55,14 @@ protected: numCallsOnStartup++; } + void onStartupRecoveryComplete(OperationContext* opCtx) override { + numCallsOnStartupRecoveryComplete++; + } + + void onInitialSyncComplete(OperationContext* opCtx) override { + numCallsOnInitialSyncComplete++; + } + void onStepUpBegin(OperationContext* opCtx, long long term) override { numCallsOnStepUpBegin++; } @@ -135,6 +145,12 @@ private: TestService::onStartup(opCtx); } + void onStartupRecoveryComplete(OperationContext* opCtx) final { + ASSERT_EQ(numCallsOnStartupRecoveryComplete, + ServiceB::get(getServiceContext())->numCallsOnStartupRecoveryComplete - 1); + TestService::onStartupRecoveryComplete(opCtx); + } + void onStepUpBegin(OperationContext* opCtx, long long term) final { ASSERT_EQ(numCallsOnStepUpBegin, ServiceB::get(getServiceContext())->numCallsOnStepUpBegin - 1); @@ -199,24 +215,32 @@ TEST_F(ReplicaSetAwareServiceTest, ReplicaSetAwareService) { auto c = ServiceC::get(sc); ASSERT_EQ(0, a->numCallsOnStartup); + ASSERT_EQ(0, a->numCallsOnStartupRecoveryComplete); + ASSERT_EQ(0, a->numCallsOnInitialSyncComplete); ASSERT_EQ(0, a->numCallsOnStepUpBegin); ASSERT_EQ(0, a->numCallsOnStepUpComplete); ASSERT_EQ(0, a->numCallsOnStepDown); ASSERT_EQ(0, a->numCallsOnBecomeArbiter); ASSERT_EQ(0, b->numCallsOnStartup); + ASSERT_EQ(0, b->numCallsOnStartupRecoveryComplete); + ASSERT_EQ(0, b->numCallsOnInitialSyncComplete); ASSERT_EQ(0, b->numCallsOnStepUpBegin); ASSERT_EQ(0, b->numCallsOnStepUpComplete); ASSERT_EQ(0, b->numCallsOnStepDown); ASSERT_EQ(0, b->numCallsOnBecomeArbiter); ASSERT_EQ(0, c->numCallsOnStartup); + ASSERT_EQ(0, c->numCallsOnStartupRecoveryComplete); + ASSERT_EQ(0, c->numCallsOnInitialSyncComplete); ASSERT_EQ(0, c->numCallsOnStepUpBegin); ASSERT_EQ(0, c->numCallsOnStepUpComplete); ASSERT_EQ(0, c->numCallsOnStepDown); ASSERT_EQ(0, c->numCallsOnBecomeArbiter); ReplicaSetAwareServiceRegistry::get(sc).onStartup(opCtx); + ReplicaSetAwareServiceRegistry::get(sc).onStartupRecoveryComplete(opCtx); + ReplicaSetAwareServiceRegistry::get(sc).onInitialSyncComplete(opCtx); ReplicaSetAwareServiceRegistry::get(sc).onStepUpBegin(opCtx, _term); ReplicaSetAwareServiceRegistry::get(sc).onStepUpBegin(opCtx, _term); ReplicaSetAwareServiceRegistry::get(sc).onStepUpBegin(opCtx, _term); @@ -226,18 +250,24 @@ TEST_F(ReplicaSetAwareServiceTest, ReplicaSetAwareService) { ReplicaSetAwareServiceRegistry::get(sc).onBecomeArbiter(); ASSERT_EQ(0, a->numCallsOnStartup); + ASSERT_EQ(0, a->numCallsOnStartupRecoveryComplete); + ASSERT_EQ(0, a->numCallsOnInitialSyncComplete); ASSERT_EQ(0, a->numCallsOnStepUpBegin); ASSERT_EQ(0, a->numCallsOnStepUpComplete); ASSERT_EQ(0, a->numCallsOnStepDown); ASSERT_EQ(0, a->numCallsOnBecomeArbiter); ASSERT_EQ(1, b->numCallsOnStartup); + ASSERT_EQ(1, b->numCallsOnStartupRecoveryComplete); + ASSERT_EQ(1, b->numCallsOnInitialSyncComplete); ASSERT_EQ(3, b->numCallsOnStepUpBegin); ASSERT_EQ(2, b->numCallsOnStepUpComplete); ASSERT_EQ(1, b->numCallsOnStepDown); ASSERT_EQ(1, b->numCallsOnBecomeArbiter); ASSERT_EQ(1, c->numCallsOnStartup); + ASSERT_EQ(1, c->numCallsOnStartupRecoveryComplete); + ASSERT_EQ(1, c->numCallsOnInitialSyncComplete); ASSERT_EQ(3, c->numCallsOnStepUpBegin); ASSERT_EQ(2, c->numCallsOnStepUpComplete); ASSERT_EQ(1, c->numCallsOnStepDown); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index da5fbd375e9..89fcbd5e291 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -525,6 +525,8 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig( LOGV2(4280506, "Reconstructing prepared transactions"); reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering); + ReplicaSetAwareServiceRegistry::get(_service).onStartupRecoveryComplete(opCtx); + const auto lastOpTimeAndWallTimeResult = _externalState->loadLastOpTimeAndWallTime(opCtx); // Use a callback here, because _finishLoadLocalConfig calls isself() which requires diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h index 6c6ed972a2a..8145bc37775 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.h +++ b/src/mongo/db/repl/tenant_file_importer_service.h @@ -54,6 +54,9 @@ public: private: void onStartup(OperationContext* opCtx) final; + void onStartupRecoveryComplete(OperationContext* opCtx) final {} + void onInitialSyncComplete(OperationContext* opCtx) final {} + void onShutdown() final { stdx::lock_guard lk(_mutex); _reset(lk); diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index 54ac2fce63f..ab1ad167c89 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -219,6 +219,8 @@ private: * ReplicaSetAwareService entry points. */ void onStartup(OperationContext* opCtx) final {} + void onStartupRecoveryComplete(OperationContext* opCtx) final {} + void onInitialSyncComplete(OperationContext* opCtx) final {} void onShutdown() final {} void onStepUpBegin(OperationContext* opCtx, long long term) final; void onStepUpComplete(OperationContext* opCtx, long long term) final; diff --git a/src/mongo/db/s/dist_lock_manager_replset.cpp b/src/mongo/db/s/dist_lock_manager_replset.cpp index a41839870fe..6f491dbff67 100644 --- a/src/mongo/db/s/dist_lock_manager_replset.cpp +++ b/src/mongo/db/s/dist_lock_manager_replset.cpp @@ -65,6 +65,8 @@ public: static DistLockManagerService* get(OperationContext* opCtx); void onStartup(OperationContext* opCtx) override {} + void onStartupRecoveryComplete(OperationContext* opCtx) override {} + void onInitialSyncComplete(OperationContext* opCtx) override {} void onShutdown() override {} void onStepUpBegin(OperationContext* opCtx, long long term) override { auto distLockManager = DistLockManager::get(opCtx); diff --git a/src/mongo/db/s/recoverable_critical_section_service.cpp b/src/mongo/db/s/recoverable_critical_section_service.cpp index dbfcd529035..073c1be01ca 100644 --- a/src/mongo/db/s/recoverable_critical_section_service.cpp +++ b/src/mongo/db/s/recoverable_critical_section_service.cpp @@ -48,6 +48,20 @@ namespace mongo { +namespace recoverable_critical_section_util { + +bool inRecoveryMode(OperationContext* opCtx) { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled()) { + return false; + } + + const auto memberState = replCoord->getMemberState(); + return memberState.startup() || memberState.startup2() || memberState.rollback(); +} + +} // namespace recoverable_critical_section_util + namespace { const auto serviceDecorator = ServiceContext::declareDecoration<RecoverableCriticalSectionService>(); diff --git a/src/mongo/db/s/recoverable_critical_section_service.h b/src/mongo/db/s/recoverable_critical_section_service.h index f4d7f94820a..3452cb0ede7 100644 --- a/src/mongo/db/s/recoverable_critical_section_service.h +++ b/src/mongo/db/s/recoverable_critical_section_service.h @@ -37,6 +37,12 @@ namespace mongo { +namespace recoverable_critical_section_util { + +bool inRecoveryMode(OperationContext* opCtx); + +} + class RecoverableCriticalSectionService : public ReplicaSetAwareServiceShardSvr<RecoverableCriticalSectionService> { @@ -98,10 +104,15 @@ public: void recoverRecoverableCriticalSections(OperationContext* opCtx); private: - void onStartup(OperationContext* opCtx) override final { + void onStartupRecoveryComplete(OperationContext* opCtx) override final { + recoverRecoverableCriticalSections(opCtx); + } + + void onInitialSyncComplete(OperationContext* opCtx) override final { recoverRecoverableCriticalSections(opCtx); } + void onStartup(OperationContext* opCtx) override final {} void onShutdown() override final {} void onStepUpBegin(OperationContext* opCtx, long long term) override final {} void onStepUpComplete(OperationContext* opCtx, long long term) override final {} diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 8d7826d23c0..eed070743ee 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -270,17 +270,14 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, } } - if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->isReplEnabled() || - (!replCoord->getMemberState().recovering() && - !replCoord->getMemberState().rollback())) { - const auto collCSDoc = CollectionCriticalSectionDocument::parse( - IDLParserErrorContext("ShardServerOpObserver"), insertedDoc); - opCtx->recoveryUnit()->onCommit([opCtx, - insertedNss = collCSDoc.getNss(), - reason = collCSDoc.getReason().getOwned()]( - boost::optional<Timestamp>) { + if (nss == NamespaceString::kCollectionCriticalSectionsNamespace && + !recoverable_critical_section_util::inRecoveryMode(opCtx)) { + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), insertedDoc); + opCtx->recoveryUnit()->onCommit( + [opCtx, + insertedNss = collCSDoc.getNss(), + reason = collCSDoc.getReason().getOwned()](boost::optional<Timestamp>) { boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) lockCollectionIfNotPrimary.emplace(opCtx, insertedNss, MODE_IX); @@ -290,7 +287,6 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); csr->enterCriticalSectionCatchUpPhase(csrLock, reason); }); - } } if (metadata && metadata->isSharded()) { @@ -412,27 +408,23 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } } - if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->isReplEnabled() || - (!replCoord->getMemberState().recovering() && - !replCoord->getMemberState().rollback())) { - const auto collCSDoc = CollectionCriticalSectionDocument::parse( - IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs->updatedDoc); - - opCtx->recoveryUnit()->onCommit( - [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( - boost::optional<Timestamp>) { - boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) - lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX); - - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); - auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCommitPhase(csrLock, reason); - }); - } + if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace && + !recoverable_critical_section_util::inRecoveryMode(opCtx)) { + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs->updatedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( + boost::optional<Timestamp>) { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); + auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); + csr->enterCriticalSectionCommitPhase(csrLock, reason); + }); } auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss); @@ -508,34 +500,30 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, } } - if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->isReplEnabled() || - (!replCoord->getMemberState().recovering() && - !replCoord->getMemberState().rollback())) { - const auto& deletedDoc = documentId; - const auto collCSDoc = CollectionCriticalSectionDocument::parse( - IDLParserErrorContext("ShardServerOpObserver"), deletedDoc); - - opCtx->recoveryUnit()->onCommit( - [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( - boost::optional<Timestamp>) { - boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) - lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); - - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); - - // Secondary nodes must clear the filtering metadata before releasing the - // in-memory critical section - if (!isStandaloneOrPrimary(opCtx)) - csr->clearFilteringMetadata(opCtx); - - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock, reason); - }); - } + if (nss == NamespaceString::kCollectionCriticalSectionsNamespace && + !recoverable_critical_section_util::inRecoveryMode(opCtx)) { + const auto& deletedDoc = documentId; + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), deletedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( + boost::optional<Timestamp>) { + boost::optional<AutoGetCollection> lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); + + // Secondary nodes must clear the filtering metadata before releasing the + // in-memory critical section + if (!isStandaloneOrPrimary(opCtx)) + csr->clearFilteringMetadata(opCtx); + + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + csr->exitCriticalSection(csrLock, reason); + }); } } diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp index c1457176934..2a0635591b1 100644 --- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp @@ -44,6 +44,20 @@ namespace mongo { +namespace user_writes_recoverable_critical_section_util { + +bool inRecoveryMode(OperationContext* opCtx) { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled()) { + return false; + } + + const auto memberState = replCoord->getMemberState(); + return memberState.startup() || memberState.startup2() || memberState.rollback(); +} + +} // namespace user_writes_recoverable_critical_section_util + namespace { const auto serviceDecorator = ServiceContext::declareDecoration<UserWritesRecoverableCriticalSectionService>(); diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.h b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h index a08756b4a93..d361c81f86e 100644 --- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.h +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.h @@ -36,6 +36,12 @@ namespace mongo { +namespace user_writes_recoverable_critical_section_util { + +bool inRecoveryMode(OperationContext* opCtx); + +} + /** * Represents the 'user writes blocking' critical section. The critical section status is persisted * on disk and it's in-memory representation is kept in sync with the persisted state through an @@ -136,10 +142,15 @@ public: void recoverRecoverableCriticalSections(OperationContext* opCtx); private: - void onStartup(OperationContext* opCtx) override final { + void onStartupRecoveryComplete(OperationContext* opCtx) override final { + recoverRecoverableCriticalSections(opCtx); + } + + void onInitialSyncComplete(OperationContext* opCtx) override final { recoverRecoverableCriticalSections(opCtx); } + void onStartup(OperationContext* opCtx) override final {} void onShutdown() override final {} void onStepUpBegin(OperationContext* opCtx, long long term) override final {} void onStepUpComplete(OperationContext* opCtx, long long term) override final {} diff --git a/src/mongo/db/user_write_block_mode_op_observer.cpp b/src/mongo/db/user_write_block_mode_op_observer.cpp index c024231909d..ab043ce7f9e 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.cpp +++ b/src/mongo/db/user_write_block_mode_op_observer.cpp @@ -55,60 +55,19 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx, bool fromMigrate) { _checkWriteAllowed(opCtx, nss); - if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { + if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace && + !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) { for (auto it = first; it != last; ++it) { const auto& insertedDoc = it->doc; - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->isReplEnabled() || - (!replCoord->getMemberState().recovering() && - !replCoord->getMemberState().rollback())) { - const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( - IDLParserErrorContext("UserWriteBlockOpObserver"), insertedDoc); - opCtx->recoveryUnit()->onCommit( - [opCtx, - blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(), - blockWrites = collCSDoc.getBlockUserWrites(), - insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { - invariant(insertedNss.isEmpty()); - boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) { - globalLockIfNotPrimary.emplace(opCtx, MODE_IX); - } - - if (blockShardedDDL) { - GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking( - opCtx); - } - - if (blockWrites) { - GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); - } - }); - } - } - } -} - -void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx, - const OplogUpdateEntryArgs& args) { - _checkWriteAllowed(opCtx, args.nss); - - if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->isReplEnabled() || - (!replCoord->getMemberState().recovering() && - !replCoord->getMemberState().rollback())) { const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( - IDLParserErrorContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc); - + IDLParserErrorContext("UserWriteBlockOpObserver"), insertedDoc); opCtx->recoveryUnit()->onCommit( [opCtx, - updatedNss = collCSDoc.getNss(), blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(), blockWrites = collCSDoc.getBlockUserWrites(), insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { - invariant(updatedNss.isEmpty()); + invariant(insertedNss.isEmpty()); boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) { globalLockIfNotPrimary.emplace(opCtx, MODE_IX); @@ -116,20 +75,52 @@ void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx, if (blockShardedDDL) { GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx); - } else { - GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); } if (blockWrites) { GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); - } else { - GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); } }); } } } +void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx, + const OplogUpdateEntryArgs& args) { + _checkWriteAllowed(opCtx, args.nss); + + if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace && + !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) { + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, + updatedNss = collCSDoc.getNss(), + blockShardedDDL = collCSDoc.getBlockNewUserShardedDDL(), + blockWrites = collCSDoc.getBlockUserWrites(), + insertedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + invariant(updatedNss.isEmpty()); + boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + globalLockIfNotPrimary.emplace(opCtx, MODE_IX); + } + + if (blockShardedDDL) { + GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx); + } else { + GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); + } + + if (blockWrites) { + GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); + } else { + GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + } + }); + } +} + void UserWriteBlockModeOpObserver::aboutToDelete(OperationContext* opCtx, NamespaceString const& nss, const UUID& uuid, @@ -147,30 +138,26 @@ void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx, const OplogDeleteEntryArgs& args) { _checkWriteAllowed(opCtx, nss); - if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace) { + if (nss == NamespaceString::kUserWritesCriticalSectionsNamespace && + !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) { auto& documentId = documentIdDecoration(opCtx); invariant(!documentId.isEmpty()); - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->isReplEnabled() || - (!replCoord->getMemberState().recovering() && - !replCoord->getMemberState().rollback())) { - const auto& deletedDoc = documentId; - const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( - IDLParserErrorContext("UserWriteBlockOpObserver"), deletedDoc); - - opCtx->recoveryUnit()->onCommit( - [opCtx, deletedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { - invariant(deletedNss.isEmpty()); - boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) { - globalLockIfNotPrimary.emplace(opCtx, MODE_IX); - } - - GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); - GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); - }); - } + const auto& deletedDoc = documentId; + const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( + IDLParserErrorContext("UserWriteBlockOpObserver"), deletedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, deletedNss = collCSDoc.getNss()](boost::optional<Timestamp>) { + invariant(deletedNss.isEmpty()); + boost::optional<Lock::GlobalLock> globalLockIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) { + globalLockIfNotPrimary.emplace(opCtx, MODE_IX); + } + + GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); + GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); + }); } } diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp index 93db5aa123c..226d762e2a4 100644 --- a/src/mongo/db/vector_clock_mongod.cpp +++ b/src/mongo/db/vector_clock_mongod.cpp @@ -92,6 +92,8 @@ private: // ReplicaSetAwareService methods implementation void onStartup(OperationContext* opCtx) override {} + void onStartupRecoveryComplete(OperationContext* opCtx) override {} + void onInitialSyncComplete(OperationContext* opCtx) override {} void onShutdown() override {} void onStepUpBegin(OperationContext* opCtx, long long term) override; void onStepUpComplete(OperationContext* opCtx, long long term) override {} |