author    | Andrew Shuvalov <andrew.shuvalov@mongodb.com> | 2021-11-03 19:18:11 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-28 21:25:36 +0000
commit    | 776d8f5b7d4d674b2108ccc9335e0355f1166245 (patch)
tree      | bb3c91b37f7af40d60379494068b375ca2e99872
parent    | 60356fccefbaef8cc39cec91082b9812571a907f (diff)
download  | mongo-776d8f5b7d4d674b2108ccc9335e0355f1166245.tar.gz
SERVER-59366 Progress monitor for periodic health check
-rw-r--r-- | src/mongo/db/process_health/SConscript                        |   1
-rw-r--r-- | src/mongo/db/process_health/fault_manager.cpp                 |  62
-rw-r--r-- | src/mongo/db/process_health/fault_manager.h                   |  39
-rw-r--r-- | src/mongo/db/process_health/fault_manager_config.h            |  15
-rw-r--r-- | src/mongo/db/process_health/fault_manager_test.cpp            |   4
-rw-r--r-- | src/mongo/db/process_health/fault_manager_test_suite.h        |  32
-rw-r--r-- | src/mongo/db/process_health/health_observer.h                  |  28
-rw-r--r-- | src/mongo/db/process_health/health_observer_base.cpp          |  42
-rw-r--r-- | src/mongo/db/process_health/health_observer_base.h            |  34
-rw-r--r-- | src/mongo/db/process_health/health_observer_mock.h            |  11
-rw-r--r-- | src/mongo/db/process_health/health_observer_registration.cpp  |  11
-rw-r--r-- | src/mongo/db/process_health/health_observer_registration.h    |   5
-rw-r--r-- | src/mongo/db/process_health/health_observer_test.cpp          |  54
-rw-r--r-- | src/mongo/db/process_health/progress_monitor.cpp              | 122
-rw-r--r-- | src/mongo/db/process_health/progress_monitor.h                |  75
-rw-r--r-- | src/mongo/util/exit_code.h                                    |   4
16 files changed, 466 insertions(+), 73 deletions(-)
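The patch below adds a liveness watchdog for the periodic health checks: a dedicated thread polls per-observer progress stats and terminates the process through a pluggable crash callback when a check stalls. As a rough standalone illustration of the pattern only (plain C++17 with invented names, not the MongoDB API):

    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <string>
    #include <thread>

    using Clock = std::chrono::steady_clock;

    // Minimal liveness watchdog: invokes a crash callback when the monitored
    // activity stops reporting progress before the deadline elapses.
    class Watchdog {
    public:
        Watchdog(Clock::duration deadline, std::function<void(std::string)> crashCb)
            : _deadline(deadline), _crashCb(std::move(crashCb)), _thread([this] { _loop(); }) {}

        ~Watchdog() {
            _terminate.store(true);
            _thread.join();
        }

        // Called by the monitored component every time it makes progress.
        void reportProgress() {
            _lastProgress.store(Clock::now().time_since_epoch().count());
        }

    private:
        void _loop() {
            while (!_terminate.load()) {
                const auto last = Clock::time_point(Clock::duration(_lastProgress.load()));
                if (Clock::now() - last > _deadline)
                    _crashCb("no progress within deadline");
                std::this_thread::sleep_for(_deadline / 10);
            }
        }

        const Clock::duration _deadline;
        const std::function<void(std::string)> _crashCb;
        std::atomic<Clock::rep> _lastProgress{Clock::now().time_since_epoch().count()};
        std::atomic<bool> _terminate{false};
        std::thread _thread;  // Declared last so it starts after the other members.
    };

The real implementation separates these roles: observers publish stats, ProgressMonitor polls them, and FaultManager supplies the crash callback.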
diff --git a/src/mongo/db/process_health/SConscript b/src/mongo/db/process_health/SConscript
index ce2e3951071..40bec669cd4 100644
--- a/src/mongo/db/process_health/SConscript
+++ b/src/mongo/db/process_health/SConscript
@@ -15,6 +15,7 @@ env.Library(
         'health_monitoring_server_parameters.cpp',
         'health_observer_base.cpp',
         'health_observer_registration.cpp',
+        'progress_monitor.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp
index a858f1808af..9b203c23823 100644
--- a/src/mongo/db/process_health/fault_manager.cpp
+++ b/src/mongo/db/process_health/fault_manager.cpp
@@ -45,6 +45,7 @@
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/logv2/log.h"
+#include "mongo/util/exit_code.h"
 #include "mongo/util/future_util.h"
 
 namespace mongo {
@@ -72,9 +73,6 @@ ServiceContext::ConstructorActionRegisterer faultManagerRegisterer{
 
 }  // namespace
 
-
-static constexpr auto kPeriodicHealthCheckInterval{Milliseconds(50)};
-
 FaultManager* FaultManager::get(ServiceContext* svcCtx) {
     return sFaultManager(svcCtx).get();
 }
@@ -112,16 +110,33 @@ FaultManager::TransientFaultDeadline::~TransientFaultDeadline() {
 FaultManager::FaultManager(ServiceContext* svcCtx,
                            std::shared_ptr<executor::TaskExecutor> taskExecutor,
                            std::unique_ptr<FaultManagerConfig> config)
-    : _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor) {
+    : _config(std::move(config)),
+      _svcCtx(svcCtx),
+      _taskExecutor(taskExecutor),
+      _crashCb([](std::string cause) {
+          LOGV2_ERROR(5936605,
+                      "Fault manager progress monitor is terminating the server",
+                      "cause"_attr = cause);
+          // This calls the exit_group syscall on Linux.
+          ::_exit(ExitCode::EXIT_PROCESS_HEALTH_CHECK);
+      }) {
     invariant(_svcCtx);
     invariant(_svcCtx->getFastClockSource());
+    invariant(_svcCtx->getPreciseClockSource());
 }
 
+FaultManager::FaultManager(ServiceContext* svcCtx,
+                           std::shared_ptr<executor::TaskExecutor> taskExecutor,
+                           std::unique_ptr<FaultManagerConfig> config,
+                           std::function<void(std::string cause)> crashCb)
+    : _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor), _crashCb(crashCb) {}
+
 void FaultManager::schedulePeriodicHealthCheckThread(bool immediately) {
     if (!feature_flags::gFeatureFlagHealthMonitoring) {
         return;
     }
 
+    const auto kPeriodicHealthCheckInterval = getConfig().getPeriodicHealthCheckInterval();
+
     auto lk = stdx::lock_guard(_mutex);
 
     const auto cb = [this](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
@@ -143,12 +158,23 @@ void FaultManager::schedulePeriodicHealthCheckThread(bool immediately) {
 }
 
 FaultManager::~FaultManager() {
+    _managerShuttingDownCancellationSource.cancel();
     _taskExecutor->shutdown();
+
+    LOGV2(5936601, "Shutting down periodic health checks");
     if (_periodicHealthCheckCbHandle) {
         _taskExecutor->cancel(*_periodicHealthCheckCbHandle);
    }
 
+    // All health checks must use the _taskExecutor, so joining it
+    // should guarantee that health checks are done. However, if a health
+    // check is stuck in some blocking call, the _progressMonitorThread will
+    // kill the process after the timeout.
     _taskExecutor->join();
+    // Must be joined after _taskExecutor.
+    if (_progressMonitor) {
+        _progressMonitor->join();
+    }
 
     if (!_initialHealthCheckCompletedPromise.getFuture().isReady()) {
         _initialHealthCheckCompletedPromise.emplaceValue();
@@ -197,15 +223,16 @@ FaultFacetsContainerPtr FaultManager::getOrCreateFaultFacetsContainer() {
 
 void FaultManager::healthCheck() {
     // One time init.
-    _initHealthObserversIfNeeded();
+    _firstTimeInitIfNeeded();
     ON_BLOCK_EXIT([this] { schedulePeriodicHealthCheckThread(); });
 
     std::vector<HealthObserver*> observers = FaultManager::getHealthObservers();
 
     // Start checks outside of lock.
+    auto token = _managerShuttingDownCancellationSource.token();
     for (auto observer : observers) {
-        observer->periodicCheck(*this, _taskExecutor);
+        observer->periodicCheck(*this, _taskExecutor, token);
     }
 
     // Garbage collect all resolved fault facets.
@@ -270,6 +297,11 @@ bool FaultManager::hasCriticalFacet(const FaultInternal* fault) const {
     return false;
 }
 
+FaultManagerConfig FaultManager::getConfig() const {
+    auto lk = stdx::lock_guard(_mutex);
+    return *_config;
+}
+
 void FaultManager::processFaultExistsEvent() {
     FaultState currentState = getFaultState();
@@ -344,20 +376,19 @@ void FaultManager::transitionToState(FaultState newState) {
     _currentState = newState;
 }
 
-void FaultManager::_initHealthObserversIfNeeded() {
-    if (_initializedAllHealthObservers.load()) {
+void FaultManager::_firstTimeInitIfNeeded() {
+    if (_firstTimeInitExecuted.load()) {
         return;
     }
     auto lk = stdx::lock_guard(_mutex);
     // One more time under lock to avoid race.
-    if (_initializedAllHealthObservers.load()) {
+    if (_firstTimeInitExecuted.load()) {
         return;
     }
-    _initializedAllHealthObservers.store(true);
+    _firstTimeInitExecuted.store(true);
 
-    _observers = HealthObserverRegistration::instantiateAllObservers(_svcCtx->getFastClockSource(),
-                                                                     _svcCtx->getTickSource());
+    _observers = HealthObserverRegistration::instantiateAllObservers(_svcCtx);
 
     // Verify that all observer types are unique.
     std::set<FaultFacetType> allTypes;
@@ -366,6 +397,9 @@ void FaultManager::_initHealthObserversIfNeeded() {
     }
     invariant(allTypes.size() == _observers.size());
 
+    // Start the monitor thread after all observers are initialized.
+    _progressMonitor = std::make_unique<ProgressMonitor>(this, _svcCtx, _crashCb);
+
     auto lk2 = stdx::lock_guard(_stateMutex);
     LOGV2(5956701,
           "Instantiated health observers, periodic health checking starts",
@@ -384,5 +418,9 @@ std::vector<HealthObserver*> FaultManager::getHealthObservers() {
     return result;
 }
 
+void FaultManager::progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb) {
+    _progressMonitor->progressMonitorCheck(crashCb);
+}
+
 }  // namespace process_health
 }  // namespace mongo
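`_firstTimeInitIfNeeded()` above is the classic check / lock / re-check sequence, which keeps the common path lock-free once initialization is done. A condensed standalone sketch of the idiom (plain C++17, hypothetical names):

    #include <atomic>
    #include <mutex>

    class LazyInit {
    public:
        void ensureInitialized() {
            if (_initialized.load())  // Fast path: no lock once initialized.
                return;
            std::lock_guard<std::mutex> lk(_mutex);
            if (_initialized.load())  // Re-check under the lock to avoid a race.
                return;
            _initialized.store(true);
            // ... perform the one-time initialization here, still under the lock ...
        }

    private:
        std::atomic<bool> _initialized{false};
        std::mutex _mutex;
    };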
diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h
index 936f44c9b6e..bd9b3130f73 100644
--- a/src/mongo/db/process_health/fault_manager.h
+++ b/src/mongo/db/process_health/fault_manager.h
@@ -36,6 +36,7 @@
 #include "mongo/db/process_health/fault_manager_config.h"
 #include "mongo/db/process_health/health_monitoring_server_parameters_gen.h"
 #include "mongo/db/process_health/health_observer.h"
+#include "mongo/db/process_health/progress_monitor.h"
 #include "mongo/db/service_context.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/platform/atomic_word.h"
@@ -60,9 +61,14 @@ class FaultManager : protected FaultFacetsContainerFactory {
 public:
     // The taskExecutor provided should not be already started.
-    explicit FaultManager(ServiceContext* svcCtx,
-                          std::shared_ptr<executor::TaskExecutor> taskExecutor,
-                          std::unique_ptr<FaultManagerConfig> config);
+    FaultManager(ServiceContext* svcCtx,
+                 std::shared_ptr<executor::TaskExecutor> taskExecutor,
+                 std::unique_ptr<FaultManagerConfig> config);
+    // Variant with explicit crash callback for tests.
+    FaultManager(ServiceContext* svcCtx,
+                 std::shared_ptr<executor::TaskExecutor> taskExecutor,
+                 std::unique_ptr<FaultManagerConfig> config,
+                 std::function<void(std::string cause)> crashCb);
     virtual ~FaultManager();
 
     // Start periodic health checks, invoke it once during server startup.
@@ -84,15 +90,18 @@ public:
     // Returns the current fault, if any. Otherwise returns an empty pointer.
     FaultConstPtr currentFault() const;
 
+    // All observers remain valid for the manager lifetime, thus returning
+    // just pointers is safe, as long as they are used while the manager exists.
+    std::vector<HealthObserver*> getHealthObservers();
+
+    // Gets the aggregate configuration for the whole process health environment.
+    FaultManagerConfig getConfig() const;
+
 protected:
     // Starts the health check sequence and updates the internal state on completion.
     // This is invoked by the internal timer.
     virtual void healthCheck();
 
-    // All observers remain valid for the manager lifetime, thus returning
-    // just pointers is safe, as long as they are used while manager exists.
-    std::vector<HealthObserver*> getHealthObservers();
-
     // Protected interface FaultFacetsContainerFactory implementation.
 
     // The interface FaultFacetsContainerFactory is implemented by the member '_fault'.
@@ -119,23 +128,29 @@ protected:
     // TODO: move this into fault class; refactor to remove FaultInternal
     bool hasCriticalFacet(const FaultInternal* fault) const;
 
-    std::unique_ptr<FaultManagerConfig> _config;
+    void progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb);
 
 private:
     // One time init.
-    void _initHealthObserversIfNeeded();
+    void _firstTimeInitIfNeeded();
 
+    std::unique_ptr<FaultManagerConfig> _config;
     ServiceContext* const _svcCtx;
+    std::shared_ptr<executor::TaskExecutor> _taskExecutor;
+    // Callback used to crash the server.
+    std::function<void(std::string cause)> _crashCb;
 
     mutable Mutex _mutex =
         MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(5), "FaultManager::_mutex");
     std::shared_ptr<FaultInternal> _fault;
     // We lazily init all health observers.
-    AtomicWord<bool> _initializedAllHealthObservers{false};
+    AtomicWord<bool> _firstTimeInitExecuted{false};
+    // This source is canceled before the _taskExecutor shutdown(). It
+    // can be used to check for the start of the shutdown sequence.
+    CancellationSource _managerShuttingDownCancellationSource;
     // Manager owns all observer instances.
     std::vector<std::unique_ptr<HealthObserver>> _observers;
-    std::shared_ptr<executor::TaskExecutor> _taskExecutor;
     boost::optional<executor::TaskExecutor::CallbackHandle> _periodicHealthCheckCbHandle;
 
     SharedPromise<void> _initialHealthCheckCompletedPromise;
@@ -164,6 +179,8 @@ private:
         ExecutorFuture<void> activeFaultTransition;
     };
     std::unique_ptr<TransientFaultDeadline> _transientFaultDeadline;
+
+    std::unique_ptr<ProgressMonitor> _progressMonitor;
 };
 
 }  // namespace process_health
diff --git a/src/mongo/db/process_health/fault_manager_config.h b/src/mongo/db/process_health/fault_manager_config.h
index d127abc20a4..2c64565018f 100644
--- a/src/mongo/db/process_health/fault_manager_config.h
+++ b/src/mongo/db/process_health/fault_manager_config.h
@@ -98,10 +98,23 @@ public:
             MONGO_UNREACHABLE;
         }
     }
-    Milliseconds getActiveFaultDuration() {
+
+    Milliseconds getActiveFaultDuration() const {
         return kActiveFaultDuration;
     }
 
+    Milliseconds getPeriodicHealthCheckInterval() const {
+        return Milliseconds(50);
+    }
+
+    Milliseconds getPeriodicLivenessCheckInterval() const {
+        return Milliseconds(50);
+    }
+
+    Seconds getPeriodicLivenessDeadline() const {
+        return Seconds(300);
+    }
+
 protected:
     // If the server persists in TransientFault for more than this duration
     // it will move to the ActiveFault state and terminate.
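Note that `FaultManager::getConfig()` (earlier in this patch) returns the configuration by value while holding the manager's mutex, which is why the tests below change from `getConfig()->` to `getConfig().`. A sketch of that snapshot-by-value getter (standalone, assuming a copyable config type):

    #include <memory>
    #include <mutex>

    struct Config {
        int periodicHealthCheckIntervalMs = 50;
        int periodicLivenessDeadlineSec = 300;
    };

    class Manager {
    public:
        // Copy out under the lock; the caller's snapshot stays valid and
        // consistent even if the stored config is replaced concurrently.
        Config getConfig() const {
            std::lock_guard<std::mutex> lk(_mutex);
            return *_config;
        }

    private:
        mutable std::mutex _mutex;
        std::unique_ptr<Config> _config = std::make_unique<Config>();
    };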
diff --git a/src/mongo/db/process_health/fault_manager_test.cpp b/src/mongo/db/process_health/fault_manager_test.cpp
index 50cb1de29f8..d216206c505 100644
--- a/src/mongo/db/process_health/fault_manager_test.cpp
+++ b/src/mongo/db/process_health/fault_manager_test.cpp
@@ -49,9 +49,9 @@ TEST(FaultManagerTest, Registration) {
 // Tests the default health observer intensity of non-critical
 TEST_F(FaultManagerTest, GetHealthObserverIntensity) {
     auto config = manager().getConfig();
-    ASSERT(config->getHealthObserverIntensity(FaultFacetType::kLdap) ==
+    ASSERT(config.getHealthObserverIntensity(FaultFacetType::kLdap) ==
            HealthObserverIntensityEnum::kNonCritical);
-    ASSERT(config->getHealthObserverIntensity(FaultFacetType::kDns) ==
+    ASSERT(config.getHealthObserverIntensity(FaultFacetType::kDns) ==
            HealthObserverIntensityEnum::kNonCritical);
 }
 
diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h
index a7677967d0c..590ab2b1966 100644
--- a/src/mongo/db/process_health/fault_manager_test_suite.h
+++ b/src/mongo/db/process_health/fault_manager_test_suite.h
@@ -56,7 +56,13 @@ class FaultManagerTestImpl : public FaultManager {
 public:
     FaultManagerTestImpl(ServiceContext* svcCtx,
                          std::shared_ptr<executor::TaskExecutor> taskExecutor)
-        : FaultManager(svcCtx, taskExecutor, std::make_unique<FaultManagerConfig>()) {}
+        : FaultManager(
+              svcCtx, taskExecutor, std::make_unique<FaultManagerConfig>(), [](std::string cause) {
+                  // In tests, do not crash.
+                  LOGV2(5936606,
+                        "Fault manager progress monitor triggered the termination",
+                        "cause"_attr = cause);
+              }) {}
 
     void transitionStateTest(FaultState newState) {
         transitionToState(newState);
@@ -88,8 +94,12 @@ public:
         return *(static_cast<FaultInternal*>(fault.get()));
     }
 
-    FaultManagerConfig* getConfig() {
-        return _config.get();
+    void progressMonitorCheckTest(std::function<void(std::string cause)> crashCb) {
+        progressMonitorCheckForTests(crashCb);
+    }
+
+    FaultManagerConfig getConfigTest() {
+        return getConfig();
     }
 };
 
@@ -104,6 +114,7 @@ public:
 
         _svcCtx = ServiceContext::make();
         _svcCtx->setFastClockSource(std::make_unique<ClockSourceMock>());
+        _svcCtx->setPreciseClockSource(std::make_unique<ClockSourceMock>());
         _svcCtx->setTickSource(std::make_unique<TickSourceMock<Milliseconds>>());
 
         resetManager();
@@ -130,9 +141,8 @@ public:
     void registerMockHealthObserver(FaultFacetType mockType,
                                     std::function<double()> getSeverityCallback) {
         HealthObserverRegistration::registerObserverFactory(
-            [mockType, getSeverityCallback](ClockSource* clockSource, TickSource* tickSource) {
-                return std::make_unique<HealthObserverMock>(
-                    mockType, clockSource, tickSource, getSeverityCallback);
+            [mockType, getSeverityCallback](ServiceContext* svcCtx) {
+                return std::make_unique<HealthObserverMock>(mockType, svcCtx, getSeverityCallback);
             });
     }
 
@@ -144,6 +154,10 @@ public:
         return *static_cast<ClockSourceMock*>(_svcCtx->getFastClockSource());
     }
 
+    ServiceContext* svcCtx() const {
+        return _svcCtx.get();
+    }
+
     TickSourceMock<Milliseconds>& tickSource() {
         return *static_cast<TickSourceMock<Milliseconds>*>(_svcCtx->getTickSource());
     }
@@ -152,7 +166,13 @@ public:
     void advanceTime(Duration d) {
         executor::NetworkInterfaceMock::InNetworkGuard guard(_net);
         _net->advanceTime(_net->now() + d);
+        advanceClockSourcesTime(d);
+    }
+
+    template <typename Duration>
+    void advanceClockSourcesTime(Duration d) {
         clockSource().advance(d);
+        static_cast<ClockSourceMock*>(_svcCtx->getPreciseClockSource())->advance(d);
         tickSource().advance(d);
     }
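The fixture now has to advance the network mock, both clock sources, and the tick source in lockstep; if the fakes drift apart, the liveness arithmetic in the progress monitor becomes meaningless. A minimal standalone sketch of the idea (trivial invented mocks):

    #include <chrono>

    using std::chrono::milliseconds;

    // Trivial fake clock: time moves only when the test advances it.
    struct ClockSourceMock {
        milliseconds now{0};
        void advance(milliseconds d) { now += d; }
    };

    struct Fixture {
        ClockSourceMock fastClock, preciseClock, tickSource;

        // Advance every source by the same amount so they never diverge.
        void advanceTime(milliseconds d) {
            for (ClockSourceMock* c : {&fastClock, &preciseClock, &tickSource})
                c->advance(d);
        }
    };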
diff --git a/src/mongo/db/process_health/health_observer.h b/src/mongo/db/process_health/health_observer.h
index c4e775be70c..40c9d379db7 100644
--- a/src/mongo/db/process_health/health_observer.h
+++ b/src/mongo/db/process_health/health_observer.h
@@ -32,11 +32,32 @@
 #include "mongo/db/process_health/fault_facets_container.h"
 #include "mongo/db/process_health/fault_manager_config.h"
 #include "mongo/executor/task_executor.h"
+#include "mongo/util/future.h"
 
 namespace mongo {
 namespace process_health {
 
 /**
+ * Liveness data and stats.
+ */
+struct HealthObserverLivenessStats {
+    // True if this observer is enabled.
+    bool isEnabled = false;
+    // True if this observer is currently running a health check.
+    bool currentlyRunningHealthCheck = false;
+    // When the last check started, or when the current one started if a
+    // check is running now.
+    Date_t lastTimeCheckStarted = Date_t::max();
+    // When the last check completed (not the current one).
+    Date_t lastTimeCheckCompleted = Date_t::max();
+    // Incremented when a check is done.
+    int completedChecksCount = 0;
+    // Incremented when a check completed with a fault.
+    // This doesn't take into account critical vs non-critical.
+    int completedChecksWithFaultCount = 0;
+};
+
+/**
  * Interface to conduct periodic health checks.
  * Every instance of health observer is wired internally to update the state of the FaultManager
  * when a problem is detected.
@@ -65,12 +86,15 @@ public:
      * @param factory Interface to get or create the factory of facets container.
     */
    virtual void periodicCheck(FaultFacetsContainerFactory& factory,
-                               std::shared_ptr<executor::TaskExecutor> taskExecutor) = 0;
+                               std::shared_ptr<executor::TaskExecutor> taskExecutor,
+                               CancellationToken token) = 0;
 
    /**
     * @return HealthObserverIntensity
     */
-    virtual HealthObserverIntensity getIntensity() = 0;
+    virtual HealthObserverIntensity getIntensity() const = 0;
+
+    virtual HealthObserverLivenessStats getStats() const = 0;
 };
 
 }  // namespace process_health
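`HealthObserverLivenessStats` is the data the progress monitor consumes: combining `currentlyRunningHealthCheck` with `lastTimeCheckStarted` distinguishes a stalled check from one that was never scheduled. A standalone sketch of the stalled-check predicate (hypothetical types; the real code uses `Date_t` and config-driven deadlines):

    #include <chrono>

    using Clock = std::chrono::steady_clock;

    struct LivenessStats {
        bool isEnabled = false;
        bool currentlyRunningHealthCheck = false;
        Clock::time_point lastTimeCheckStarted{};
    };

    // A check is "stuck" if it started, is still running, and the deadline passed.
    bool isStuck(const LivenessStats& s, Clock::time_point now, Clock::duration deadline) {
        return s.isEnabled && s.currentlyRunningHealthCheck &&
            now - s.lastTimeCheckStarted > deadline;
    }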
diff --git a/src/mongo/db/process_health/health_observer_base.cpp b/src/mongo/db/process_health/health_observer_base.cpp
index 918093b34b4..d3f6dd03c30 100644
--- a/src/mongo/db/process_health/health_observer_base.cpp
+++ b/src/mongo/db/process_health/health_observer_base.cpp
@@ -37,11 +37,11 @@
 namespace mongo {
 namespace process_health {
 
-HealthObserverBase::HealthObserverBase(ClockSource* clockSource, TickSource* tickSource)
-    : _clockSource(clockSource), _tickSource(tickSource) {}
+HealthObserverBase::HealthObserverBase(ServiceContext* svcCtx) : _svcCtx(svcCtx) {}
 
 void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
-                                       std::shared_ptr<executor::TaskExecutor> taskExecutor) {
+                                       std::shared_ptr<executor::TaskExecutor> taskExecutor,
+                                       CancellationToken token) {
     // TODO(SERVER-59368): fix this for runtime options support.
     if (getIntensity() == HealthObserverIntensity::kOff) {
         return;
@@ -53,10 +53,11 @@ void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
         return;
     }
 
-    if (_clockSource->now() - _lastTimeTheCheckWasRun < minimalCheckInterval()) {
+    const auto now = _svcCtx->getPreciseClockSource()->now();
+    if (now - _lastTimeTheCheckWasRun < minimalCheckInterval()) {
         return;
     }
-    _lastTimeTheCheckWasRun = _clockSource->now();
+    _lastTimeTheCheckWasRun = now;
 
     LOGV2_DEBUG(6007902, 2, "Start periodic health check", "observerType"_attr = getType());
 
@@ -64,10 +65,17 @@ void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
     }
 
     // Do the health check.
-    taskExecutor->schedule([this, &factory](Status status) {
-        periodicCheckImpl({})
+    taskExecutor->schedule([this, &factory, token, taskExecutor](Status status) {
+        periodicCheckImpl({token, taskExecutor})
             .then([this, &factory](HealthCheckStatus&& checkStatus) mutable {
+                const auto severity = checkStatus.getSeverity();
                 factory.updateWithCheckStatus(std::move(checkStatus));
+
+                auto lk = stdx::lock_guard(_mutex);
+                ++_completedChecksCount;
+                if (!HealthCheckStatus::isResolved(severity)) {
+                    ++_completedChecksWithFaultCount;
+                }
             })
             .onCompletion([this](Status status) {
                 if (!status.isOK()) {
@@ -75,15 +83,17 @@ void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
                     LOGV2_ERROR(
                         6007901, "Unexpected failure during health check", "status"_attr = status);
                 }
+                const auto now = _svcCtx->getPreciseClockSource()->now();
                 auto lk = stdx::lock_guard(_mutex);
                 invariant(_currentlyRunningHealthCheck);
                 _currentlyRunningHealthCheck = false;
+                _lastTimeCheckCompleted = now;
             })
             .getAsync([this](Status status) {});
     });
 }
 
-HealthObserverIntensity HealthObserverBase::getIntensity() {
+HealthObserverIntensity HealthObserverBase::getIntensity() const {
     return _intensity;
 }
 
@@ -107,5 +117,21 @@ HealthCheckStatus HealthObserverBase::makeSimpleFailedStatus(double severity,
     return HealthCheckStatus(getType(), severity, sb.stringData());
 }
 
+HealthObserverLivenessStats HealthObserverBase::getStats() const {
+    auto lk = stdx::lock_guard(_mutex);
+    return getStatsLocked(lk);
+}
+
+HealthObserverLivenessStats HealthObserverBase::getStatsLocked(WithLock) const {
+    HealthObserverLivenessStats stats;
+    stats.isEnabled = _intensity != HealthObserverIntensity::kOff;
+    stats.currentlyRunningHealthCheck = _currentlyRunningHealthCheck;
+    stats.lastTimeCheckStarted = _lastTimeTheCheckWasRun;
+    stats.lastTimeCheckCompleted = _lastTimeCheckCompleted;
+    stats.completedChecksCount = _completedChecksCount;
+    stats.completedChecksWithFaultCount = _completedChecksWithFaultCount;
+    return stats;
+}
+
 }  // namespace process_health
 }  // namespace mongo
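The continuation chain above runs the check asynchronously and records start/completion state and fault counts under `_mutex`. A simplified, synchronous standalone sketch of the same bookkeeping (invented names):

    #include <chrono>
    #include <functional>
    #include <mutex>

    using Clock = std::chrono::steady_clock;

    class CheckBookkeeping {
    public:
        // Wraps one health check run, recording start/completion and fault counts.
        void runCheck(const std::function<bool()>& check) {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                if (_currentlyRunning)
                    return;  // Never run two checks concurrently.
                _currentlyRunning = true;
                _lastStarted = Clock::now();
            }
            const bool healthy = check();  // The real code does this asynchronously.
            std::lock_guard<std::mutex> lk(_mutex);
            _currentlyRunning = false;
            _lastCompleted = Clock::now();
            ++_completedChecksCount;
            if (!healthy)
                ++_completedChecksWithFaultCount;
        }

    private:
        std::mutex _mutex;
        bool _currentlyRunning = false;
        Clock::time_point _lastStarted{}, _lastCompleted{};
        int _completedChecksCount = 0;
        int _completedChecksWithFaultCount = 0;
    };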
diff --git a/src/mongo/db/process_health/health_observer_base.h b/src/mongo/db/process_health/health_observer_base.h
index 5c227e6355c..382f237c6fa 100644
--- a/src/mongo/db/process_health/health_observer_base.h
+++ b/src/mongo/db/process_health/health_observer_base.h
@@ -42,15 +42,19 @@ namespace process_health {
  */
 class HealthObserverBase : public HealthObserver {
 public:
-    HealthObserverBase(ClockSource* clockSource, TickSource* tickSource);
+    explicit HealthObserverBase(ServiceContext* svcCtx);
     virtual ~HealthObserverBase() = default;
 
     ClockSource* clockSource() const {
-        return _clockSource;
+        return _svcCtx->getPreciseClockSource();
     }
 
     TickSource* tickSource() const {
-        return _tickSource;
+        return _svcCtx->getTickSource();
+    }
+
+    ServiceContext* svcCtx() const {
+        return _svcCtx;
     }
 
     /**
@@ -63,10 +67,16 @@ public:
     // Implements the common logic for periodic checks.
     // Every observer should implement periodicCheckImpl() for specific tests.
     void periodicCheck(FaultFacetsContainerFactory& factory,
-                       std::shared_ptr<executor::TaskExecutor> taskExecutor) override;
+                       std::shared_ptr<executor::TaskExecutor> taskExecutor,
+                       CancellationToken token) override;
+
+    HealthObserverLivenessStats getStats() const override;
 
 protected:
-    struct PeriodicHealthCheckContext {};
+    struct PeriodicHealthCheckContext {
+        CancellationToken cancellationToken;
+        std::shared_ptr<executor::TaskExecutor> taskExecutor;
+    };
 
     /**
      * The main method every health observer should implement for a particular
@@ -77,7 +87,7 @@ protected:
     virtual Future<HealthCheckStatus> periodicCheckImpl(
         PeriodicHealthCheckContext&& periodicCheckContext) = 0;
 
-    HealthObserverIntensity getIntensity() override;
+    HealthObserverIntensity getIntensity() const override;
 
     // Helper method to create a status without errors.
     HealthCheckStatus makeHealthyStatus() const;
@@ -85,17 +95,21 @@ protected:
     // Make a generic error status.
     HealthCheckStatus makeSimpleFailedStatus(double severity, std::vector<Status>&& failures) const;
 
-    ClockSource* const _clockSource;
-    TickSource* const _tickSource;
+    HealthObserverLivenessStats getStatsLocked(WithLock) const;
+
+    ServiceContext* const _svcCtx;
 
     mutable Mutex _mutex =
         MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "HealthObserverBase::_mutex");
 
-    // Initially disable all observers until enabled by config options.
-    HealthObserverIntensity _intensity = HealthObserverIntensity::kOff;
+    // TODO: remove this field, should be done in config.
+    HealthObserverIntensity _intensity = HealthObserverIntensity::kNonCritical;
 
     // Indicates if there is any check running, to prevent running checks concurrently.
     bool _currentlyRunningHealthCheck = false;
     // Enforces the safety interval.
     Date_t _lastTimeTheCheckWasRun;
+    Date_t _lastTimeCheckCompleted;
+
+    int _completedChecksCount = 0;
+    int _completedChecksWithFaultCount = 0;
 };
 
 }  // namespace process_health
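The `getStats()` / `getStatsLocked(WithLock)` pair is the lock-witness idiom: the public method acquires the mutex and passes proof of that to the private variant, so the locked variant cannot be called without a held lock. A minimal standalone version (MongoDB's real `WithLock` utility is richer than this sketch):

    #include <mutex>

    // A "witness" parameter: constructible only from a held lock_guard,
    // so callers cannot reach the locked variant without actually locking.
    class WithLock {
    public:
        template <typename Mutex>
        WithLock(const std::lock_guard<Mutex>&) {}
    };

    class Stats {
    public:
        int get() const {
            std::lock_guard<std::mutex> lk(_mutex);
            return getLocked(lk);  // The lock_guard converts to the witness.
        }

    private:
        int getLocked(WithLock) const { return _value; }  // Requires the witness.

        mutable std::mutex _mutex;
        int _value = 0;
    };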
diff --git a/src/mongo/db/process_health/health_observer_mock.h b/src/mongo/db/process_health/health_observer_mock.h
index 96c984960ae..17516f13c98 100644
--- a/src/mongo/db/process_health/health_observer_mock.h
+++ b/src/mongo/db/process_health/health_observer_mock.h
@@ -43,10 +43,9 @@ namespace process_health {
 class HealthObserverMock : public HealthObserverBase {
 public:
     HealthObserverMock(FaultFacetType mockType,
-                       ClockSource* clockSource,
-                       TickSource* tickSource,
+                       ServiceContext* svcCtx,
                        std::function<double()> getSeverityCallback)
-        : HealthObserverBase(clockSource, tickSource),
+        : HealthObserverBase(svcCtx),
          _mockType(mockType),
          _getSeverityCallback(getSeverityCallback) {}
 
@@ -64,14 +63,18 @@ protected:
         auto completionPf = makePromiseFuture<HealthCheckStatus>();
 
         if (HealthCheckStatus::isResolved(severity)) {
+            LOGV2(5936603, "Mock health observer returns a resolved severity");
             completionPf.promise.emplaceValue(HealthCheckStatus(getType()));
         } else {
+            LOGV2(5936604,
+                  "Mock health observer returns a fault severity",
+                  "severity"_attr = severity);
             completionPf.promise.emplaceValue(HealthCheckStatus(getType(), severity, "failed"));
         }
         return std::move(completionPf.future);
     }
 
-    HealthObserverIntensity getIntensity() override {
+    HealthObserverIntensity getIntensity() const override {
         return HealthObserverIntensity::kNonCritical;
     }
 
diff --git a/src/mongo/db/process_health/health_observer_registration.cpp b/src/mongo/db/process_health/health_observer_registration.cpp
index f59aa3b3155..a1da3e30f94 100644
--- a/src/mongo/db/process_health/health_observer_registration.cpp
+++ b/src/mongo/db/process_health/health_observer_registration.cpp
@@ -34,8 +34,8 @@ namespace process_health {
 
 namespace {
 
-using HealthObserverFactoryCallback = std::function<std::unique_ptr<HealthObserver>(
-    ClockSource* clockSource, TickSource* tickSource)>;
+using HealthObserverFactoryCallback =
+    std::function<std::unique_ptr<HealthObserver>(ServiceContext* svcCtx)>;
 
 // Returns static vector of all registrations.
 // No synchronization is required as all the factories are registered during
@@ -49,16 +49,15 @@ std::vector<HealthObserverFactoryCallback>* getObserverFactories() {
 }  // namespace
 
 void HealthObserverRegistration::registerObserverFactory(
-    std::function<std::unique_ptr<HealthObserver>(ClockSource* clockSource, TickSource* tickSource)>
-        factoryCallback) {
+    std::function<std::unique_ptr<HealthObserver>(ServiceContext* svcCtx)> factoryCallback) {
     getObserverFactories()->push_back(std::move(factoryCallback));
 }
 
 std::vector<std::unique_ptr<HealthObserver>> HealthObserverRegistration::instantiateAllObservers(
-    ClockSource* clockSource, TickSource* tickSource) {
+    ServiceContext* svcCtx) {
     std::vector<std::unique_ptr<HealthObserver>> result;
     for (auto& cb : *getObserverFactories()) {
-        result.push_back(cb(clockSource, tickSource));
+        result.push_back(cb(svcCtx));
     }
     return result;
 }
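The registration change narrows the factory signature to a single `ServiceContext*`. The underlying pattern is a process-wide factory list filled during single-threaded startup and drained once; a standalone sketch (invented names):

    #include <functional>
    #include <memory>
    #include <string>
    #include <vector>

    struct Context {};  // Stand-in for ServiceContext.

    struct Observer {
        virtual ~Observer() = default;
        virtual std::string type() const = 0;
    };

    using Factory = std::function<std::unique_ptr<Observer>(Context*)>;

    // Static registry; safe without locking only if registration happens
    // during single-threaded startup, as the original comments require.
    std::vector<Factory>& factories() {
        static std::vector<Factory> v;
        return v;
    }

    void registerObserverFactory(Factory f) {
        factories().push_back(std::move(f));
    }

    std::vector<std::unique_ptr<Observer>> instantiateAllObservers(Context* ctx) {
        std::vector<std::unique_ptr<Observer>> out;
        for (auto& f : factories())
            out.push_back(f(ctx));
        return out;
    }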
diff --git a/src/mongo/db/process_health/health_observer_registration.h b/src/mongo/db/process_health/health_observer_registration.h
index b7c72bd2b7a..6606e13f05a 100644
--- a/src/mongo/db/process_health/health_observer_registration.h
+++ b/src/mongo/db/process_health/health_observer_registration.h
@@ -52,15 +52,14 @@ public:
      * @param factoryCallback creates observer instance when invoked.
      */
     static void registerObserverFactory(
-        std::function<std::unique_ptr<HealthObserver>(ClockSource* clockSource,
-                                                      TickSource* tickSource)> factoryCallback);
+        std::function<std::unique_ptr<HealthObserver>(ServiceContext* svcCtx)> factoryCallback);
 
     /**
      * Invokes all registered factories and returns new instances.
      * The ownership of all observers is transferred to the invoker.
      */
     static std::vector<std::unique_ptr<HealthObserver>> instantiateAllObservers(
-        ClockSource* clockSource, TickSource* tickSource);
+        ServiceContext* svcCtx);
 
     /**
      * Test-only method to cleanup the list of registered factories.
diff --git a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp
index 0e10890024c..9db3e4197b8 100644
--- a/src/mongo/db/process_health/health_observer_test.cpp
+++ b/src/mongo/db/process_health/health_observer_test.cpp
@@ -50,8 +50,7 @@ namespace {
 // by the instantiate method below will be greater than expected.
 TEST_F(FaultManagerTest, Registration) {
     registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0; });
-    auto allObservers =
-        HealthObserverRegistration::instantiateAllObservers(&clockSource(), &tickSource());
+    auto allObservers = HealthObserverRegistration::instantiateAllObservers(svcCtx());
     ASSERT_EQ(1, allObservers.size());
     ASSERT_EQ(FaultFacetType::kMock1, allObservers[0]->getType());
 }
@@ -70,7 +69,7 @@ TEST_F(FaultManagerTest, HealthCheckCreatesFacetOnHealthCheckFoundFault) {
 
     advanceTime(Milliseconds(100));
     manager().healthCheckTest();
-    waitForFaultBeingCreated();
+    waitForTransitionIntoState(FaultState::kTransientFault);
 
     auto currentFault = manager().currentFault();
     ASSERT_TRUE(currentFault);  // Is created.
 }
@@ -222,16 +221,58 @@ TEST_F(FaultManagerTest, InitialHealthCheckDoesNotRunIfFeatureFlagNotEnabled) {
     feature_flags::gFeatureFlagHealthMonitoring = true;
 }
 
+TEST_F(FaultManagerTest, Stats) {
+    advanceTime(Milliseconds(100));
+    registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
+    waitForTransitionIntoState(FaultState::kTransientFault);
+
+    auto observer = manager().getHealthObserversTest()[0];
+    auto stats = observer->getStats();
+    ASSERT_TRUE(stats.isEnabled);
+    ASSERT_FALSE(stats.currentlyRunningHealthCheck);
+    ASSERT_TRUE(stats.lastTimeCheckStarted >= clockSource().now());
+    ASSERT_TRUE(stats.lastTimeCheckCompleted >= stats.lastTimeCheckStarted);
+    ASSERT_TRUE(stats.completedChecksCount >= 1);
+    ASSERT_TRUE(stats.completedChecksWithFaultCount >= 1);
+}
+
+TEST_F(FaultManagerTest, ProgressMonitorCheck) {
+    AtomicWord<bool> shouldBlock{true};
+    registerMockHealthObserver(FaultFacetType::kMock1, [this, &shouldBlock] {
+        while (shouldBlock.load()) {
+            sleepFor(Milliseconds(1));
+        }
+        return 0.1;
+    });
+
+    // Health check should get stuck here.
+    manager().healthCheckTest();
+    // Verify that the 'crash callback' is invoked after timeout.
+    bool crashTriggered = false;
+    std::function<void(std::string cause)> crashCb = [&crashTriggered](std::string) {
+        crashTriggered = true;
+    };
+    manager().progressMonitorCheckTest(crashCb);
+    // The progress check passed because the simulated time did not advance.
+    ASSERT_FALSE(crashTriggered);
+    advanceClockSourcesTime(manager().getConfig().getPeriodicLivenessDeadline() + Seconds(1));
+    manager().progressMonitorCheckTest(crashCb);
+    // The progress check simulated a crash.
+    ASSERT_TRUE(crashTriggered);
+    shouldBlock.store(false);
+    resetManager();  // Before fields above go out of scope.
+}
+
 TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeout) {
     registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; });
     waitForTransitionIntoState(FaultState::kTransientFault);
 
     ASSERT_TRUE(manager().getFaultState() == FaultState::kTransientFault);
-    advanceTime(manager().getConfig()->getActiveFaultDuration() + Milliseconds(1));
+    advanceTime(manager().getConfig().getActiveFaultDuration() + Milliseconds(1));
     waitForTransitionIntoState(FaultState::kActiveFault);
 }
 
 TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) {
-    const auto activeFaultDuration = manager().getConfig()->getActiveFaultDuration();
+    const auto activeFaultDuration = manager().getConfigTest().getActiveFaultDuration();
     const auto start = clockSource().now();
 
     // Initially unhealthy; Transitions to healthy before the active fault timeout.
@@ -242,9 +283,8 @@ TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) {
             Milliseconds(durationCount<Milliseconds>(activeFaultDuration) / 4);
         if (elapsed < quarterActiveFaultDuration) {
             return 1.1;
-        } else {
-            return 0.0;
         }
+        return 0.0;
     });
     waitForTransitionIntoState(FaultState::kTransientFault);
     assertSoonWithHealthCheck([this]() { return manager().getFaultState() == FaultState::kOk; });
diff --git a/src/mongo/db/process_health/progress_monitor.cpp b/src/mongo/db/process_health/progress_monitor.cpp
new file mode 100644
index 00000000000..75fdd4f93c1
--- /dev/null
+++ b/src/mongo/db/process_health/progress_monitor.cpp
@@ -0,0 +1,122 @@
+/**
+ *    Copyright (C) 2021-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kProcessHealth
+
+#include "mongo/db/process_health/progress_monitor.h"
+
+#include "mongo/db/process_health/fault_manager.h"
+#include "mongo/db/process_health/health_observer.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+namespace process_health {
+
+ProgressMonitor::ProgressMonitor(FaultManager* faultManager,
+                                 ServiceContext* svcCtx,
+                                 std::function<void(std::string cause)> crashCb)
+    : _faultManager(faultManager), _svcCtx(svcCtx), _crashCb(crashCb) {
+    // Start the monitor thread; this should happen after all observers are initialized.
+    _progressMonitorThread = stdx::thread([this] { _progressMonitorLoop(); });
+}
+
+void ProgressMonitor::join() {
+    _terminate.store(true);
+
+    // The _progressMonitorThread is watching the _taskExecutor join()
+    // completion and thus can be joined only after the _taskExecutor completes.
+    LOGV2(5936602, "Stopping the periodic health checks liveness monitor");
+    if (_progressMonitorThread.joinable()) {
+        _progressMonitorThread.join();
+    }
+}
+
+void ProgressMonitor::progressMonitorCheck(std::function<void(std::string cause)> crashCb) {
+    std::vector<HealthObserver*> observers = _faultManager->getHealthObservers();
+    const auto now = _svcCtx->getPreciseClockSource()->now();
+    std::vector<HealthObserver*> secondPass;
+
+    // Check the liveness of every health observer.
+    for (auto observer : observers) {
+        const auto stats = observer->getStats();
+        if (!stats.isEnabled) {
+            continue;
+        }
+
+        // Special case: if the health observer is enabled but did not run
+        // for a very long time, it could be a race. We should check it later.
+        if (!stats.currentlyRunningHealthCheck &&
+            now - stats.lastTimeCheckStarted >
+                _faultManager->getConfig().getPeriodicLivenessDeadline() * 2) {
+            secondPass.push_back(observer);
+            continue;
+        }
+
+        if (stats.currentlyRunningHealthCheck &&
+            now - stats.lastTimeCheckStarted >
+                _faultManager->getConfig().getPeriodicLivenessDeadline()) {
+            // Crash because this health checker has been running for too long.
+            crashCb(str::stream() << "Health observer " << observer->getType()
+                                  << " is still running since "
+                                  << stats.lastTimeCheckStarted.toString());
+        }
+    }
+
+    if (secondPass.empty()) {
+        return;
+    }
+    // The observer is enabled but did not run for a while. Sleep two cycles
+    // and check again. Note: this should be rare.
+    sleepFor(_faultManager->getConfig().getPeriodicHealthCheckInterval() * 2);
+    for (auto observer : secondPass) {
+        const auto stats = observer->getStats();
+        if (stats.isEnabled && !stats.currentlyRunningHealthCheck &&
+            now - stats.lastTimeCheckStarted >
+                _faultManager->getConfig().getPeriodicLivenessDeadline() * 2) {
+            // Crash because this health checker was never started.
+            crashCb(str::stream() << "Health observer " << observer->getType()
+                                  << " did not run since "
+                                  << stats.lastTimeCheckStarted.toString());
+            continue;
+        }
+    }
+}
+
+void ProgressMonitor::_progressMonitorLoop() {
+    Client::initThread("Health checks progress monitor"_sd, _svcCtx, nullptr);
+
+    while (!_terminate.load()) {
+        progressMonitorCheck(_crashCb);
+
+        sleepFor(_faultManager->getConfig().getPeriodicLivenessCheckInterval());
+    }
+}
+
+}  // namespace process_health
+}  // namespace mongo
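The second pass above exists to avoid a false positive: an enabled observer with a stale `lastTimeCheckStarted` might merely be racing with its own scheduling, so the monitor sleeps two check intervals and re-reads the stats before crashing. A condensed standalone sketch of that retry-before-crash logic (hypothetical names; note that, as in the patch, `now` is captured before the sleep):

    #include <chrono>
    #include <functional>
    #include <string>
    #include <thread>
    #include <vector>

    using Clock = std::chrono::steady_clock;

    struct ObserverStats {
        bool enabled = true;
        bool running = false;
        Clock::time_point lastStarted{};
    };

    void livenessCheck(const std::vector<ObserverStats*>& observers,
                       Clock::duration deadline,
                       Clock::duration checkInterval,
                       const std::function<void(std::string)>& crashCb) {
        const auto now = Clock::now();
        std::vector<ObserverStats*> secondPass;
        for (auto* o : observers) {
            if (!o->enabled)
                continue;
            if (o->running && now - o->lastStarted > deadline)
                crashCb("health check stuck");            // Definitely hung.
            else if (!o->running && now - o->lastStarted > 2 * deadline)
                secondPass.push_back(o);                  // Possibly just a race.
        }
        if (secondPass.empty())
            return;
        std::this_thread::sleep_for(2 * checkInterval);   // Give scheduling a chance.
        for (auto* o : secondPass)
            if (o->enabled && !o->running && now - o->lastStarted > 2 * deadline)
                crashCb("health check never started");
    }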
diff --git a/src/mongo/db/process_health/progress_monitor.h b/src/mongo/db/process_health/progress_monitor.h
new file mode 100644
index 00000000000..9b7e1f79b4e
--- /dev/null
+++ b/src/mongo/db/process_health/progress_monitor.h
@@ -0,0 +1,75 @@
+/**
+ *    Copyright (C) 2021-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+#pragma once
+
+#include <functional>
+#include <string>
+#include <thread>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+namespace process_health {
+
+class FaultManager;
+
+/**
+ * Tracks that the health checks are invoked regularly and make progress.
+ */
+class ProgressMonitor {
+public:
+    ProgressMonitor(FaultManager* faultManager,
+                    ServiceContext* svcCtx,
+                    std::function<void(std::string cause)> crashCb);
+
+    // Signals that the monitoring can stop and blocks until the thread is joined.
+    // Invoked to signal that the task executor is joined.
+    void join();
+
+    // Checks that the health checks are invoked and are not stuck forever. Invokes the callback
+    // after the timeout configured by options.
+    void progressMonitorCheck(std::function<void(std::string cause)> crashCb);
+
+private:
+    // Checks that the periodic health checks actually make progress.
+    void _progressMonitorLoop();
+
+    FaultManager* const _faultManager;
+    ServiceContext* const _svcCtx;
+    // Callback used to crash the server.
+    const std::function<void(std::string cause)> _crashCb;
+    // This flag is set after the _taskExecutor join() returns, thus
+    // we know no more health checks are still running.
+    AtomicWord<bool> _terminate{false};
+    stdx::thread _progressMonitorThread;
+};
+
+}  // namespace process_health
+}  // namespace mongo
diff --git a/src/mongo/util/exit_code.h b/src/mongo/util/exit_code.h
index c1bceb598ab..79024f32f86 100644
--- a/src/mongo/util/exit_code.h
+++ b/src/mongo/util/exit_code.h
@@ -56,7 +56,9 @@ enum ExitCode : int {
     EXIT_WATCHDOG = 61,        // Internal Watchdog has terminated mongod
     EXIT_NEED_DOWNGRADE =
         62,  // The current binary version is not appropriate to run on the existing datafiles.
-    EXIT_UNCAUGHT = 100,  // top level exception that wasn't caught
+    EXIT_THREAD_SANITIZER = 66,      // Default exit code for Thread Sanitizer failures
+    EXIT_PROCESS_HEALTH_CHECK = 67,  // Process health check triggered the crash.
+    EXIT_UNCAUGHT = 100,  // top level exception that wasn't caught
     EXIT_TEST = 101
 };