diff options
author | Kshitij Gupta <kshitij.gupta@mongodb.com> | 2021-12-06 18:52:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-06 19:29:21 +0000 |
commit | ee87e655744dcd5fc7331489e621a794e2ad4141 (patch) | |
tree | acaea87ebdd021b28b92ef1e2dcd1750c59f65fd | |
parent | ea6a59377c01ed48157557aaaae0bd8191b7fa4e (diff) | |
download | mongo-ee87e655744dcd5fc7331489e621a794e2ad4141.tar.gz |
SERVER-59365: Use the new state machine.
Co-authored-by: Andrew Witten <andrew.witten@mongodb.com>
Co-authored-by: Lamont Nelson <lamont.nelson@mongodb.com>
17 files changed, 810 insertions, 625 deletions
diff --git a/src/mongo/db/process_health/fault.h b/src/mongo/db/process_health/fault.h index b0d8205879b..383c4121e5e 100644 --- a/src/mongo/db/process_health/fault.h +++ b/src/mongo/db/process_health/fault.h @@ -73,6 +73,12 @@ public: * Describes the current fault. */ virtual void appendDescription(BSONObjBuilder* builder) const = 0; + + BSONObj toBSON() const { + BSONObjBuilder builder; + appendDescription(&builder); + return builder.obj(); + } }; using FaultConstPtr = std::shared_ptr<const Fault>; diff --git a/src/mongo/db/process_health/fault_impl.cpp b/src/mongo/db/process_health/fault_impl.cpp index f25920b81fc..77ca9a82cc5 100644 --- a/src/mongo/db/process_health/fault_impl.cpp +++ b/src/mongo/db/process_health/fault_impl.cpp @@ -116,7 +116,13 @@ void FaultImpl::garbageCollectResolvedFacets() { _facets.end()); } -void FaultImpl::appendDescription(BSONObjBuilder* builder) const {} +void FaultImpl::appendDescription(BSONObjBuilder* builder) const { + builder->append("id", getId().toBSON()); + builder->append("severity", getSeverity()); + builder->append("duration", getDuration().toBSON()); + // TODO (SERVER-61914): Add fault facet details + builder->append("facets", static_cast<int>(_facets.size())); +} } // namespace process_health } // namespace mongo diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp index 512b5a51b50..a9266e9d3b6 100644 --- a/src/mongo/db/process_health/fault_manager.cpp +++ b/src/mongo/db/process_health/fault_manager.cpp @@ -91,15 +91,16 @@ FaultManager::TransientFaultDeadline::TransientFaultDeadline( std::shared_ptr<executor::TaskExecutor> executor, Milliseconds timeout) : cancelActiveFaultTransition(CancellationSource()), - activeFaultTransition( - executor->sleepFor(timeout, cancelActiveFaultTransition.token()) - .thenRunOn(executor) - .then([faultManager]() { faultManager->transitionToState(FaultState::kActiveFault); }) - .onError([](Status status) { - LOGV2_WARNING(5937001, - 
"The Fault Manager transient fault deadline was disabled.", - "status"_attr = status); - })) {} + activeFaultTransition(executor->sleepFor(timeout, cancelActiveFaultTransition.token()) + .thenRunOn(executor) + .then([faultManager]() { faultManager->accept(boost::none); }) + .onError([](Status status) { + LOGV2_DEBUG( + 5937001, + 1, + "The Fault Manager transient fault deadline was disabled.", + "status"_attr = status); + })) {} FaultManager::TransientFaultDeadline::~TransientFaultDeadline() { if (!cancelActiveFaultTransition.token().isCanceled()) { @@ -110,7 +111,8 @@ FaultManager::TransientFaultDeadline::~TransientFaultDeadline() { FaultManager::FaultManager(ServiceContext* svcCtx, std::shared_ptr<executor::TaskExecutor> taskExecutor, std::unique_ptr<FaultManagerConfig> config) - : _config(std::move(config)), + : StateMachine(FaultState::kStartupCheck), + _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor), _crashCb([](std::string cause) { @@ -124,41 +126,206 @@ FaultManager::FaultManager(ServiceContext* svcCtx, invariant(_svcCtx->getFastClockSource()); invariant(_svcCtx->getPreciseClockSource()); _lastTransitionTime = _svcCtx->getFastClockSource()->now(); + setupStateMachine(); } FaultManager::FaultManager(ServiceContext* svcCtx, std::shared_ptr<executor::TaskExecutor> taskExecutor, std::unique_ptr<FaultManagerConfig> config, std::function<void(std::string cause)> crashCb) - : _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor), _crashCb(crashCb) { + : StateMachine(FaultState::kStartupCheck), + _config(std::move(config)), + _svcCtx(svcCtx), + _taskExecutor(taskExecutor), + _crashCb(crashCb) { _lastTransitionTime = _svcCtx->getFastClockSource()->now(); + setupStateMachine(); } -void FaultManager::schedulePeriodicHealthCheckThread(bool immediately) { - if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled( - serverGlobalParams.featureCompatibility)) { - return; +void FaultManager::setupStateMachine() { + 
validTransitions({ + {FaultState::kStartupCheck, {FaultState::kOk, FaultState::kActiveFault}}, + {FaultState::kOk, {FaultState::kTransientFault}}, + {FaultState::kTransientFault, {FaultState::kOk, FaultState::kActiveFault}}, + {FaultState::kActiveFault, {}}, + }); + + auto bindThis = [&](auto&& pmf) { return [=](auto&&... a) { return (this->*pmf)(a...); }; }; + + registerHandler(FaultState::kStartupCheck, bindThis(&FaultManager::handleStartupCheck)) + ->enter(bindThis(&FaultManager::logCurrentState)) + ->exit(bindThis(&FaultManager::setInitialHealthCheckComplete)); + + registerHandler(FaultState::kOk, bindThis(&FaultManager::handleOk)) + ->enter(bindThis(&FaultManager::clearTransientFaultDeadline)) + ->enter(bindThis(&FaultManager::logCurrentState)); + + registerHandler(FaultState::kTransientFault, bindThis(&FaultManager::handleTransientFault)) + ->enter(bindThis(&FaultManager::setTransientFaultDeadline)) + ->enter(bindThis(&FaultManager::logCurrentState)); + + registerHandler(FaultState::kActiveFault, + bindThis(&FaultManager::handleActiveFault), + true /* transient state */) + ->enter(bindThis(&FaultManager::logCurrentState)); + + start(); +} + +boost::optional<FaultState> FaultManager::handleStartupCheck(const OptionalMessageType& message) { + if (!message) { + return FaultState::kActiveFault; } - const auto kPeriodicHealthCheckInterval = getConfig().getPeriodicHealthCheckInterval(); - auto lk = stdx::lock_guard(_mutex); + HealthCheckStatus status = message.get(); + + auto activeObservers = getActiveHealthObservers(); + stdx::unordered_set<FaultFacetType> activeObserversTypes; + std::for_each(activeObservers.begin(), + activeObservers.end(), + [&activeObserversTypes](HealthObserver* observer) { + activeObserversTypes.insert(observer->getType()); + }); - const auto cb = [this](const mongo::executor::TaskExecutor::CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - return; + + auto lk = stdx::lock_guard(_stateMutex); + logMessageReceived(state(), status); + + 
if (status.isActiveFault()) { + _healthyObservations.erase(status.getType()); + } else { + _healthyObservations.insert(status.getType()); + } + + updateWithCheckStatus(HealthCheckStatus(status)); + auto optionalActiveFault = getFaultFacetsContainer(); + if (optionalActiveFault) { + optionalActiveFault->garbageCollectResolvedFacets(); + } + + if (optionalActiveFault && hasCriticalFacet(_fault.get()) && !_transientFaultDeadline) { + setTransientFaultDeadline( + FaultState::kStartupCheck, FaultState::kStartupCheck, boost::none); + } + + // If the whole fault becomes resolved, garbage collect it + // with proper locking. + std::shared_ptr<FaultInternal> faultToDelete; + { + auto lk = stdx::lock_guard(_mutex); + if (_fault && _fault->getFacets().empty()) { + faultToDelete.swap(_fault); } + } - healthCheck(); - }; + if (activeObserversTypes == _healthyObservations) { + return FaultState::kOk; + } + return boost::none; +} + +boost::optional<FaultState> FaultManager::handleOk(const OptionalMessageType& message) { + invariant(message); + + HealthCheckStatus status = message.get(); + auto lk = stdx::lock_guard(_stateMutex); + logMessageReceived(state(), status); + + if (_config->getHealthObserverIntensity(status.getType()) == + HealthObserverIntensityEnum::kOff) { + return boost::none; + } + + updateWithCheckStatus(HealthCheckStatus(status)); + + if (!HealthCheckStatus::isResolved(status.getSeverity())) { + return FaultState::kTransientFault; + } + + return boost::none; +} + +boost::optional<FaultState> FaultManager::handleTransientFault(const OptionalMessageType& message) { + if (!message) { + return FaultState::kActiveFault; + } + + HealthCheckStatus status = message.get(); + auto lk = stdx::lock_guard(_stateMutex); + logMessageReceived(state(), status); + + updateWithCheckStatus(HealthCheckStatus(status)); - auto periodicThreadCbHandleStatus = immediately - ? 
_taskExecutor->scheduleWork(cb) - : _taskExecutor->scheduleWorkAt(_taskExecutor->now() + kPeriodicHealthCheckInterval, cb); + auto optionalActiveFault = getFaultFacetsContainer(); + if (optionalActiveFault) { + optionalActiveFault->garbageCollectResolvedFacets(); + } + + // If the whole fault becomes resolved, garbage collect it + // with proper locking. + if (_fault && _fault->getFacets().empty()) { + _fault.reset(); + return FaultState::kOk; + } + return boost::none; +} - uassert(5936101, - "Failed to initialize periodic health check work.", - periodicThreadCbHandleStatus.isOK()); - _periodicHealthCheckCbHandle = periodicThreadCbHandleStatus.getValue(); +boost::optional<FaultState> FaultManager::handleActiveFault(const OptionalMessageType& message) { + LOGV2_FATAL(5936509, "Fault manager received active fault"); + return boost::none; +} + +void FaultManager::logMessageReceived(FaultState state, const HealthCheckStatus& status) { + LOGV2_DEBUG(5936504, + 1, + "Fault manager recieved health check result", + "state"_attr = (str::stream() << state), + "result"_attr = status, + "passed"_attr = (!status.isActiveFault(status.getSeverity()))); +} + +void FaultManager::logCurrentState(FaultState, FaultState newState, const OptionalMessageType&) { + LOGV2(5936503, "Fault manager changed state ", "state"_attr = (str::stream() << newState)); +} + +void FaultManager::setTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&) { + _transientFaultDeadline = std::make_unique<TransientFaultDeadline>( + this, _taskExecutor, _config->getActiveFaultDuration()); +} + +void FaultManager::clearTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&) { + _transientFaultDeadline = nullptr; +} + +void FaultManager::setInitialHealthCheckComplete(FaultState, + FaultState newState, + const OptionalMessageType&) { + LOGV2_DEBUG(5936502, + 0, + "The fault manager initial health checks have completed", + "state"_attr = (str::stream() << newState)); + 
_initialHealthCheckCompletedPromise.emplaceValue(); +} + +void FaultManager::schedulePeriodicHealthCheckThread() { + if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled( + serverGlobalParams.featureCompatibility) || + _config->periodicChecksDisabledForTests()) { + return; + } + + auto observers = getHealthObservers(); + for (auto observer : observers) { + LOGV2_DEBUG( + 59365, 1, "starting health observer", "observerType"_attr = observer->getType()); + + // TODO (SERVER-59368): The system should properly handle a health checker being turned + // on/off + auto token = _managerShuttingDownCancellationSource.token(); + if (_config->isHealthObserverEnabled(observer->getType())) { + healthCheck(observer, token); + } + } } FaultManager::~FaultManager() { @@ -166,8 +333,11 @@ FaultManager::~FaultManager() { _taskExecutor->shutdown(); LOGV2(5936601, "Shutting down periodic health checks"); - if (_periodicHealthCheckCbHandle) { - _taskExecutor->cancel(*_periodicHealthCheckCbHandle); + for (auto& pair : _healthCheckContexts) { + auto cbHandle = pair.second.resultStatus; + if (cbHandle) { + _taskExecutor->cancel(cbHandle.get()); + } } // All health checks must use the _taskExecutor, joining it @@ -181,32 +351,37 @@ FaultManager::~FaultManager() { } if (!_initialHealthCheckCompletedPromise.getFuture().isReady()) { - _initialHealthCheckCompletedPromise.emplaceValue(); + _initialHealthCheckCompletedPromise.setError( + {ErrorCodes::CommandFailed, "Fault manager failed initial health check"}); } LOGV2_DEBUG(6136801, 1, "Done shutting down periodic health checks"); } -void FaultManager::startPeriodicHealthChecks() { +SharedSemiFuture<void> FaultManager::startPeriodicHealthChecks() { if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled( serverGlobalParams.featureCompatibility)) { LOGV2_DEBUG(6187201, 1, "Health checks disabled by feature flag"); - return; + return Future<void>::makeReady(); } _taskExecutor->startup(); - invariant(getFaultState() == 
FaultState::kStartupCheck); - { - auto lk = stdx::lock_guard(_mutex); - invariant(!_periodicHealthCheckCbHandle); + invariant(state() == FaultState::kStartupCheck); + + _init(); + + if (getActiveHealthObservers().size() == 0) { + LOGV2_DEBUG(5936511, 2, "No active health observers are configured."); + setState(FaultState::kOk, HealthCheckStatus(FaultFacetType::kSystem)); + } else { + schedulePeriodicHealthCheckThread(); } - schedulePeriodicHealthCheckThread(true /* immediately */); - _initialHealthCheckCompletedPromise.getFuture().get(); + return _initialHealthCheckCompletedPromise.getFuture(); } FaultState FaultManager::getFaultState() const { stdx::lock_guard<Latch> lk(_stateMutex); - return _currentState; + return state(); } Date_t FaultManager::getLastTransitionTime() const { @@ -233,46 +408,72 @@ FaultFacetsContainerPtr FaultManager::getOrCreateFaultFacetsContainer() { return std::static_pointer_cast<FaultFacetsContainer>(_fault); } -void FaultManager::healthCheck() { - // One time init. - _firstTimeInitIfNeeded(); +void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) { + auto schedulerCb = [this, observer, token] { + auto periodicThreadCbHandleStatus = this->_taskExecutor->scheduleWorkAt( + _taskExecutor->now() + this->_config->kPeriodicHealthCheckInterval, + [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + healthCheck(observer, token); + }); + + if (!periodicThreadCbHandleStatus.isOK()) { + if (ErrorCodes::isA<ErrorCategory::ShutdownError>( + periodicThreadCbHandleStatus.getStatus().code())) { + return; + } - ON_BLOCK_EXIT([this] { - if (!_config->periodicChecksDisabledForTests()) { - schedulePeriodicHealthCheckThread(); + uassert(5936101, + fmt::format("Failed to initialize periodic health check work. 
Reason: {}", + periodicThreadCbHandleStatus.getStatus().codeString()), + periodicThreadCbHandleStatus.isOK()); } - }); - - std::vector<HealthObserver*> observers = FaultManager::getHealthObservers(); - // Start checks outside of lock. - auto token = _managerShuttingDownCancellationSource.token(); - for (auto observer : observers) { - // TODO: SERVER-59368, fix bug where health observer is turned off when in transient fault - // state - if (_config->getHealthObserverIntensity(observer->getType()) != - HealthObserverIntensityEnum::kOff) - observer->periodicCheck(*this, _taskExecutor, token); - } + _healthCheckContexts.at(observer->getType()).resultStatus = + std::move(periodicThreadCbHandleStatus.getValue()); + }; - // Garbage collect all resolved fault facets. - auto optionalActiveFault = getFaultFacetsContainer(); - if (optionalActiveFault) { - optionalActiveFault->garbageCollectResolvedFacets(); - } + auto acceptNotOKStatus = [this, observer](Status s) { + auto healthCheckStatus = HealthCheckStatus(observer->getType(), 1.0, s.reason()); + LOGV2_ERROR( + 6007901, "Unexpected failure during health check", "status"_attr = healthCheckStatus); + this->accept(healthCheckStatus); + return healthCheckStatus; + }; - // If the whole fault becomes resolved, garbage collect it - // with proper locking. - std::shared_ptr<FaultInternal> faultToDelete; - { - auto lk = stdx::lock_guard(_mutex); - if (_fault && _fault->getFacets().empty()) { - faultToDelete.swap(_fault); - } + // If health observer is disabled, then do nothing and schedule another run (health observer may + // become enabled). + // TODO (SERVER-59368): The system should properly handle a health checker being turned on/off + if (!_config->isHealthObserverEnabled(observer->getType())) { + schedulerCb(); + return; } - // Actions above can result in a state change. 
- checkForStateTransition(); + _healthCheckContexts.insert({observer->getType(), HealthCheckContext(nullptr, boost::none)}); + // Run asynchronous health check. When complete, check for state transition (and perform if + // necessary). Then schedule the next run. + auto healthCheckFuture = observer->periodicCheck(*this, _taskExecutor, token) + .thenRunOn(_taskExecutor) + .onCompletion([this, acceptNotOKStatus, schedulerCb]( + StatusWith<HealthCheckStatus> status) { + ON_BLOCK_EXIT([this, schedulerCb]() { + if (!_config->periodicChecksDisabledForTests()) { + schedulerCb(); + } + }); + + if (!status.isOK()) { + return acceptNotOKStatus(status.getStatus()); + } + + this->accept(status.getValue()); + return status.getValue(); + }); + auto futurePtr = + std::make_unique<ExecutorFuture<HealthCheckStatus>>(std::move(healthCheckFuture)); + _healthCheckContexts.at(observer->getType()).result = std::move(futurePtr); } void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) { @@ -281,6 +482,7 @@ void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) { if (container) { container->updateWithSuppliedFacet(checkStatus.getType(), nullptr); } + return; } @@ -296,15 +498,6 @@ void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) { } } -void FaultManager::checkForStateTransition() { - FaultConstPtr fault = currentFault(); - if (fault && !HealthCheckStatus::isResolved(fault->getSeverity())) { - processFaultExistsEvent(); - } else if (!fault || HealthCheckStatus::isResolved(fault->getSeverity())) { - processFaultIsResolvedEvent(); - } -} - bool FaultManager::hasCriticalFacet(const FaultInternal* fault) const { invariant(fault); const auto& facets = fault->getFacets(); @@ -322,93 +515,8 @@ FaultManagerConfig FaultManager::getConfig() const { return *_config; } -void FaultManager::processFaultExistsEvent() { - FaultState currentState = getFaultState(); - - switch (currentState) { - case FaultState::kStartupCheck: - case 
FaultState::kOk: { - transitionToState(FaultState::kTransientFault); - if (hasCriticalFacet(_fault.get())) { - // This will transition the FaultManager to ActiveFault state after the timeout - // occurs. - _transientFaultDeadline = std::make_unique<TransientFaultDeadline>( - this, _taskExecutor, _config->getActiveFaultDuration()); - } - break; - } - case FaultState::kTransientFault: - case FaultState::kActiveFault: - // NOP. - break; - default: - MONGO_UNREACHABLE; - break; - } -} - -void FaultManager::processFaultIsResolvedEvent() { - FaultState currentState = getFaultState(); - - switch (currentState) { - case FaultState::kOk: - // NOP. - break; - case FaultState::kStartupCheck: - transitionToState(FaultState::kOk); - _initialHealthCheckCompletedPromise.emplaceValue(); - break; - case FaultState::kTransientFault: - // Clear the transient fault deadline timer. - _transientFaultDeadline.reset(); - transitionToState(FaultState::kOk); - break; - case FaultState::kActiveFault: - // Too late, this state cannot be resolved to Ok. 
- break; - default: - MONGO_UNREACHABLE; - break; - } -} - -void FaultManager::transitionToState(FaultState newState) { - // Maps currentState to valid newStates - static const stdx::unordered_map<FaultState, std::vector<FaultState>> validTransitions = { - {FaultState::kStartupCheck, {FaultState::kOk, FaultState::kTransientFault}}, - {FaultState::kOk, {FaultState::kTransientFault}}, - {FaultState::kTransientFault, {FaultState::kOk, FaultState::kActiveFault}}, - {FaultState::kActiveFault, {}}, - }; - - stdx::lock_guard<Latch> lk(_stateMutex); - const auto& validStates = validTransitions.at(_currentState); - auto validIt = std::find(validStates.begin(), validStates.end(), newState); - uassert(ErrorCodes::BadValue, - str::stream() << "Invalid fault manager transition from " << _currentState << " to " - << newState, - validIt != validStates.end()); - - LOGV2(5936201, - "Transitioned fault manager state", - "newState"_attr = str::stream() << newState, - "oldState"_attr = str::stream() << _currentState); - - _lastTransitionTime = _svcCtx->getFastClockSource()->now(); - _currentState = newState; -} - -void FaultManager::_firstTimeInitIfNeeded() { - if (_firstTimeInitExecuted.load()) { - return; - } - +void FaultManager::_init() { auto lk = stdx::lock_guard(_mutex); - // One more time under lock to avoid race. 
- if (_firstTimeInitExecuted.load()) { - return; - } - _firstTimeInitExecuted.store(true); _observers = HealthObserverRegistration::instantiateAllObservers(_svcCtx); @@ -425,7 +533,7 @@ void FaultManager::_firstTimeInitIfNeeded() { auto lk2 = stdx::lock_guard(_stateMutex); LOGV2(5956701, "Instantiated health observers, periodic health checking starts", - "managerState"_attr = _currentState, + "managerState"_attr = state(), "observersCount"_attr = _observers.size()); } @@ -440,6 +548,19 @@ std::vector<HealthObserver*> FaultManager::getHealthObservers() { return result; } +std::vector<HealthObserver*> FaultManager::getActiveHealthObservers() { + auto allObservers = getHealthObservers(); + std::vector<HealthObserver*> result; + result.reserve(allObservers.size()); + for (auto observer : allObservers) { + if (_config->getHealthObserverIntensity(observer->getType()) != + HealthObserverIntensityEnum::kOff) { + result.push_back(observer); + } + } + return result; +} + void FaultManager::progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb) { _progressMonitor->progressMonitorCheck(crashCb); } diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h index 1d89a98721f..597cfb54112 100644 --- a/src/mongo/db/process_health/fault_manager.h +++ b/src/mongo/db/process_health/fault_manager.h @@ -37,6 +37,7 @@ #include "mongo/db/process_health/health_monitoring_server_parameters_gen.h" #include "mongo/db/process_health/health_observer.h" #include "mongo/db/process_health/progress_monitor.h" +#include "mongo/db/process_health/state_machine.h" #include "mongo/db/service_context.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/atomic_word.h" @@ -55,7 +56,8 @@ namespace process_health { * * If an active fault state persists, FaultManager will terminate the server process. 
*/ -class FaultManager : protected FaultFacetsContainerFactory { +class FaultManager : protected StateMachine<HealthCheckStatus, FaultState>, + protected FaultFacetsContainerFactory { FaultManager(const FaultManager&) = delete; FaultManager& operator=(const FaultManager&) = delete; @@ -71,12 +73,25 @@ public: std::function<void(std::string cause)> crashCb); virtual ~FaultManager(); + void setupStateMachine(); + + boost::optional<FaultState> handleStartupCheck(const OptionalMessageType& message); + boost::optional<FaultState> handleOk(const OptionalMessageType& message); + boost::optional<FaultState> handleTransientFault(const OptionalMessageType& message); + boost::optional<FaultState> handleActiveFault(const OptionalMessageType& message); + + void setInitialHealthCheckComplete(FaultState, FaultState, const OptionalMessageType&); + void logCurrentState(FaultState, FaultState, const OptionalMessageType&); + void logMessageReceived(FaultState state, const HealthCheckStatus& status); + void setTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&); + void clearTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&); + // Start periodic health checks, invoke it once during server startup. // It is unsafe to start health checks immediately during ServiceContext creation // because some ServiceContext fields might not be initialized yet. // Health checks cannot be stopped but could be effectively disabled with health-checker // specific flags. - void startPeriodicHealthChecks(); + SharedSemiFuture<void> startPeriodicHealthChecks(); static FaultManager* get(ServiceContext* svcCtx); @@ -101,9 +116,12 @@ public: Date_t getLastTransitionTime() const; protected: - // Starts the health check sequence and updates the internal state on completion. - // This is invoked by the internal timer. 
- virtual void healthCheck(); + // Returns all health observers not configured as Off + std::vector<HealthObserver*> getActiveHealthObservers(); + + // Runs a particular health observer. Then attempts to transition states. Then schedules next + // run. + virtual void healthCheck(HealthObserver* observer, CancellationToken token); // Protected interface FaultFacetsContainerFactory implementation. @@ -114,19 +132,7 @@ protected: void updateWithCheckStatus(HealthCheckStatus&& checkStatus) override; - // State machine related. - - void checkForStateTransition(); // Invoked by periodic thread. - - // Methods that represent particular events that trigger state transition. - void processFaultExistsEvent(); - void processFaultIsResolvedEvent(); - - // Makes a valid state transition or returns an error. - // State transition should be triggered by events above. - void transitionToState(FaultState newState); - - void schedulePeriodicHealthCheckThread(bool immediately = false); + void schedulePeriodicHealthCheckThread(); // TODO: move this into fault class; refactor to remove FaultInternal bool hasCriticalFacet(const FaultInternal* fault) const; @@ -135,7 +141,7 @@ protected: private: // One time init. - void _firstTimeInitIfNeeded(); + void _init(); std::unique_ptr<FaultManagerConfig> _config; ServiceContext* const _svcCtx; @@ -147,20 +153,16 @@ private: MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(5), "FaultManager::_mutex"); std::shared_ptr<FaultInternal> _fault; - // We lazily init all health observers. - AtomicWord<bool> _firstTimeInitExecuted{false}; // This source is canceled before the _taskExecutor shutdown(). It // can be used to check for the start of the shutdown sequence. CancellationSource _managerShuttingDownCancellationSource; // Manager owns all observer instances. 
std::vector<std::unique_ptr<HealthObserver>> _observers; - boost::optional<executor::TaskExecutor::CallbackHandle> _periodicHealthCheckCbHandle; SharedPromise<void> _initialHealthCheckCompletedPromise; // Protects the state below. mutable Mutex _stateMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "FaultManager::_stateMutex"); - FaultState _currentState = FaultState::kStartupCheck; Date_t _lastTransitionTime; @@ -184,6 +186,16 @@ private: std::unique_ptr<TransientFaultDeadline> _transientFaultDeadline; std::unique_ptr<ProgressMonitor> _progressMonitor; + stdx::unordered_set<FaultFacetType> _healthyObservations; + struct HealthCheckContext { + std::unique_ptr<ExecutorFuture<HealthCheckStatus>> result; + boost::optional<executor::TaskExecutor::CallbackHandle> resultStatus; + HealthCheckContext(std::unique_ptr<ExecutorFuture<HealthCheckStatus>> future, + boost::optional<executor::TaskExecutor::CallbackHandle> cbHandle) + : result(std::move(future)), resultStatus(cbHandle){}; + }; + + stdx::unordered_map<FaultFacetType, HealthCheckContext> _healthCheckContexts; }; } // namespace process_health diff --git a/src/mongo/db/process_health/fault_manager_config.h b/src/mongo/db/process_health/fault_manager_config.h index 251f7c1cf7f..830fb97ae46 100644 --- a/src/mongo/db/process_health/fault_manager_config.h +++ b/src/mongo/db/process_health/fault_manager_config.h @@ -42,10 +42,10 @@ namespace process_health { * Current fault state of the server in a simple actionable form. */ enum class FaultState { - kOk = 0, - // The manager conducts startup checks, new connections should be refused. - kStartupCheck, + kStartupCheck = 0, + + kOk, // The manager detected a fault, however the fault is either not severe // enough or is not observed for sufficiently long period of time. @@ -63,7 +63,7 @@ std::ostream& operator<<(std::ostream& os, const FaultState& state); /** * Types of health observers available. 
*/ -enum class FaultFacetType { kMock1 = 0, kMock2, kLdap, kDns }; +enum class FaultFacetType { kSystem, kMock1, kMock2, kLdap, kDns }; class FaultManagerConfig { @@ -77,8 +77,13 @@ public: return intensities->_data->getLdap(); case FaultFacetType::kDns: return intensities->_data->getDns(); - // TODO: update this function with additional fault facets when they are added + // TODO: update this function with additional fault facets when they are added + case FaultFacetType::kSystem: + return HealthObserverIntensityEnum::kCritical; case FaultFacetType::kMock1: + if (_facetToIntensityMapForTest.contains(type)) { + return _facetToIntensityMapForTest.at(type); + } return HealthObserverIntensityEnum::kCritical; case FaultFacetType::kMock2: return HealthObserverIntensityEnum::kCritical; @@ -91,6 +96,10 @@ public: return getHealthObserverIntensity(type) != HealthObserverIntensityEnum::kOff; } + void setIntensityForType(FaultFacetType type, HealthObserverIntensityEnum intensity) { + _facetToIntensityMapForTest.insert({type, intensity}); + } + Milliseconds getActiveFaultDuration() const { return _activeFaultDuration; } @@ -135,6 +144,8 @@ private: bool _periodicChecksDisabledForTests = false; Milliseconds _activeFaultDuration = kActiveFaultDuration; + + stdx::unordered_map<FaultFacetType, HealthObserverIntensityEnum> _facetToIntensityMapForTest; }; } // namespace process_health diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h index d45bc97bd1d..40667e394ce 100644 --- a/src/mongo/db/process_health/fault_manager_test_suite.h +++ b/src/mongo/db/process_health/fault_manager_test_suite.h @@ -80,26 +80,18 @@ public: "cause"_attr = cause); }) {} - void transitionStateTest(FaultState newState) { - transitionToState(newState); + void healthCheckTest(HealthObserver* observer, CancellationToken token) { + healthCheck(observer, token); } - void healthCheckTest() { - healthCheck(); + void 
schedulePeriodicHealthCheckThreadTest() { + schedulePeriodicHealthCheckThread(); } std::vector<HealthObserver*> getHealthObserversTest() { return getHealthObservers(); } - void processFaultExistsEventTest() { - processFaultExistsEvent(); - } - - void processFaultIsResolvedEventTest() { - return processFaultIsResolvedEvent(); - } - FaultFacetsContainerPtr getOrCreateFaultFacetsContainerTest() { return getOrCreateFaultFacetsContainer(); } @@ -117,6 +109,10 @@ public: FaultManagerConfig getConfigTest() { return getConfig(); } + + FaultState acceptTest(const HealthCheckStatus& message) { + return accept(message); + } }; /** @@ -125,7 +121,6 @@ public: class FaultManagerTest : public unittest::Test { public: void setUp() override { - RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; HealthObserverRegistration::resetObserverFactoriesForTest(); createServiceContextIfNeeded(); @@ -159,6 +154,11 @@ public: } void constructTaskExecutor() { + if (_executor) { + _executor->shutdown(); + _executor->join(); + } + auto network = std::shared_ptr<executor::NetworkInterface>( executor::makeNetworkInterface("FaultManagerTest").release()); ThreadPool::Options options; @@ -166,7 +166,6 @@ public: _executor = std::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(network)); - _executor->startup(); } void resetManager(std::unique_ptr<FaultManagerConfig> config = nullptr) { @@ -230,16 +229,6 @@ public: tickSource().advance(d); } - void assertInvalidStateTransition(FaultState newState) { - try { - manager().transitionStateTest(newState); - ASSERT(false); - } catch (const DBException& ex) { - ASSERT(ex.code() == ErrorCodes::BadValue); - // expected exception - } - } - static inline const Seconds kWaitTimeout{30}; static inline const Milliseconds kSleepTime{1}; void assertSoon(std::function<bool()> predicate, Milliseconds timeout = kWaitTimeout) { @@ -260,7 +249,7 @@ public: return true; else { advanceTime(kCheckTimeIncrement); - 
manager().healthCheckTest(); + manager().schedulePeriodicHealthCheckThreadTest(); return false; } }; diff --git a/src/mongo/db/process_health/fault_state_machine_test.cpp b/src/mongo/db/process_health/fault_state_machine_test.cpp index 9a8b6e62cf6..c1d3c5efc59 100644 --- a/src/mongo/db/process_health/fault_state_machine_test.cpp +++ b/src/mongo/db/process_health/fault_state_machine_test.cpp @@ -32,6 +32,7 @@ #include "mongo/db/process_health/fault_manager_test_suite.h" #include "mongo/db/process_health/health_check_status.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -43,171 +44,292 @@ using test::FaultManagerTestImpl; namespace { -std::shared_ptr<executor::ThreadPoolTaskExecutor> constructTaskExecutor() { - auto network = std::make_unique<executor::NetworkInterfaceMock>(); - auto executor = makeSharedThreadPoolTestExecutor(std::move(network)); - executor->startup(); - return executor; -} +TEST_F(FaultManagerTest, TransitionsFromStartupCheckToOkWhenAllObserversAreSuccessful) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0; }); + registerMockHealthObserver(FaultFacetType::kMock2, [] { return 0; }); -// State machine tests. 
-TEST_F(FaultManagerTest, StateTransitionsFromOk) { - auto serviceCtx = ServiceContext::make(); - std::vector<std::pair<FaultState, bool>> transitionValidPairs{ - {FaultState::kOk, false}, - {FaultState::kStartupCheck, false}, - {FaultState::kTransientFault, true}, - {FaultState::kActiveFault, false}}; - - for (auto& pair : transitionValidPairs) { - manager().transitionStateTest(FaultState::kOk); - - if (pair.second) { - manager().transitionStateTest(pair.first); - } else { - assertInvalidStateTransition(pair.first); - } + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + std::vector<FaultFacetType> faultFacetTypes{FaultFacetType::kMock1, FaultFacetType::kMock2}; - resetManager(); + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + + // send successful health check response from each + for (auto faultFacetType : faultFacetTypes) { + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); + if (faultFacetType != faultFacetTypes.back()) { + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + } } + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); } -TEST_F(FaultManagerTest, StateTransitionsFromStartupCheck) { - auto serviceCtx = ServiceContext::make(); - std::vector<std::pair<FaultState, bool>> transitionValidPairs{ - {FaultState::kOk, true}, - {FaultState::kStartupCheck, false}, - {FaultState::kTransientFault, true}, - {FaultState::kActiveFault, false}}; - - for (auto& pair : transitionValidPairs) { - if (pair.second) { - manager().transitionStateTest(pair.first); - } else { - assertInvalidStateTransition(pair.first); - } +TEST_F(FaultManagerTest, TransitionsFromStartupCheckToOkAfterFailureThenSuccess) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + 
registerMockHealthObserver(faultFacetType, [] { return 0; }); - resetManager(); - } + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + advanceTime(Milliseconds(100)); + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + ASSERT(hasFault()); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); } -TEST_F(FaultManagerTest, StateTransitionsFromTransientFault) { - auto serviceCtx = ServiceContext::make(); - std::vector<std::pair<FaultState, bool>> transitionValidPairs{ - {FaultState::kOk, true}, - {FaultState::kStartupCheck, false}, - {FaultState::kTransientFault, false}, - {FaultState::kActiveFault, true}}; - - for (auto& pair : transitionValidPairs) { - manager().transitionStateTest(FaultState::kTransientFault); - - if (pair.second) { - manager().transitionStateTest(pair.first); - } else { - assertInvalidStateTransition(pair.first); - } +TEST_F(FaultManagerTest, TransitionsFromOkToTransientFaultAfterSuccessThenFailure) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; - resetManager(); - } + const auto faultFacetType = FaultFacetType::kMock1; + registerMockHealthObserver(faultFacetType, [] { return 0; }); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); + + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + 
advanceTime(Milliseconds(100)); + assertSoon([this]() { + return hasFault() && manager().getFaultState() == FaultState::kTransientFault; + }); } -TEST_F(FaultManagerTest, StateTransitionsFromActiveFault) { - auto serviceCtx = ServiceContext::make(); - std::vector<std::pair<FaultState, bool>> transitionValidPairs{ - {FaultState::kOk, false}, - {FaultState::kStartupCheck, false}, - {FaultState::kTransientFault, false}, - {FaultState::kActiveFault, false}}; - - for (auto& pair : transitionValidPairs) { - manager().transitionStateTest(FaultState::kTransientFault); - manager().transitionStateTest(FaultState::kActiveFault); - - if (pair.second) { - manager().transitionStateTest(pair.first); - } else { - assertInvalidStateTransition(pair.first); - } +TEST_F(FaultManagerTest, StaysInOkOnSuccess) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + registerMockHealthObserver(faultFacetType, [] { return 0; }); - resetManager(); - } + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); + + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + + ASSERT(!hasFault()); + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); } +TEST_F(FaultManagerTest, StaysInTransientFault) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + registerMockHealthObserver(faultFacetType, [] { return 0; }); -// State transitions triggered by events. 
-TEST_F(FaultManagerTest, EventsFromOk) { - std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{ - {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kOk}, - {[this] { - manager().getOrCreateFaultFacetsContainerTest(); - manager().processFaultExistsEventTest(); - }, - FaultState::kTransientFault}}; + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); - for (auto& pair : validTransitions) { - resetManager(); - manager().transitionStateTest(FaultState::kOk); + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); - pair.first(); // Send event. - ASSERT_EQ(pair.second, manager().getFaultState()); - } + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); + + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + advanceTime(Milliseconds(100)); + assertSoon([this]() { + return hasFault() && manager().getFaultState() == FaultState::kTransientFault; + }); + + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + advanceTime(Milliseconds(100)); + assertSoon([this]() { + return hasFault() && manager().getFaultState() == FaultState::kTransientFault; + }); } -TEST_F(FaultManagerTest, EventsFromStartupCheck) { - std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{ - {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kOk}, - {[this] { - manager().getOrCreateFaultFacetsContainerTest(); - manager().processFaultExistsEventTest(); - }, - FaultState::kTransientFault}}; - - for (auto& pair : validTransitions) { - resetManager(); - ASSERT_EQ(FaultState::kStartupCheck, manager().getFaultState()); - - pair.first(); // Send event. 
- ASSERT_EQ(pair.second, manager().getFaultState()); - } +TEST_F(FaultManagerTest, TransitionsFromTransientFaultToOkOnFailureThenSuccess) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + registerMockHealthObserver(faultFacetType, [] { return 0; }); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); + + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + advanceTime(Milliseconds(100)); + assertSoon([this]() { + return hasFault() && manager().getFaultState() == FaultState::kTransientFault; + }); + + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + + ASSERT(!hasFault()); + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); } -TEST_F(FaultManagerTest, EventsFromTransientFault) { - std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{ - {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kOk}, - {[this] { - manager().getOrCreateFaultFacetsContainerTest(); - manager().processFaultExistsEventTest(); - }, - FaultState::kTransientFault}}; - - for (auto& pair : validTransitions) { - resetManager(); - manager().transitionStateTest(FaultState::kTransientFault); - - pair.first(); // Send event. 
- ASSERT_EQ(pair.second, manager().getFaultState()); - } +TEST_F(FaultManagerTest, OneFacetIsResolved) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; }); + registerMockHealthObserver(FaultFacetType::kMock2, [] { return 1.1; }); + + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(FaultFacetType::kMock1, 1.1, "failing health check 1")); + manager().acceptTest(HealthCheckStatus(FaultFacetType::kMock2, 1.1, "failing health check 2")); + assertSoon([this] { + return manager().getOrCreateFaultFacetsContainerTest()->getFacets().size() == 2; + }); + manager().acceptTest(HealthCheckStatus(FaultFacetType::kMock1)); + assertSoon([this] { + return manager().getOrCreateFaultFacetsContainerTest()->getFacets().front()->getType() == + FaultFacetType::kMock2; + }); + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); } -TEST_F(FaultManagerTest, EventsFromActiveFault) { - // No event can transition out of active fault. - std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{ - {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kActiveFault}, - {[this] { - manager().getOrCreateFaultFacetsContainerTest(); - manager().processFaultExistsEventTest(); - }, - FaultState::kActiveFault}}; - - for (auto& pair : validTransitions) { - resetManager(); - manager().transitionStateTest(FaultState::kTransientFault); - manager().transitionStateTest(FaultState::kActiveFault); - - pair.first(); // Send event. 
- ASSERT_EQ(pair.second, manager().getFaultState()); - } +DEATH_TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeoutFromTransientFault, "Fatal") { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto faultFacetType = FaultFacetType::kMock1; + auto config = test::getConfigWithDisabledPeriodicChecks(); + auto activeFaultDuration = Milliseconds(100); + config->setActiveFaultDurationForTests(activeFaultDuration); + resetManager(std::move(config)); + + registerMockHealthObserver(faultFacetType, [] { return 1.1; }); + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + ASSERT(manager().getFaultState() == FaultState::kOk); + + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + ASSERT(manager().getFaultState() == FaultState::kTransientFault); + + advanceTime(activeFaultDuration); + waitForTransitionIntoState(FaultState::kActiveFault); +} + +DEATH_TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeoutFromStartupCheck, "Fatal") { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto faultFacetType = FaultFacetType::kMock1; + auto config = test::getConfigWithDisabledPeriodicChecks(); + auto activeFaultDuration = Milliseconds(100); + config->setActiveFaultDurationForTests(activeFaultDuration); + resetManager(std::move(config)); + + registerMockHealthObserver(faultFacetType, [] { return 1.1; }); + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + + advanceTime(activeFaultDuration); + waitForTransitionIntoState(FaultState::kActiveFault); +} + +TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto faultFacetType 
= FaultFacetType::kMock1; + auto config = test::getConfigWithDisabledPeriodicChecks(); + auto activeFaultDuration = Milliseconds(100); + config->setActiveFaultDurationForTests(activeFaultDuration); + resetManager(std::move(config)); + + registerMockHealthObserver(faultFacetType, [] { return 1.1; }); + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + ASSERT(manager().getFaultState() == FaultState::kOk); + + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + ASSERT(manager().getFaultState() == FaultState::kTransientFault); + + advanceTime(activeFaultDuration / 2); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + + advanceTime(activeFaultDuration); + + ASSERT(manager().getFaultState() == FaultState::kOk); +} + +TEST_F(FaultManagerTest, HealthCheckWithOffFacetCreatesNoFault) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + auto config = std::make_unique<FaultManagerConfig>(); + config->setIntensityForType(faultFacetType, HealthObserverIntensityEnum::kOff); + resetManager(std::move(config)); + + registerMockHealthObserver(faultFacetType, [] { return 0; }); + + // Create another observer so that we don't skip the startup check state. 
+ registerMockHealthObserver(FaultFacetType::kSystem, [] { return 0; }); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + advanceTime(Milliseconds(100)); + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); +} + +TEST_F(FaultManagerTest, AllOffFacetsSkipStartupCheck) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + auto config = std::make_unique<FaultManagerConfig>(); + config->setIntensityForType(faultFacetType, HealthObserverIntensityEnum::kOff); + resetManager(std::move(config)); + + registerMockHealthObserver(faultFacetType, [] { return 0; }); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); +} + +TEST_F(FaultManagerTest, HealthCheckWithOffFacetCreatesNoFaultInOk) { + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + const auto faultFacetType = FaultFacetType::kMock1; + auto config = std::make_unique<FaultManagerConfig>(); + config->disablePeriodicChecksForTests(); + auto configPtr = config.get(); + resetManager(std::move(config)); + + registerMockHealthObserver(faultFacetType, [] { return 0; }); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + + ASSERT(manager().getFaultState() == FaultState::kStartupCheck); + manager().acceptTest(HealthCheckStatus(faultFacetType)); + advanceTime(Milliseconds(100)); + ASSERT(!hasFault()); + + assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; }); + ASSERT(initialHealthCheckFuture.isReady()); + + configPtr->setIntensityForType(faultFacetType, HealthObserverIntensityEnum::kOff); + 
manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error")); + ASSERT(manager().getFaultState() == FaultState::kOk); } } // namespace diff --git a/src/mongo/db/process_health/health_check_status.h b/src/mongo/db/process_health/health_check_status.h index 826b82b16f0..da5284db13c 100644 --- a/src/mongo/db/process_health/health_check_status.h +++ b/src/mongo/db/process_health/health_check_status.h @@ -61,7 +61,9 @@ public: : _type(type), _severity(0), _description("resolved"_sd) {} HealthCheckStatus(const HealthCheckStatus&) = default; - HealthCheckStatus& operator=(const HealthCheckStatus&) = delete; + HealthCheckStatus& operator=(const HealthCheckStatus&) = default; + HealthCheckStatus(HealthCheckStatus&&) = default; + HealthCheckStatus& operator=(HealthCheckStatus&&) = default; /** * @return FaultFacetType of this status. @@ -101,16 +103,20 @@ public: static bool isActiveFault(double severity) { // Range is inclusive. - return severity >= kActiveFaultSeverity - kActiveFaultSeverityEpsilon; + return severity >= 1.0; + } + + bool isActiveFault() const { + return isActiveFault(getSeverity()); } private: friend std::ostream& operator<<(std::ostream&, const HealthCheckStatus&); friend StringBuilder& operator<<(StringBuilder& s, const HealthCheckStatus& hcs); - const FaultFacetType _type; - const double _severity; - const std::string _description; + FaultFacetType _type; + double _severity; + std::string _description; }; inline StringBuilder& operator<<(StringBuilder& s, const FaultFacetType& type) { diff --git a/src/mongo/db/process_health/health_observer.h b/src/mongo/db/process_health/health_observer.h index 319876e4017..9912a624272 100644 --- a/src/mongo/db/process_health/health_observer.h +++ b/src/mongo/db/process_health/health_observer.h @@ -83,9 +83,10 @@ public: * * @param factory Interface to get or create the factory of facets container. 
*/ - virtual void periodicCheck(FaultFacetsContainerFactory& factory, - std::shared_ptr<executor::TaskExecutor> taskExecutor, - CancellationToken token) = 0; + virtual SharedSemiFuture<HealthCheckStatus> periodicCheck( + FaultFacetsContainerFactory& factory, + std::shared_ptr<executor::TaskExecutor> taskExecutor, + CancellationToken token) = 0; virtual HealthObserverLivenessStats getStats() const = 0; }; diff --git a/src/mongo/db/process_health/health_observer_base.cpp b/src/mongo/db/process_health/health_observer_base.cpp index dbe49da4ed4..a4faea9fc73 100644 --- a/src/mongo/db/process_health/health_observer_base.cpp +++ b/src/mongo/db/process_health/health_observer_base.cpp @@ -39,58 +39,46 @@ namespace process_health { HealthObserverBase::HealthObserverBase(ServiceContext* svcCtx) : _svcCtx(svcCtx) {} -void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory, - std::shared_ptr<executor::TaskExecutor> taskExecutor, - CancellationToken token) { +SharedSemiFuture<HealthCheckStatus> HealthObserverBase::periodicCheck( + FaultFacetsContainerFactory& factory, + std::shared_ptr<executor::TaskExecutor> taskExecutor, + CancellationToken token) { // If we have reached here, the intensity of this health observer must not be off { auto lk = stdx::lock_guard(_mutex); if (_currentlyRunningHealthCheck) { - return; + return _periodicCheckPromise->getFuture(); } + LOGV2_DEBUG(6007902, 2, "Start periodic health check", "observerType"_attr = getType()); const auto now = _svcCtx->getPreciseClockSource()->now(); - if (now - _lastTimeTheCheckWasRun < minimalCheckInterval()) { - LOGV2_DEBUG(6136802, - 3, - "Safety interval prevented new health check", - "observerType"_attr = getType()); - return; - } _lastTimeTheCheckWasRun = now; - - LOGV2_DEBUG(6007902, 2, "Start periodic health check", "observerType"_attr = getType()); - _currentlyRunningHealthCheck = true; + _periodicCheckPromise = std::make_unique<SharedPromise<HealthCheckStatus>>(); } - // Do the health check. 
- taskExecutor->schedule([this, &factory, token, taskExecutor](Status status) { + _periodicCheckPromise->setFrom( periodicCheckImpl({token, taskExecutor}) - .then([this, &factory](HealthCheckStatus&& checkStatus) mutable { - const auto severity = checkStatus.getSeverity(); - factory.updateWithCheckStatus(std::move(checkStatus)); - - auto lk = stdx::lock_guard(_mutex); - ++_completedChecksCount; - if (!HealthCheckStatus::isResolved(severity)) { - ++_completedChecksWithFaultCount; - } - }) - .onCompletion([this](Status status) { + .onCompletion([this](StatusWith<HealthCheckStatus> status) { if (!status.isOK()) { - // Health checkers should not throw, they should return FaultFacetPtr. - LOGV2_ERROR( - 6007901, "Unexpected failure during health check", "status"_attr = status); + return status; } + + auto healthStatus = status.getValue(); + const auto now = _svcCtx->getPreciseClockSource()->now(); auto lk = stdx::lock_guard(_mutex); + ++_completedChecksCount; invariant(_currentlyRunningHealthCheck); + if (!HealthCheckStatus::isResolved(healthStatus.getSeverity())) { + ++_completedChecksWithFaultCount; + } _currentlyRunningHealthCheck = false; _lastTimeCheckCompleted = now; - }) - .getAsync([this](Status status) {}); - }); + return status; + })); + + return _periodicCheckPromise->getFuture(); } HealthCheckStatus HealthObserverBase::makeHealthyStatus() const { diff --git a/src/mongo/db/process_health/health_observer_base.h b/src/mongo/db/process_health/health_observer_base.h index 99425808fab..f415a8e034a 100644 --- a/src/mongo/db/process_health/health_observer_base.h +++ b/src/mongo/db/process_health/health_observer_base.h @@ -66,9 +66,13 @@ public: // Implements the common logic for periodic checks. // Every observer should implement periodicCheckImpl() for specific tests. 
- void periodicCheck(FaultFacetsContainerFactory& factory, - std::shared_ptr<executor::TaskExecutor> taskExecutor, - CancellationToken token) override; + SharedSemiFuture<HealthCheckStatus> periodicCheck( + FaultFacetsContainerFactory& factory, + std::shared_ptr<executor::TaskExecutor> taskExecutor, + CancellationToken token) override; + + HealthCheckStatus makeHealthyStatus() const; + HealthCheckStatus makeSimpleFailedStatus(double severity, std::vector<Status>&& failures) const; HealthObserverLivenessStats getStats() const override; @@ -88,12 +92,6 @@ protected: virtual Future<HealthCheckStatus> periodicCheckImpl( PeriodicHealthCheckContext&& periodicCheckContext) = 0; - // Helper method to create a status without errors. - HealthCheckStatus makeHealthyStatus() const; - - // Make a generic error status. - HealthCheckStatus makeSimpleFailedStatus(double severity, std::vector<Status>&& failures) const; - HealthObserverLivenessStats getStatsLocked(WithLock) const; ServiceContext* const _svcCtx; @@ -103,6 +101,7 @@ protected: // Indicates if there any check running to prevent running checks concurrently. bool _currentlyRunningHealthCheck = false; + std::unique_ptr<SharedPromise<HealthCheckStatus>> _periodicCheckPromise; // Enforces the safety interval. Date_t _lastTimeTheCheckWasRun; Date_t _lastTimeCheckCompleted; diff --git a/src/mongo/db/process_health/health_observer_mock.h b/src/mongo/db/process_health/health_observer_mock.h index 1270af28cc3..230145bedf2 100644 --- a/src/mongo/db/process_health/health_observer_mock.h +++ b/src/mongo/db/process_health/health_observer_mock.h @@ -58,19 +58,28 @@ protected: Future<HealthCheckStatus> periodicCheckImpl( PeriodicHealthCheckContext&& periodicCheckContext) override { - // Detects mocked severity and handles it. 
- const double severity = _getSeverityCallback(); auto completionPf = makePromiseFuture<HealthCheckStatus>(); - if (HealthCheckStatus::isResolved(severity)) { - LOGV2(5936603, "Mock health observer returns a resolved severity"); - completionPf.promise.emplaceValue(HealthCheckStatus(getType())); - } else { - LOGV2(5936604, - "Mock health observer returns a fault severity", - "severity"_attr = severity); - completionPf.promise.emplaceValue(HealthCheckStatus(getType(), severity, "failed")); - } + + auto cbHandle = periodicCheckContext.taskExecutor->scheduleWork( + [this, promise = std::move(completionPf.promise)]( + const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { + try { + auto severity = _getSeverityCallback(); + if (HealthCheckStatus::isResolved(severity)) { + LOGV2(5936603, "Mock health observer returns a resolved severity"); + promise.emplaceValue(HealthCheckStatus(getType())); + } else { + LOGV2(5936604, + "Mock health observer returns a fault severity", + "severity"_attr = severity); + promise.emplaceValue(HealthCheckStatus(getType(), severity, "failed")); + } + } catch (const DBException& e) { + promise.setError(e.toStatus()); + } + }); + return std::move(completionPf.future); } diff --git a/src/mongo/db/process_health/health_observer_registration.cpp b/src/mongo/db/process_health/health_observer_registration.cpp index a1da3e30f94..12b0ff028a7 100644 --- a/src/mongo/db/process_health/health_observer_registration.cpp +++ b/src/mongo/db/process_health/health_observer_registration.cpp @@ -56,7 +56,8 @@ void HealthObserverRegistration::registerObserverFactory( std::vector<std::unique_ptr<HealthObserver>> HealthObserverRegistration::instantiateAllObservers( ServiceContext* svcCtx) { std::vector<std::unique_ptr<HealthObserver>> result; - for (auto& cb : *getObserverFactories()) { + auto factories = *getObserverFactories(); + for (auto& cb : factories) { result.push_back(cb(svcCtx)); } return result; diff --git 
a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp index 820402994c6..55a9fa2c85e 100644 --- a/src/mongo/db/process_health/health_observer_test.cpp +++ b/src/mongo/db/process_health/health_observer_test.cpp @@ -55,164 +55,12 @@ TEST_F(FaultManagerTest, Registration) { ASSERT_EQ(FaultFacetType::kMock1, allObservers[0]->getType()); } -TEST_F(FaultManagerTest, HealthCheckCreatesObservers) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; }); - ASSERT_EQ(0, manager().getHealthObserversTest().size()); - - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - ASSERT_EQ(1, manager().getHealthObserversTest().size()); -} - -TEST_F(FaultManagerTest, HealthCheckCreatesFacetOnHealthCheckFoundFault) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; }); - - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - waitForTransitionIntoState(FaultState::kTransientFault); - auto currentFault = manager().currentFault(); - ASSERT_TRUE(currentFault); // Is created. 
-} - -TEST_F(FaultManagerTest, StateTransitionOnHealthCheckFoundFault) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; }); - ASSERT_EQ(FaultState::kStartupCheck, manager().getFaultState()); - - waitForTransitionIntoState(FaultState::kTransientFault); - ASSERT_EQ(FaultState::kTransientFault, manager().getFaultState()); -} - -TEST_F(FaultManagerTest, HealthCheckCreatesCorrectFacetOnHealthCheckFoundFault) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; }); - registerMockHealthObserver(FaultFacetType::kMock2, [] { return 0.0; }); - waitForTransitionIntoState(FaultState::kTransientFault); - - FaultInternal& internalFault = manager().getFault(); - ASSERT_TRUE(internalFault.getFaultFacet(FaultFacetType::kMock1)); - ASSERT_FALSE(internalFault.getFaultFacet(FaultFacetType::kMock2)); -} - -TEST_F(FaultManagerTest, SeverityIsMaxFromAllFacetsSeverity) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.8; }); - registerMockHealthObserver(FaultFacetType::kMock2, [] { return 0.5; }); - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - do { - waitForFaultBeingCreated(); - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - } while (manager().getFault().getFacets().size() != 2); // Race between two facets. - auto currentFault = manager().currentFault(); - - ASSERT_APPROX_EQUAL(0.8, currentFault->getSeverity(), 0.001); -} - -TEST_F(FaultManagerTest, HealthCheckCreatesFacetThenIsGarbageCollectedAndStateTransitioned) { - AtomicDouble severity{0.1}; - registerMockHealthObserver(FaultFacetType::kMock1, [&severity] { return severity.load(); }); - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - waitForFaultBeingCreated(); - ASSERT_TRUE(manager().currentFault()); // Is created. - - // Resolve and it should be garbage collected. 
- severity.store(0.0); - - waitForTransitionIntoState(FaultState::kOk); - - assertSoonWithHealthCheck([this]() { return !hasFault(); }); - - // State is transitioned. - ASSERT_EQ(FaultState::kOk, manager().getFaultState()); - resetManager(); // Before atomic fields above go out of scope. -} - -TEST_F(FaultManagerTest, HealthCheckCreates2FacetsThenIsGarbageCollected) { - AtomicDouble severity1{0.1}; - AtomicDouble severity2{0.1}; - registerMockHealthObserver(FaultFacetType::kMock1, [&severity1] { return severity1.load(); }); - registerMockHealthObserver(FaultFacetType::kMock2, [&severity2] { return severity2.load(); }); - manager().healthCheckTest(); - waitForFaultBeingCreated(); - - while (manager().getFault().getFacets().size() != 2) { - sleepFor(Milliseconds(1)); - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - } - - // Resolve one facet and it should be garbage collected. - severity1.store(0.0); - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - - FaultInternal& internalFault = manager().getFault(); - while (internalFault.getFaultFacet(FaultFacetType::kMock1)) { - sleepFor(Milliseconds(1)); - // Check is async, needs more turns for garbage collection to work. - advanceTime(Milliseconds(100)); - manager().healthCheckTest(); - } - ASSERT_FALSE(internalFault.getFaultFacet(FaultFacetType::kMock1)); - ASSERT_TRUE(internalFault.getFaultFacet(FaultFacetType::kMock2)); - resetManager(); // Before atomic fields above go out of scope. -} - -TEST_F(FaultManagerTest, HealthCheckWithOffFacetCreatesNoFault) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.0; }); - manager().healthCheckTest(); - waitForFaultBeingResolved(); - auto currentFault = manager().currentFault(); - ASSERT_TRUE(!currentFault); // Is not created. 
-} - -TEST_F(FaultManagerTest, DoesNotRestartCheckBeforeIntervalExpired) { - AtomicDouble severity{0.0}; - registerMockHealthObserver(FaultFacetType::kMock1, [&severity] { return severity.load(); }); - manager().healthCheckTest(); - waitForFaultBeingResolved(); - auto currentFault = manager().currentFault(); - ASSERT_TRUE(!currentFault); // Is not created. - - severity.store(0.1); - manager().healthCheckTest(); - currentFault = manager().currentFault(); - // The check did not run because the delay interval did not expire. - ASSERT_TRUE(!currentFault); - - advanceTime(Milliseconds(100)); - assertSoonWithHealthCheck([this]() { return hasFault(); }); - currentFault = manager().currentFault(); - ASSERT_TRUE(currentFault); // The fault was created. - resetManager(); // Before atomic fields above go out of scope. -} - -TEST_F(FaultManagerTest, HealthCheckWithCriticalFacetCreatesFault) { - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; }); - manager().healthCheckTest(); - waitForFaultBeingCreated(); - auto currentFault = manager().currentFault(); - ASSERT_TRUE(currentFault); -} - -TEST_F(FaultManagerTest, InitialHealthCheckDoesNotBlockIfTransitionToOkSucceeds) { - resetManager(); - RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; - - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.0; }); - manager().healthCheckTest(); - - auto currentFault = manager().currentFault(); - ASSERT_TRUE(!currentFault); // Is not created. 
- ASSERT_TRUE(manager().getFaultState() == FaultState::kOk); -} - TEST_F(FaultManagerTest, InitialHealthCheckDoesNotRunIfFeatureFlagNotEnabled) { resetManager(); RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", false}; registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.0; }); - manager().startPeriodicHealthChecks(); + static_cast<void>(manager().schedulePeriodicHealthCheckThreadTest()); auto currentFault = manager().currentFault(); ASSERT_TRUE(!currentFault); // Is not created. @@ -220,18 +68,40 @@ TEST_F(FaultManagerTest, InitialHealthCheckDoesNotRunIfFeatureFlagNotEnabled) { } TEST_F(FaultManagerTest, Stats) { - advanceTime(Milliseconds(100)); - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; }); - waitForTransitionIntoState(FaultState::kTransientFault); + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto faultFacetType = FaultFacetType::kMock1; + registerMockHealthObserver(faultFacetType, [] { return 0.1; }); + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); auto observer = manager().getHealthObserversTest()[0]; + manager().healthCheckTest(observer, CancellationToken::uncancelable()); + + assertSoon([this] { return static_cast<bool>(manager().currentFault()); }); assertSoon([&observer] { return !observer->getStats().currentlyRunningHealthCheck; }); + auto stats = observer->getStats(); ASSERT_TRUE(manager().getConfig().isHealthObserverEnabled(observer->getType())); - ASSERT_TRUE(stats.lastTimeCheckStarted >= clockSource().now()); - ASSERT_TRUE(stats.lastTimeCheckCompleted >= stats.lastTimeCheckStarted); - ASSERT_TRUE(stats.completedChecksCount >= 1); - ASSERT_TRUE(stats.completedChecksWithFaultCount >= 1); + ASSERT_EQ(stats.lastTimeCheckStarted, clockSource().now()); + ASSERT_EQ(stats.lastTimeCheckCompleted, stats.lastTimeCheckStarted); + ASSERT_GTE(stats.completedChecksCount, 1); + ASSERT_GTE(stats.completedChecksWithFaultCount, 1); 
+ + // To complete initial health check. + manager().acceptTest(HealthCheckStatus(faultFacetType)); + + advanceTime(Milliseconds(200)); + auto prevStats = stats; + do { + manager().healthCheckTest(observer, CancellationToken::uncancelable()); + sleepmillis(1); + observer = manager().getHealthObserversTest()[0]; + stats = observer->getStats(); + } while (stats.completedChecksCount <= prevStats.completedChecksCount); + + ASSERT_GT(stats.lastTimeCheckStarted, prevStats.lastTimeCheckStarted); + ASSERT_GT(stats.lastTimeCheckCompleted, prevStats.lastTimeCheckCompleted); + ASSERT_GTE(stats.completedChecksCount, 2); + ASSERT_GTE(stats.completedChecksWithFaultCount, 2); } TEST_F(FaultManagerTest, ProgressMonitorCheck) { @@ -244,7 +114,11 @@ TEST_F(FaultManagerTest, ProgressMonitorCheck) { }); // Health check should get stuck here. - manager().healthCheckTest(); + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + auto observer = manager().getHealthObserversTest()[0]; + manager().healthCheckTest(observer, CancellationToken::uncancelable()); + // Verify that the 'crash callback' is invoked after timeout. bool crashTriggered = false; std::function<void(std::string cause)> crashCb = [&crashTriggered](std::string) { @@ -261,36 +135,39 @@ TEST_F(FaultManagerTest, ProgressMonitorCheck) { resetManager(); // Before fields above go out of scope. 
} -TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeout) { - auto config = test::getConfigWithDisabledPeriodicChecks(); - config->setActiveFaultDurationForTests(Milliseconds(10)); - resetManager(std::move(config)); - registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; }); - waitForTransitionIntoState(FaultState::kTransientFault); - ASSERT_TRUE(manager().getFaultState() == FaultState::kTransientFault); - advanceTime(Milliseconds(10)); - waitForTransitionIntoState(FaultState::kActiveFault); +TEST_F(FaultManagerTest, HealthCheckRunsPeriodically) { + resetManager(std::make_unique<FaultManagerConfig>()); + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto faultFacetType = FaultFacetType::kMock1; + int severity = 0; + registerMockHealthObserver(faultFacetType, [&severity] { return severity; }); + + assertSoon([this] { return (manager().getFaultState() == FaultState::kStartupCheck); }); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + assertSoon([this] { return (manager().getFaultState() == FaultState::kOk); }); + + severity = 1; + assertSoon([this] { return (manager().getFaultState() == FaultState::kTransientFault); }); + resetManager(); // Before fields above go out of scope. } -TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) { - const auto activeFaultDuration = manager().getConfigTest().getActiveFaultDuration(); - const auto start = clockSource().now(); +TEST_F(FaultManagerTest, PeriodicHealthCheckOnErrorMakesBadHealthStatus) { + resetManager(std::make_unique<FaultManagerConfig>()); + RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true}; + auto faultFacetType = FaultFacetType::kMock1; - // Initially unhealthy; Transitions to healthy before the active fault timeout. 
- registerMockHealthObserver(FaultFacetType::kMock1, [=] { - auto now = clockSource().now(); - auto elapsed = now - start; - auto quarterActiveFaultDuration = - Milliseconds(durationCount<Milliseconds>(activeFaultDuration) / 4); - if (elapsed < quarterActiveFaultDuration) { - return 1.1; - } - return 0.0; + registerMockHealthObserver(faultFacetType, [] { + uassert(ErrorCodes::InternalError, "test exception", false); + return 0.1; + }); + + ASSERT_TRUE(manager().getFaultState() == FaultState::kStartupCheck); + + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + assertSoon([this] { + return manager().currentFault() && manager().getFaultState() == FaultState::kStartupCheck; }); - waitForTransitionIntoState(FaultState::kTransientFault); - assertSoonWithHealthCheck([this]() { return manager().getFaultState() == FaultState::kOk; }); - advanceTime(activeFaultDuration); - assertSoonWithHealthCheck([this]() { return manager().getFaultState() == FaultState::kOk; }); } } // namespace diff --git a/src/mongo/db/process_health/state_machine.h b/src/mongo/db/process_health/state_machine.h index 7a658a434a8..3f5e09255d5 100644 --- a/src/mongo/db/process_health/state_machine.h +++ b/src/mongo/db/process_health/state_machine.h @@ -30,6 +30,7 @@ #include <vector> +#include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/functional.h" @@ -52,7 +53,7 @@ public: using StateCallback = unique_function<void(State, State, const OptionalMessageType&)>; // State machine accepts InputMessage and optionally transitions to state in the return value - using MessageHandler = unique_function<boost::optional<State>(InputMessage)>; + using MessageHandler = unique_function<boost::optional<State>(const OptionalMessageType&)>; using TransitionsContainer = stdx::unordered_map<State, std::vector<State>>; @@ -101,7 +102,8 @@ public: auto& initialState = getContextOrFatal(_initial); _current = &initialState; auto& 
handler = initialState.stateHandler; - handler->fireEnter(_current->state(), boost::none); + if (handler) + handler->fireEnter(_current->state(), boost::none); } // Define a valid transition. @@ -123,9 +125,19 @@ public: } // Accept message m, transition the state machine, and return the resulting state. - State accept(const InputMessage& m) { + // Upon the transition to the new state the state machine will call any registered hooks. + // + // In order to avoid deadlock while calling this function, authors should ensure + // that: + // 1. A recursive call only occurs from the current thread; or + // 2. For any hooks run as a result of accepting this message, no blocking calls are made + // involving shared resources with another thread that may call this function. + State accept(const OptionalMessageType& m) { tassertStarted(); + stdx::lock_guard<stdx::recursive_mutex> lk(_mutex); + auto& handler = _current->stateHandler; + auto result = handler->accept(m); if (result) { setState(*result, m); @@ -160,7 +172,7 @@ public: // Accepts input message m when state machine is in state _state. Optionally, the // state machine transitions to the state specified in the return value. Entry and exit // hooks will not fire if this method returns boost::none. 
- virtual boost::optional<State> accept(const InputMessage& m) noexcept = 0; + virtual boost::optional<State> accept(const OptionalMessageType& message) noexcept = 0; // The state this handler is defined for State state() const { @@ -187,6 +199,8 @@ public: cb(_state, newState, message); } + bool _isTransient = false; + protected: // The state we are handling const State _state; @@ -204,7 +218,7 @@ public: : StateHandler(state), _messageHandler(std::move(m)) {} ~LambdaStateHandler() override {} - boost::optional<State> accept(const InputMessage& m) noexcept override { + boost::optional<State> accept(const OptionalMessageType& m) noexcept override { return _messageHandler(m); } @@ -219,13 +233,22 @@ public: return context.stateHandler.get(); } - StateEventRegistryPtr registerHandler(State s, MessageHandler&& handler) { + StateEventRegistryPtr registerHandler(State s, MessageHandler&& handler, bool isTransient) { tassertNotStarted(); + auto& context = _states[s]; context.stateHandler = std::make_unique<LambdaStateHandler>(s, std::move(handler)); + if (isTransient) { + context.stateHandler->_isTransient = true; + } + return context.stateHandler.get(); } + StateEventRegistryPtr registerHandler(State s, MessageHandler&& handler) { + return registerHandler(s, std::move(handler), false); + } + protected: struct StateContext { StateHandlerPtr stateHandler; @@ -259,6 +282,10 @@ protected: // fire entry hooks for new state _current->stateHandler->fireEnter(previousContext.state(), message); + + if (_current->stateHandler->_isTransient) { + accept(message); + } } StateHandler* getHandlerOrFatal(State s) { @@ -281,6 +308,7 @@ protected: return getHandlerOrFatal(s); } + stdx::recursive_mutex _mutex; bool _started; State _initial; StateContext* _current = nullptr; diff --git a/src/mongo/db/process_health/state_machine_test.cpp b/src/mongo/db/process_health/state_machine_test.cpp index c7c3e48115f..f974a4adf8c 100644 --- a/src/mongo/db/process_health/state_machine_test.cpp +++ 
b/src/mongo/db/process_health/state_machine_test.cpp @@ -58,8 +58,8 @@ public: public: using StateMachineTest::StateHandler::StateHandler; - boost::optional<MachineState> accept(const Message& m) noexcept override { - return m.nextState; + boost::optional<MachineState> accept(const SM::OptionalMessageType& m) noexcept override { + return m->nextState; } }; @@ -95,7 +95,8 @@ TEST_F(StateMachineTestFixture, RegistersMessageHandlerAndStateHooksFluently) { }; subject() - ->registerHandler(MachineState::B, [](const Message& m) { return m.nextState; }) + ->registerHandler(MachineState::B, + [](const SM::OptionalMessageType& m) { return m->nextState; }) // hooks registerd for state B ->enter(hook()) ->exit(hook()); diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp index 7b4f3e9351a..3d65ee6c0d4 100644 --- a/src/mongo/s/mongos_main.cpp +++ b/src/mongo/s/mongos_main.cpp @@ -758,7 +758,15 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { PeriodicTask::startRunningPeriodicTasks(); - process_health::FaultManager::get(serviceContext)->startPeriodicHealthChecks(); + status = + process_health::FaultManager::get(serviceContext)->startPeriodicHealthChecks().getNoThrow(); + if (!status.isOK()) { + LOGV2_ERROR(5936510, + "Error completing initial health check: {error}", + "Error completing initial health check", + "error"_attr = redact(status)); + return EXIT_PROCESS_HEALTH_CHECK; + } SessionKiller::set(serviceContext, std::make_shared<SessionKiller>(serviceContext, killSessionsRemote)); |