author     Kshitij Gupta <kshitij.gupta@mongodb.com>         2021-12-06 18:52:25 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-12-06 19:29:21 +0000
commit     ee87e655744dcd5fc7331489e621a794e2ad4141 (patch)
tree       acaea87ebdd021b28b92ef1e2dcd1750c59f65fd
parent     ea6a59377c01ed48157557aaaae0bd8191b7fa4e (diff)
download   mongo-ee87e655744dcd5fc7331489e621a794e2ad4141.tar.gz

SERVER-59365: Use the new state machine.

Co-authored-by: Andrew Witten <andrew.witten@mongodb.com>
Co-authored-by: Lamont Nelson <lamont.nelson@mongodb.com>
-rw-r--r--  src/mongo/db/process_health/fault.h                           |   6
-rw-r--r--  src/mongo/db/process_health/fault_impl.cpp                    |   8
-rw-r--r--  src/mongo/db/process_health/fault_manager.cpp                 | 461
-rw-r--r--  src/mongo/db/process_health/fault_manager.h                   |  58
-rw-r--r--  src/mongo/db/process_health/fault_manager_config.h            |  21
-rw-r--r--  src/mongo/db/process_health/fault_manager_test_suite.h        |  39
-rw-r--r--  src/mongo/db/process_health/fault_state_machine_test.cpp      | 406
-rw-r--r--  src/mongo/db/process_health/health_check_status.h             |  16
-rw-r--r--  src/mongo/db/process_health/health_observer.h                 |   7
-rw-r--r--  src/mongo/db/process_health/health_observer_base.cpp          |  54
-rw-r--r--  src/mongo/db/process_health/health_observer_base.h            |  17
-rw-r--r--  src/mongo/db/process_health/health_observer_mock.h            |  31
-rw-r--r--  src/mongo/db/process_health/health_observer_registration.cpp |   3
-rw-r--r--  src/mongo/db/process_health/health_observer_test.cpp          | 251
-rw-r--r--  src/mongo/db/process_health/state_machine.h                   |  40
-rw-r--r--  src/mongo/db/process_health/state_machine_test.cpp            |   7
-rw-r--r--  src/mongo/s/mongos_main.cpp                                   |  10
17 files changed, 810 insertions(+), 625 deletions(-)
diff --git a/src/mongo/db/process_health/fault.h b/src/mongo/db/process_health/fault.h
index b0d8205879b..383c4121e5e 100644
--- a/src/mongo/db/process_health/fault.h
+++ b/src/mongo/db/process_health/fault.h
@@ -73,6 +73,12 @@ public:
* Describes the current fault.
*/
virtual void appendDescription(BSONObjBuilder* builder) const = 0;
+
+ BSONObj toBSON() const {
+ BSONObjBuilder builder;
+ appendDescription(&builder);
+ return builder.obj();
+ }
};
using FaultConstPtr = std::shared_ptr<const Fault>;
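
The new toBSON() helper is a non-virtual convenience wrapper over the pure
virtual appendDescription(): every subclass serializes through the same
builder path and callers always receive a fully built object. A minimal
sketch of the pattern, with a hypothetical Describable base that is not part
of this commit:

    #include "mongo/bson/bsonobjbuilder.h"

    class Describable {
    public:
        virtual ~Describable() = default;

        // Subclasses append their fields into the caller-supplied builder.
        virtual void appendDescription(mongo::BSONObjBuilder* builder) const = 0;

        // Non-virtual wrapper: builds the object in one consistent way.
        mongo::BSONObj toBSON() const {
            mongo::BSONObjBuilder builder;
            appendDescription(&builder);
            return builder.obj();
        }
    };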
diff --git a/src/mongo/db/process_health/fault_impl.cpp b/src/mongo/db/process_health/fault_impl.cpp
index f25920b81fc..77ca9a82cc5 100644
--- a/src/mongo/db/process_health/fault_impl.cpp
+++ b/src/mongo/db/process_health/fault_impl.cpp
@@ -116,7 +116,13 @@ void FaultImpl::garbageCollectResolvedFacets() {
_facets.end());
}
-void FaultImpl::appendDescription(BSONObjBuilder* builder) const {}
+void FaultImpl::appendDescription(BSONObjBuilder* builder) const {
+ builder->append("id", getId().toBSON());
+ builder->append("severity", getSeverity());
+ builder->append("duration", getDuration().toBSON());
+ // TODO (SERVER-61914): Add fault facet details
+ builder->append("facets", static_cast<int>(_facets.size()));
+}
} // namespace process_health
} // namespace mongo
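
Given the appendDescription() body above, a fault serializes into a small
diagnostic document. A hedged usage sketch (field values in the comment are
illustrative, and facet details are still pending SERVER-61914):

    // `fault` is any FaultConstPtr obtained from the fault manager.
    void logFaultDescription(const mongo::process_health::FaultConstPtr& fault) {
        mongo::BSONObj doc = fault->toBSON();
        // doc ~ { id: <fault id>, severity: <double>, duration: <duration>, facets: <count> }
    }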
diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp
index 512b5a51b50..a9266e9d3b6 100644
--- a/src/mongo/db/process_health/fault_manager.cpp
+++ b/src/mongo/db/process_health/fault_manager.cpp
@@ -91,15 +91,16 @@ FaultManager::TransientFaultDeadline::TransientFaultDeadline(
std::shared_ptr<executor::TaskExecutor> executor,
Milliseconds timeout)
: cancelActiveFaultTransition(CancellationSource()),
- activeFaultTransition(
- executor->sleepFor(timeout, cancelActiveFaultTransition.token())
- .thenRunOn(executor)
- .then([faultManager]() { faultManager->transitionToState(FaultState::kActiveFault); })
- .onError([](Status status) {
- LOGV2_WARNING(5937001,
- "The Fault Manager transient fault deadline was disabled.",
- "status"_attr = status);
- })) {}
+ activeFaultTransition(executor->sleepFor(timeout, cancelActiveFaultTransition.token())
+ .thenRunOn(executor)
+ .then([faultManager]() { faultManager->accept(boost::none); })
+ .onError([](Status status) {
+ LOGV2_DEBUG(
+ 5937001,
+ 1,
+ "The Fault Manager transient fault deadline was disabled.",
+ "status"_attr = status);
+ })) {}
FaultManager::TransientFaultDeadline::~TransientFaultDeadline() {
if (!cancelActiveFaultTransition.token().isCanceled()) {
@@ -110,7 +111,8 @@ FaultManager::TransientFaultDeadline::~TransientFaultDeadline() {
FaultManager::FaultManager(ServiceContext* svcCtx,
std::shared_ptr<executor::TaskExecutor> taskExecutor,
std::unique_ptr<FaultManagerConfig> config)
- : _config(std::move(config)),
+ : StateMachine(FaultState::kStartupCheck),
+ _config(std::move(config)),
_svcCtx(svcCtx),
_taskExecutor(taskExecutor),
_crashCb([](std::string cause) {
@@ -124,41 +126,206 @@ FaultManager::FaultManager(ServiceContext* svcCtx,
invariant(_svcCtx->getFastClockSource());
invariant(_svcCtx->getPreciseClockSource());
_lastTransitionTime = _svcCtx->getFastClockSource()->now();
+ setupStateMachine();
}
FaultManager::FaultManager(ServiceContext* svcCtx,
std::shared_ptr<executor::TaskExecutor> taskExecutor,
std::unique_ptr<FaultManagerConfig> config,
std::function<void(std::string cause)> crashCb)
- : _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor), _crashCb(crashCb) {
+ : StateMachine(FaultState::kStartupCheck),
+ _config(std::move(config)),
+ _svcCtx(svcCtx),
+ _taskExecutor(taskExecutor),
+ _crashCb(crashCb) {
_lastTransitionTime = _svcCtx->getFastClockSource()->now();
+ setupStateMachine();
}
-void FaultManager::schedulePeriodicHealthCheckThread(bool immediately) {
- if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled(
- serverGlobalParams.featureCompatibility)) {
- return;
+void FaultManager::setupStateMachine() {
+ validTransitions({
+ {FaultState::kStartupCheck, {FaultState::kOk, FaultState::kActiveFault}},
+ {FaultState::kOk, {FaultState::kTransientFault}},
+ {FaultState::kTransientFault, {FaultState::kOk, FaultState::kActiveFault}},
+ {FaultState::kActiveFault, {}},
+ });
+
+ auto bindThis = [&](auto&& pmf) { return [=](auto&&... a) { return (this->*pmf)(a...); }; };
+
+ registerHandler(FaultState::kStartupCheck, bindThis(&FaultManager::handleStartupCheck))
+ ->enter(bindThis(&FaultManager::logCurrentState))
+ ->exit(bindThis(&FaultManager::setInitialHealthCheckComplete));
+
+ registerHandler(FaultState::kOk, bindThis(&FaultManager::handleOk))
+ ->enter(bindThis(&FaultManager::clearTransientFaultDeadline))
+ ->enter(bindThis(&FaultManager::logCurrentState));
+
+ registerHandler(FaultState::kTransientFault, bindThis(&FaultManager::handleTransientFault))
+ ->enter(bindThis(&FaultManager::setTransientFaultDeadline))
+ ->enter(bindThis(&FaultManager::logCurrentState));
+
+ registerHandler(FaultState::kActiveFault,
+ bindThis(&FaultManager::handleActiveFault),
+ true /* transient state */)
+ ->enter(bindThis(&FaultManager::logCurrentState));
+
+ start();
+}
+
+boost::optional<FaultState> FaultManager::handleStartupCheck(const OptionalMessageType& message) {
+ if (!message) {
+ return FaultState::kActiveFault;
}
- const auto kPeriodicHealthCheckInterval = getConfig().getPeriodicHealthCheckInterval();
- auto lk = stdx::lock_guard(_mutex);
+ HealthCheckStatus status = message.get();
+
+ auto activeObservers = getActiveHealthObservers();
+ stdx::unordered_set<FaultFacetType> activeObserversTypes;
+ std::for_each(activeObservers.begin(),
+ activeObservers.end(),
+ [&activeObserversTypes](HealthObserver* observer) {
+ activeObserversTypes.insert(observer->getType());
+ });
- const auto cb = [this](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- return;
+
+ auto lk = stdx::lock_guard(_stateMutex);
+ logMessageReceived(state(), status);
+
+ if (status.isActiveFault()) {
+ _healthyObservations.erase(status.getType());
+ } else {
+ _healthyObservations.insert(status.getType());
+ }
+
+ updateWithCheckStatus(HealthCheckStatus(status));
+ auto optionalActiveFault = getFaultFacetsContainer();
+ if (optionalActiveFault) {
+ optionalActiveFault->garbageCollectResolvedFacets();
+ }
+
+ if (optionalActiveFault && hasCriticalFacet(_fault.get()) && !_transientFaultDeadline) {
+ setTransientFaultDeadline(
+ FaultState::kStartupCheck, FaultState::kStartupCheck, boost::none);
+ }
+
+ // If the whole fault becomes resolved, garbage collect it
+ // with proper locking.
+ std::shared_ptr<FaultInternal> faultToDelete;
+ {
+ auto lk = stdx::lock_guard(_mutex);
+ if (_fault && _fault->getFacets().empty()) {
+ faultToDelete.swap(_fault);
}
+ }
- healthCheck();
- };
+ if (activeObserversTypes == _healthyObservations) {
+ return FaultState::kOk;
+ }
+ return boost::none;
+}
+
+boost::optional<FaultState> FaultManager::handleOk(const OptionalMessageType& message) {
+ invariant(message);
+
+ HealthCheckStatus status = message.get();
+ auto lk = stdx::lock_guard(_stateMutex);
+ logMessageReceived(state(), status);
+
+ if (_config->getHealthObserverIntensity(status.getType()) ==
+ HealthObserverIntensityEnum::kOff) {
+ return boost::none;
+ }
+
+ updateWithCheckStatus(HealthCheckStatus(status));
+
+ if (!HealthCheckStatus::isResolved(status.getSeverity())) {
+ return FaultState::kTransientFault;
+ }
+
+ return boost::none;
+}
+
+boost::optional<FaultState> FaultManager::handleTransientFault(const OptionalMessageType& message) {
+ if (!message) {
+ return FaultState::kActiveFault;
+ }
+
+ HealthCheckStatus status = message.get();
+ auto lk = stdx::lock_guard(_stateMutex);
+ logMessageReceived(state(), status);
+
+ updateWithCheckStatus(HealthCheckStatus(status));
- auto periodicThreadCbHandleStatus = immediately
- ? _taskExecutor->scheduleWork(cb)
- : _taskExecutor->scheduleWorkAt(_taskExecutor->now() + kPeriodicHealthCheckInterval, cb);
+ auto optionalActiveFault = getFaultFacetsContainer();
+ if (optionalActiveFault) {
+ optionalActiveFault->garbageCollectResolvedFacets();
+ }
+
+ // If the whole fault becomes resolved, garbage collect it
+ // with proper locking.
+ if (_fault && _fault->getFacets().empty()) {
+ _fault.reset();
+ return FaultState::kOk;
+ }
+ return boost::none;
+}
- uassert(5936101,
- "Failed to initialize periodic health check work.",
- periodicThreadCbHandleStatus.isOK());
- _periodicHealthCheckCbHandle = periodicThreadCbHandleStatus.getValue();
+boost::optional<FaultState> FaultManager::handleActiveFault(const OptionalMessageType& message) {
+ LOGV2_FATAL(5936509, "Fault manager received active fault");
+ return boost::none;
+}
+
+void FaultManager::logMessageReceived(FaultState state, const HealthCheckStatus& status) {
+ LOGV2_DEBUG(5936504,
+ 1,
+ "Fault manager recieved health check result",
+ "state"_attr = (str::stream() << state),
+ "result"_attr = status,
+ "passed"_attr = (!status.isActiveFault(status.getSeverity())));
+}
+
+void FaultManager::logCurrentState(FaultState, FaultState newState, const OptionalMessageType&) {
+ LOGV2(5936503, "Fault manager changed state", "state"_attr = (str::stream() << newState));
+}
+
+void FaultManager::setTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&) {
+ _transientFaultDeadline = std::make_unique<TransientFaultDeadline>(
+ this, _taskExecutor, _config->getActiveFaultDuration());
+}
+
+void FaultManager::clearTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&) {
+ _transientFaultDeadline = nullptr;
+}
+
+void FaultManager::setInitialHealthCheckComplete(FaultState,
+ FaultState newState,
+ const OptionalMessageType&) {
+ LOGV2_DEBUG(5936502,
+ 0,
+ "The fault manager initial health checks have completed",
+ "state"_attr = (str::stream() << newState));
+ _initialHealthCheckCompletedPromise.emplaceValue();
+}
+
+void FaultManager::schedulePeriodicHealthCheckThread() {
+ if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled(
+ serverGlobalParams.featureCompatibility) ||
+ _config->periodicChecksDisabledForTests()) {
+ return;
+ }
+
+ auto observers = getHealthObservers();
+ for (auto observer : observers) {
+ LOGV2_DEBUG(
+ 59365, 1, "starting health observer", "observerType"_attr = observer->getType());
+
+ // TODO (SERVER-59368): The system should properly handle a health checker being turned
+ // on/off
+ auto token = _managerShuttingDownCancellationSource.token();
+ if (_config->isHealthObserverEnabled(observer->getType())) {
+ healthCheck(observer, token);
+ }
+ }
}
FaultManager::~FaultManager() {
@@ -166,8 +333,11 @@ FaultManager::~FaultManager() {
_taskExecutor->shutdown();
LOGV2(5936601, "Shutting down periodic health checks");
- if (_periodicHealthCheckCbHandle) {
- _taskExecutor->cancel(*_periodicHealthCheckCbHandle);
+ for (auto& pair : _healthCheckContexts) {
+ auto cbHandle = pair.second.resultStatus;
+ if (cbHandle) {
+ _taskExecutor->cancel(cbHandle.get());
+ }
}
// All health checks must use the _taskExecutor, joining it
@@ -181,32 +351,37 @@ FaultManager::~FaultManager() {
}
if (!_initialHealthCheckCompletedPromise.getFuture().isReady()) {
- _initialHealthCheckCompletedPromise.emplaceValue();
+ _initialHealthCheckCompletedPromise.setError(
+ {ErrorCodes::CommandFailed, "Fault manager failed initial health check"});
}
LOGV2_DEBUG(6136801, 1, "Done shutting down periodic health checks");
}
-void FaultManager::startPeriodicHealthChecks() {
+SharedSemiFuture<void> FaultManager::startPeriodicHealthChecks() {
if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled(
serverGlobalParams.featureCompatibility)) {
LOGV2_DEBUG(6187201, 1, "Health checks disabled by feature flag");
- return;
+ return Future<void>::makeReady();
}
_taskExecutor->startup();
- invariant(getFaultState() == FaultState::kStartupCheck);
- {
- auto lk = stdx::lock_guard(_mutex);
- invariant(!_periodicHealthCheckCbHandle);
+ invariant(state() == FaultState::kStartupCheck);
+
+ _init();
+
+ if (getActiveHealthObservers().size() == 0) {
+ LOGV2_DEBUG(5936511, 2, "No active health observers are configured.");
+ setState(FaultState::kOk, HealthCheckStatus(FaultFacetType::kSystem));
+ } else {
+ schedulePeriodicHealthCheckThread();
}
- schedulePeriodicHealthCheckThread(true /* immediately */);
- _initialHealthCheckCompletedPromise.getFuture().get();
+ return _initialHealthCheckCompletedPromise.getFuture();
}
FaultState FaultManager::getFaultState() const {
stdx::lock_guard<Latch> lk(_stateMutex);
- return _currentState;
+ return state();
}
Date_t FaultManager::getLastTransitionTime() const {
@@ -233,46 +408,72 @@ FaultFacetsContainerPtr FaultManager::getOrCreateFaultFacetsContainer() {
return std::static_pointer_cast<FaultFacetsContainer>(_fault);
}
-void FaultManager::healthCheck() {
- // One time init.
- _firstTimeInitIfNeeded();
+void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) {
+ auto schedulerCb = [this, observer, token] {
+ auto periodicThreadCbHandleStatus = this->_taskExecutor->scheduleWorkAt(
+ _taskExecutor->now() + this->_config->kPeriodicHealthCheckInterval,
+ [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
+ }
+ healthCheck(observer, token);
+ });
+
+ if (!periodicThreadCbHandleStatus.isOK()) {
+ if (ErrorCodes::isA<ErrorCategory::ShutdownError>(
+ periodicThreadCbHandleStatus.getStatus().code())) {
+ return;
+ }
- ON_BLOCK_EXIT([this] {
- if (!_config->periodicChecksDisabledForTests()) {
- schedulePeriodicHealthCheckThread();
+ uassert(5936101,
+ fmt::format("Failed to initialize periodic health check work. Reason: {}",
+ periodicThreadCbHandleStatus.getStatus().codeString()),
+ periodicThreadCbHandleStatus.isOK());
}
- });
-
- std::vector<HealthObserver*> observers = FaultManager::getHealthObservers();
- // Start checks outside of lock.
- auto token = _managerShuttingDownCancellationSource.token();
- for (auto observer : observers) {
- // TODO: SERVER-59368, fix bug where health observer is turned off when in transient fault
- // state
- if (_config->getHealthObserverIntensity(observer->getType()) !=
- HealthObserverIntensityEnum::kOff)
- observer->periodicCheck(*this, _taskExecutor, token);
- }
+ _healthCheckContexts.at(observer->getType()).resultStatus =
+ std::move(periodicThreadCbHandleStatus.getValue());
+ };
- // Garbage collect all resolved fault facets.
- auto optionalActiveFault = getFaultFacetsContainer();
- if (optionalActiveFault) {
- optionalActiveFault->garbageCollectResolvedFacets();
- }
+ auto acceptNotOKStatus = [this, observer](Status s) {
+ auto healthCheckStatus = HealthCheckStatus(observer->getType(), 1.0, s.reason());
+ LOGV2_ERROR(
+ 6007901, "Unexpected failure during health check", "status"_attr = healthCheckStatus);
+ this->accept(healthCheckStatus);
+ return healthCheckStatus;
+ };
- // If the whole fault becomes resolved, garbage collect it
- // with proper locking.
- std::shared_ptr<FaultInternal> faultToDelete;
- {
- auto lk = stdx::lock_guard(_mutex);
- if (_fault && _fault->getFacets().empty()) {
- faultToDelete.swap(_fault);
- }
+ // If the health observer is disabled, do nothing and schedule another run (the
+ // observer may become enabled later).
+ // TODO (SERVER-59368): The system should properly handle a health checker being turned on/off
+ if (!_config->isHealthObserverEnabled(observer->getType())) {
+ schedulerCb();
+ return;
}
- // Actions above can result in a state change.
- checkForStateTransition();
+ _healthCheckContexts.insert({observer->getType(), HealthCheckContext(nullptr, boost::none)});
+ // Run asynchronous health check. When complete, check for state transition (and perform if
+ // necessary). Then schedule the next run.
+ auto healthCheckFuture = observer->periodicCheck(*this, _taskExecutor, token)
+ .thenRunOn(_taskExecutor)
+ .onCompletion([this, acceptNotOKStatus, schedulerCb](
+ StatusWith<HealthCheckStatus> status) {
+ ON_BLOCK_EXIT([this, schedulerCb]() {
+ if (!_config->periodicChecksDisabledForTests()) {
+ schedulerCb();
+ }
+ });
+
+ if (!status.isOK()) {
+ return acceptNotOKStatus(status.getStatus());
+ }
+
+ this->accept(status.getValue());
+ return status.getValue();
+ });
+ auto futurePtr =
+ std::make_unique<ExecutorFuture<HealthCheckStatus>>(std::move(healthCheckFuture));
+ _healthCheckContexts.at(observer->getType()).result = std::move(futurePtr);
}
void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) {
@@ -281,6 +482,7 @@ void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) {
if (container) {
container->updateWithSuppliedFacet(checkStatus.getType(), nullptr);
}
+
return;
}
@@ -296,15 +498,6 @@ void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) {
}
}
-void FaultManager::checkForStateTransition() {
- FaultConstPtr fault = currentFault();
- if (fault && !HealthCheckStatus::isResolved(fault->getSeverity())) {
- processFaultExistsEvent();
- } else if (!fault || HealthCheckStatus::isResolved(fault->getSeverity())) {
- processFaultIsResolvedEvent();
- }
-}
-
bool FaultManager::hasCriticalFacet(const FaultInternal* fault) const {
invariant(fault);
const auto& facets = fault->getFacets();
@@ -322,93 +515,8 @@ FaultManagerConfig FaultManager::getConfig() const {
return *_config;
}
-void FaultManager::processFaultExistsEvent() {
- FaultState currentState = getFaultState();
-
- switch (currentState) {
- case FaultState::kStartupCheck:
- case FaultState::kOk: {
- transitionToState(FaultState::kTransientFault);
- if (hasCriticalFacet(_fault.get())) {
- // This will transition the FaultManager to ActiveFault state after the timeout
- // occurs.
- _transientFaultDeadline = std::make_unique<TransientFaultDeadline>(
- this, _taskExecutor, _config->getActiveFaultDuration());
- }
- break;
- }
- case FaultState::kTransientFault:
- case FaultState::kActiveFault:
- // NOP.
- break;
- default:
- MONGO_UNREACHABLE;
- break;
- }
-}
-
-void FaultManager::processFaultIsResolvedEvent() {
- FaultState currentState = getFaultState();
-
- switch (currentState) {
- case FaultState::kOk:
- // NOP.
- break;
- case FaultState::kStartupCheck:
- transitionToState(FaultState::kOk);
- _initialHealthCheckCompletedPromise.emplaceValue();
- break;
- case FaultState::kTransientFault:
- // Clear the transient fault deadline timer.
- _transientFaultDeadline.reset();
- transitionToState(FaultState::kOk);
- break;
- case FaultState::kActiveFault:
- // Too late, this state cannot be resolved to Ok.
- break;
- default:
- MONGO_UNREACHABLE;
- break;
- }
-}
-
-void FaultManager::transitionToState(FaultState newState) {
- // Maps currentState to valid newStates
- static const stdx::unordered_map<FaultState, std::vector<FaultState>> validTransitions = {
- {FaultState::kStartupCheck, {FaultState::kOk, FaultState::kTransientFault}},
- {FaultState::kOk, {FaultState::kTransientFault}},
- {FaultState::kTransientFault, {FaultState::kOk, FaultState::kActiveFault}},
- {FaultState::kActiveFault, {}},
- };
-
- stdx::lock_guard<Latch> lk(_stateMutex);
- const auto& validStates = validTransitions.at(_currentState);
- auto validIt = std::find(validStates.begin(), validStates.end(), newState);
- uassert(ErrorCodes::BadValue,
- str::stream() << "Invalid fault manager transition from " << _currentState << " to "
- << newState,
- validIt != validStates.end());
-
- LOGV2(5936201,
- "Transitioned fault manager state",
- "newState"_attr = str::stream() << newState,
- "oldState"_attr = str::stream() << _currentState);
-
- _lastTransitionTime = _svcCtx->getFastClockSource()->now();
- _currentState = newState;
-}
-
-void FaultManager::_firstTimeInitIfNeeded() {
- if (_firstTimeInitExecuted.load()) {
- return;
- }
-
+void FaultManager::_init() {
auto lk = stdx::lock_guard(_mutex);
- // One more time under lock to avoid race.
- if (_firstTimeInitExecuted.load()) {
- return;
- }
- _firstTimeInitExecuted.store(true);
_observers = HealthObserverRegistration::instantiateAllObservers(_svcCtx);
@@ -425,7 +533,7 @@ void FaultManager::_firstTimeInitIfNeeded() {
auto lk2 = stdx::lock_guard(_stateMutex);
LOGV2(5956701,
"Instantiated health observers, periodic health checking starts",
- "managerState"_attr = _currentState,
+ "managerState"_attr = state(),
"observersCount"_attr = _observers.size());
}
@@ -440,6 +548,19 @@ std::vector<HealthObserver*> FaultManager::getHealthObservers() {
return result;
}
+std::vector<HealthObserver*> FaultManager::getActiveHealthObservers() {
+ auto allObservers = getHealthObservers();
+ std::vector<HealthObserver*> result;
+ result.reserve(allObservers.size());
+ for (auto observer : allObservers) {
+ if (_config->getHealthObserverIntensity(observer->getType()) !=
+ HealthObserverIntensityEnum::kOff) {
+ result.push_back(observer);
+ }
+ }
+ return result;
+}
+
void FaultManager::progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb) {
_progressMonitor->progressMonitorCheck(crashCb);
}
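
The rewritten fault_manager.cpp drives everything through the generic
StateMachine declared in state_machine.h (whose hunks are not shown in this
capture): each state registers a message handler that may return the next
state, plus optional enter/exit hooks. A simplified stand-in illustrating
that contract — not the real template, and using std::optional where the
real code uses boost::optional:

    #include <functional>
    #include <map>
    #include <optional>
    #include <vector>

    template <typename Message, typename State>
    class TinyStateMachine {
    public:
        using OptionalMessage = std::optional<Message>;
        // A handler may return the next state, or nullopt to stay put.
        using Handler = std::function<std::optional<State>(const OptionalMessage&)>;
        using Hook = std::function<void(State, State, const OptionalMessage&)>;

        explicit TinyStateMachine(State initial) : _state(initial) {}

        void registerHandler(State s, Handler h) {
            _handlers[s] = std::move(h);
        }
        void onEnter(State s, Hook h) {
            _enterHooks[s].push_back(std::move(h));
        }

        // Deliver a message to the current state's handler; if the handler
        // requests a different state, transition and fire its enter hooks.
        State accept(const OptionalMessage& m) {
            auto next = _handlers.at(_state)(m);
            if (next && *next != _state) {
                State old = _state;
                _state = *next;
                for (auto& hook : _enterHooks[_state])
                    hook(old, _state, m);
            }
            return _state;
        }

        State state() const {
            return _state;
        }

    private:
        State _state;
        std::map<State, Handler> _handlers;
        std::map<State, std::vector<Hook>> _enterHooks;
    };

This is why handleStartupCheck() and friends above return
boost::optional<FaultState>: returning boost::none keeps the current state,
while returning a state requests a transition that the machine validates
against the table given to validTransitions().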
diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h
index 1d89a98721f..597cfb54112 100644
--- a/src/mongo/db/process_health/fault_manager.h
+++ b/src/mongo/db/process_health/fault_manager.h
@@ -37,6 +37,7 @@
#include "mongo/db/process_health/health_monitoring_server_parameters_gen.h"
#include "mongo/db/process_health/health_observer.h"
#include "mongo/db/process_health/progress_monitor.h"
+#include "mongo/db/process_health/state_machine.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
@@ -55,7 +56,8 @@ namespace process_health {
*
* If an active fault state persists, FaultManager will terminate the server process.
*/
-class FaultManager : protected FaultFacetsContainerFactory {
+class FaultManager : protected StateMachine<HealthCheckStatus, FaultState>,
+ protected FaultFacetsContainerFactory {
FaultManager(const FaultManager&) = delete;
FaultManager& operator=(const FaultManager&) = delete;
@@ -71,12 +73,25 @@ public:
std::function<void(std::string cause)> crashCb);
virtual ~FaultManager();
+ void setupStateMachine();
+
+ boost::optional<FaultState> handleStartupCheck(const OptionalMessageType& message);
+ boost::optional<FaultState> handleOk(const OptionalMessageType& message);
+ boost::optional<FaultState> handleTransientFault(const OptionalMessageType& message);
+ boost::optional<FaultState> handleActiveFault(const OptionalMessageType& message);
+
+ void setInitialHealthCheckComplete(FaultState, FaultState, const OptionalMessageType&);
+ void logCurrentState(FaultState, FaultState, const OptionalMessageType&);
+ void logMessageReceived(FaultState state, const HealthCheckStatus& status);
+ void setTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&);
+ void clearTransientFaultDeadline(FaultState, FaultState, const OptionalMessageType&);
+
// Start periodic health checks, invoke it once during server startup.
// It is unsafe to start health checks immediately during ServiceContext creation
// because some ServiceContext fields might not be initialized yet.
// Health checks cannot be stopped but could be effectively disabled with health-checker
// specific flags.
- void startPeriodicHealthChecks();
+ SharedSemiFuture<void> startPeriodicHealthChecks();
static FaultManager* get(ServiceContext* svcCtx);
@@ -101,9 +116,12 @@ public:
Date_t getLastTransitionTime() const;
protected:
- // Starts the health check sequence and updates the internal state on completion.
- // This is invoked by the internal timer.
- virtual void healthCheck();
+ // Returns all health observers not configured as Off
+ std::vector<HealthObserver*> getActiveHealthObservers();
+
+ // Runs a particular health observer. Then attempts to transition states. Then schedules next
+ // run.
+ virtual void healthCheck(HealthObserver* observer, CancellationToken token);
// Protected interface FaultFacetsContainerFactory implementation.
@@ -114,19 +132,7 @@ protected:
void updateWithCheckStatus(HealthCheckStatus&& checkStatus) override;
- // State machine related.
-
- void checkForStateTransition(); // Invoked by periodic thread.
-
- // Methods that represent particular events that trigger state transition.
- void processFaultExistsEvent();
- void processFaultIsResolvedEvent();
-
- // Makes a valid state transition or returns an error.
- // State transition should be triggered by events above.
- void transitionToState(FaultState newState);
-
- void schedulePeriodicHealthCheckThread(bool immediately = false);
+ void schedulePeriodicHealthCheckThread();
// TODO: move this into fault class; refactor to remove FaultInternal
bool hasCriticalFacet(const FaultInternal* fault) const;
@@ -135,7 +141,7 @@ protected:
private:
// One time init.
- void _firstTimeInitIfNeeded();
+ void _init();
std::unique_ptr<FaultManagerConfig> _config;
ServiceContext* const _svcCtx;
@@ -147,20 +153,16 @@ private:
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(5), "FaultManager::_mutex");
std::shared_ptr<FaultInternal> _fault;
- // We lazily init all health observers.
- AtomicWord<bool> _firstTimeInitExecuted{false};
// This source is canceled before the _taskExecutor shutdown(). It
// can be used to check for the start of the shutdown sequence.
CancellationSource _managerShuttingDownCancellationSource;
// Manager owns all observer instances.
std::vector<std::unique_ptr<HealthObserver>> _observers;
- boost::optional<executor::TaskExecutor::CallbackHandle> _periodicHealthCheckCbHandle;
SharedPromise<void> _initialHealthCheckCompletedPromise;
// Protects the state below.
mutable Mutex _stateMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "FaultManager::_stateMutex");
- FaultState _currentState = FaultState::kStartupCheck;
Date_t _lastTransitionTime;
@@ -184,6 +186,16 @@ private:
std::unique_ptr<TransientFaultDeadline> _transientFaultDeadline;
std::unique_ptr<ProgressMonitor> _progressMonitor;
+ stdx::unordered_set<FaultFacetType> _healthyObservations;
+ struct HealthCheckContext {
+ std::unique_ptr<ExecutorFuture<HealthCheckStatus>> result;
+ boost::optional<executor::TaskExecutor::CallbackHandle> resultStatus;
+ HealthCheckContext(std::unique_ptr<ExecutorFuture<HealthCheckStatus>> future,
+ boost::optional<executor::TaskExecutor::CallbackHandle> cbHandle)
+ : result(std::move(future)), resultStatus(cbHandle) {}
+ };
+
+ stdx::unordered_map<FaultFacetType, HealthCheckContext> _healthCheckContexts;
};
} // namespace process_health
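
For reference, the transition table registered in setupStateMachine()
(fault_manager.cpp above) reduces to:

    kStartupCheck   -> kOk | kActiveFault   (checks pass, or the deadline fires)
    kOk             -> kTransientFault      (an observer reports a fault)
    kTransientFault -> kOk | kActiveFault   (fault resolves, or the deadline fires)
    kActiveFault    -> (terminal)           (handleActiveFault() logs fatally)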
diff --git a/src/mongo/db/process_health/fault_manager_config.h b/src/mongo/db/process_health/fault_manager_config.h
index 251f7c1cf7f..830fb97ae46 100644
--- a/src/mongo/db/process_health/fault_manager_config.h
+++ b/src/mongo/db/process_health/fault_manager_config.h
@@ -42,10 +42,10 @@ namespace process_health {
* Current fault state of the server in a simple actionable form.
*/
enum class FaultState {
- kOk = 0,
-
// The manager conducts startup checks, new connections should be refused.
- kStartupCheck,
+ kStartupCheck = 0,
+
+ kOk,
// The manager detected a fault, however the fault is either not severe
// enough or is not observed for sufficiently long period of time.
@@ -63,7 +63,7 @@ std::ostream& operator<<(std::ostream& os, const FaultState& state);
/**
* Types of health observers available.
*/
-enum class FaultFacetType { kMock1 = 0, kMock2, kLdap, kDns };
+enum class FaultFacetType { kSystem, kMock1, kMock2, kLdap, kDns };
class FaultManagerConfig {
@@ -77,8 +77,13 @@ public:
return intensities->_data->getLdap();
case FaultFacetType::kDns:
return intensities->_data->getDns();
- // TODO: update this function with additional fault facets when they are added
+ // TODO: update this function with additional fault facets when they are added
+ case FaultFacetType::kSystem:
+ return HealthObserverIntensityEnum::kCritical;
case FaultFacetType::kMock1:
+ if (_facetToIntensityMapForTest.contains(type)) {
+ return _facetToIntensityMapForTest.at(type);
+ }
return HealthObserverIntensityEnum::kCritical;
case FaultFacetType::kMock2:
return HealthObserverIntensityEnum::kCritical;
@@ -91,6 +96,10 @@ public:
return getHealthObserverIntensity(type) != HealthObserverIntensityEnum::kOff;
}
+ void setIntensityForType(FaultFacetType type, HealthObserverIntensityEnum intensity) {
+ _facetToIntensityMapForTest.insert({type, intensity});
+ }
+
Milliseconds getActiveFaultDuration() const {
return _activeFaultDuration;
}
@@ -135,6 +144,8 @@ private:
bool _periodicChecksDisabledForTests = false;
Milliseconds _activeFaultDuration = kActiveFaultDuration;
+
+ stdx::unordered_map<FaultFacetType, HealthObserverIntensityEnum> _facetToIntensityMapForTest;
};
} // namespace process_health
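
The new setIntensityForType() hook lets a test force a facet to any
intensity without touching server parameters. A minimal sketch of its use,
mirroring the tests later in this commit (assumes the mongo::process_health
namespace and the IDL-generated HealthObserverIntensityEnum are in scope):

    void disableMockObserverForTest() {
        auto config = std::make_unique<FaultManagerConfig>();
        config->setIntensityForType(FaultFacetType::kMock1,
                                    HealthObserverIntensityEnum::kOff);
        // kMock1 now reports kOff instead of its kCritical default, so
        // isHealthObserverEnabled(kMock1) returns false and its checks are skipped.
        invariant(!config->isHealthObserverEnabled(FaultFacetType::kMock1));
    }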
diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h
index d45bc97bd1d..40667e394ce 100644
--- a/src/mongo/db/process_health/fault_manager_test_suite.h
+++ b/src/mongo/db/process_health/fault_manager_test_suite.h
@@ -80,26 +80,18 @@ public:
"cause"_attr = cause);
}) {}
- void transitionStateTest(FaultState newState) {
- transitionToState(newState);
+ void healthCheckTest(HealthObserver* observer, CancellationToken token) {
+ healthCheck(observer, token);
}
- void healthCheckTest() {
- healthCheck();
+ void schedulePeriodicHealthCheckThreadTest() {
+ schedulePeriodicHealthCheckThread();
}
std::vector<HealthObserver*> getHealthObserversTest() {
return getHealthObservers();
}
- void processFaultExistsEventTest() {
- processFaultExistsEvent();
- }
-
- void processFaultIsResolvedEventTest() {
- return processFaultIsResolvedEvent();
- }
-
FaultFacetsContainerPtr getOrCreateFaultFacetsContainerTest() {
return getOrCreateFaultFacetsContainer();
}
@@ -117,6 +109,10 @@ public:
FaultManagerConfig getConfigTest() {
return getConfig();
}
+
+ FaultState acceptTest(const HealthCheckStatus& message) {
+ return accept(message);
+ }
};
/**
@@ -125,7 +121,6 @@ public:
class FaultManagerTest : public unittest::Test {
public:
void setUp() override {
- RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
HealthObserverRegistration::resetObserverFactoriesForTest();
createServiceContextIfNeeded();
@@ -159,6 +154,11 @@ public:
}
void constructTaskExecutor() {
+ if (_executor) {
+ _executor->shutdown();
+ _executor->join();
+ }
+
auto network = std::shared_ptr<executor::NetworkInterface>(
executor::makeNetworkInterface("FaultManagerTest").release());
ThreadPool::Options options;
@@ -166,7 +166,6 @@ public:
_executor =
std::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(network));
- _executor->startup();
}
void resetManager(std::unique_ptr<FaultManagerConfig> config = nullptr) {
@@ -230,16 +229,6 @@ public:
tickSource().advance(d);
}
- void assertInvalidStateTransition(FaultState newState) {
- try {
- manager().transitionStateTest(newState);
- ASSERT(false);
- } catch (const DBException& ex) {
- ASSERT(ex.code() == ErrorCodes::BadValue);
- // expected exception
- }
- }
-
static inline const Seconds kWaitTimeout{30};
static inline const Milliseconds kSleepTime{1};
void assertSoon(std::function<bool()> predicate, Milliseconds timeout = kWaitTimeout) {
@@ -260,7 +249,7 @@ public:
return true;
else {
advanceTime(kCheckTimeIncrement);
- manager().healthCheckTest();
+ manager().schedulePeriodicHealthCheckThreadTest();
return false;
}
};
diff --git a/src/mongo/db/process_health/fault_state_machine_test.cpp b/src/mongo/db/process_health/fault_state_machine_test.cpp
index 9a8b6e62cf6..c1d3c5efc59 100644
--- a/src/mongo/db/process_health/fault_state_machine_test.cpp
+++ b/src/mongo/db/process_health/fault_state_machine_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/process_health/fault_manager_test_suite.h"
#include "mongo/db/process_health/health_check_status.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -43,171 +44,292 @@ using test::FaultManagerTestImpl;
namespace {
-std::shared_ptr<executor::ThreadPoolTaskExecutor> constructTaskExecutor() {
- auto network = std::make_unique<executor::NetworkInterfaceMock>();
- auto executor = makeSharedThreadPoolTestExecutor(std::move(network));
- executor->startup();
- return executor;
-}
+TEST_F(FaultManagerTest, TransitionsFromStartupCheckToOkWhenAllObserversAreSuccessful) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0; });
+ registerMockHealthObserver(FaultFacetType::kMock2, [] { return 0; });
-// State machine tests.
-TEST_F(FaultManagerTest, StateTransitionsFromOk) {
- auto serviceCtx = ServiceContext::make();
- std::vector<std::pair<FaultState, bool>> transitionValidPairs{
- {FaultState::kOk, false},
- {FaultState::kStartupCheck, false},
- {FaultState::kTransientFault, true},
- {FaultState::kActiveFault, false}};
-
- for (auto& pair : transitionValidPairs) {
- manager().transitionStateTest(FaultState::kOk);
-
- if (pair.second) {
- manager().transitionStateTest(pair.first);
- } else {
- assertInvalidStateTransition(pair.first);
- }
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ std::vector<FaultFacetType> faultFacetTypes{FaultFacetType::kMock1, FaultFacetType::kMock2};
- resetManager();
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+
+ // send successful health check response from each
+ for (auto faultFacetType : faultFacetTypes) {
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
+ if (faultFacetType != faultFacetTypes.back()) {
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ }
}
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
}
-TEST_F(FaultManagerTest, StateTransitionsFromStartupCheck) {
- auto serviceCtx = ServiceContext::make();
- std::vector<std::pair<FaultState, bool>> transitionValidPairs{
- {FaultState::kOk, true},
- {FaultState::kStartupCheck, false},
- {FaultState::kTransientFault, true},
- {FaultState::kActiveFault, false}};
-
- for (auto& pair : transitionValidPairs) {
- if (pair.second) {
- manager().transitionStateTest(pair.first);
- } else {
- assertInvalidStateTransition(pair.first);
- }
+TEST_F(FaultManagerTest, TransitionsFromStartupCheckToOkAfterFailureThenSuccess) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
- resetManager();
- }
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ advanceTime(Milliseconds(100));
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ ASSERT(hasFault());
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
}
-TEST_F(FaultManagerTest, StateTransitionsFromTransientFault) {
- auto serviceCtx = ServiceContext::make();
- std::vector<std::pair<FaultState, bool>> transitionValidPairs{
- {FaultState::kOk, true},
- {FaultState::kStartupCheck, false},
- {FaultState::kTransientFault, false},
- {FaultState::kActiveFault, true}};
-
- for (auto& pair : transitionValidPairs) {
- manager().transitionStateTest(FaultState::kTransientFault);
-
- if (pair.second) {
- manager().transitionStateTest(pair.first);
- } else {
- assertInvalidStateTransition(pair.first);
- }
+TEST_F(FaultManagerTest, TransitionsFromOkToTransientFaultAfterSuccessThenFailure) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
- resetManager();
- }
+ const auto faultFacetType = FaultFacetType::kMock1;
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ advanceTime(Milliseconds(100));
+ assertSoon([this]() {
+ return hasFault() && manager().getFaultState() == FaultState::kTransientFault;
+ });
}
-TEST_F(FaultManagerTest, StateTransitionsFromActiveFault) {
- auto serviceCtx = ServiceContext::make();
- std::vector<std::pair<FaultState, bool>> transitionValidPairs{
- {FaultState::kOk, false},
- {FaultState::kStartupCheck, false},
- {FaultState::kTransientFault, false},
- {FaultState::kActiveFault, false}};
-
- for (auto& pair : transitionValidPairs) {
- manager().transitionStateTest(FaultState::kTransientFault);
- manager().transitionStateTest(FaultState::kActiveFault);
-
- if (pair.second) {
- manager().transitionStateTest(pair.first);
- } else {
- assertInvalidStateTransition(pair.first);
- }
+TEST_F(FaultManagerTest, StaysInOkOnSuccess) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
- resetManager();
- }
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+
+ ASSERT(!hasFault());
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
}
+TEST_F(FaultManagerTest, StaysInTransientFault) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
-// State transitions triggered by events.
-TEST_F(FaultManagerTest, EventsFromOk) {
- std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{
- {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kOk},
- {[this] {
- manager().getOrCreateFaultFacetsContainerTest();
- manager().processFaultExistsEventTest();
- },
- FaultState::kTransientFault}};
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
- for (auto& pair : validTransitions) {
- resetManager();
- manager().transitionStateTest(FaultState::kOk);
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
- pair.first(); // Send event.
- ASSERT_EQ(pair.second, manager().getFaultState());
- }
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ advanceTime(Milliseconds(100));
+ assertSoon([this]() {
+ return hasFault() && manager().getFaultState() == FaultState::kTransientFault;
+ });
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ advanceTime(Milliseconds(100));
+ assertSoon([this]() {
+ return hasFault() && manager().getFaultState() == FaultState::kTransientFault;
+ });
}
-TEST_F(FaultManagerTest, EventsFromStartupCheck) {
- std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{
- {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kOk},
- {[this] {
- manager().getOrCreateFaultFacetsContainerTest();
- manager().processFaultExistsEventTest();
- },
- FaultState::kTransientFault}};
-
- for (auto& pair : validTransitions) {
- resetManager();
- ASSERT_EQ(FaultState::kStartupCheck, manager().getFaultState());
-
- pair.first(); // Send event.
- ASSERT_EQ(pair.second, manager().getFaultState());
- }
+TEST_F(FaultManagerTest, TransitionsFromTransientFaultToOkOnFailureThenSuccess) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ advanceTime(Milliseconds(100));
+ assertSoon([this]() {
+ return hasFault() && manager().getFaultState() == FaultState::kTransientFault;
+ });
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+
+ ASSERT(!hasFault());
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
}
-TEST_F(FaultManagerTest, EventsFromTransientFault) {
- std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{
- {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kOk},
- {[this] {
- manager().getOrCreateFaultFacetsContainerTest();
- manager().processFaultExistsEventTest();
- },
- FaultState::kTransientFault}};
-
- for (auto& pair : validTransitions) {
- resetManager();
- manager().transitionStateTest(FaultState::kTransientFault);
-
- pair.first(); // Send event.
- ASSERT_EQ(pair.second, manager().getFaultState());
- }
+TEST_F(FaultManagerTest, OneFacetIsResolved) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; });
+ registerMockHealthObserver(FaultFacetType::kMock2, [] { return 1.1; });
+
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(FaultFacetType::kMock1, 1.1, "failing health check 1"));
+ manager().acceptTest(HealthCheckStatus(FaultFacetType::kMock2, 1.1, "failing health check 2"));
+ assertSoon([this] {
+ return manager().getOrCreateFaultFacetsContainerTest()->getFacets().size() == 2;
+ });
+ manager().acceptTest(HealthCheckStatus(FaultFacetType::kMock1));
+ assertSoon([this] {
+ return manager().getOrCreateFaultFacetsContainerTest()->getFacets().front()->getType() ==
+ FaultFacetType::kMock2;
+ });
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
}
-TEST_F(FaultManagerTest, EventsFromActiveFault) {
- // No event can transition out of active fault.
- std::vector<std::pair<std::function<void()>, FaultState>> validTransitions{
- {[this] { manager().processFaultIsResolvedEventTest(); }, FaultState::kActiveFault},
- {[this] {
- manager().getOrCreateFaultFacetsContainerTest();
- manager().processFaultExistsEventTest();
- },
- FaultState::kActiveFault}};
-
- for (auto& pair : validTransitions) {
- resetManager();
- manager().transitionStateTest(FaultState::kTransientFault);
- manager().transitionStateTest(FaultState::kActiveFault);
-
- pair.first(); // Send event.
- ASSERT_EQ(pair.second, manager().getFaultState());
- }
+DEATH_TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeoutFromTransientFault, "Fatal") {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto faultFacetType = FaultFacetType::kMock1;
+ auto config = test::getConfigWithDisabledPeriodicChecks();
+ auto activeFaultDuration = Milliseconds(100);
+ config->setActiveFaultDurationForTests(activeFaultDuration);
+ resetManager(std::move(config));
+
+ registerMockHealthObserver(faultFacetType, [] { return 1.1; });
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ ASSERT(manager().getFaultState() == FaultState::kOk);
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ ASSERT(manager().getFaultState() == FaultState::kTransientFault);
+
+ advanceTime(activeFaultDuration);
+ waitForTransitionIntoState(FaultState::kActiveFault);
+}
+
+DEATH_TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeoutFromStartupCheck, "Fatal") {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto faultFacetType = FaultFacetType::kMock1;
+ auto config = test::getConfigWithDisabledPeriodicChecks();
+ auto activeFaultDuration = Milliseconds(100);
+ config->setActiveFaultDurationForTests(activeFaultDuration);
+ resetManager(std::move(config));
+
+ registerMockHealthObserver(faultFacetType, [] { return 1.1; });
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+
+ advanceTime(activeFaultDuration);
+ waitForTransitionIntoState(FaultState::kActiveFault);
+}
+
+TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto faultFacetType = FaultFacetType::kMock1;
+ auto config = test::getConfigWithDisabledPeriodicChecks();
+ auto activeFaultDuration = Milliseconds(100);
+ config->setActiveFaultDurationForTests(activeFaultDuration);
+ resetManager(std::move(config));
+
+ registerMockHealthObserver(faultFacetType, [] { return 1.1; });
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ ASSERT(manager().getFaultState() == FaultState::kOk);
+
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ ASSERT(manager().getFaultState() == FaultState::kTransientFault);
+
+ advanceTime(activeFaultDuration / 2);
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+
+ advanceTime(activeFaultDuration);
+
+ ASSERT(manager().getFaultState() == FaultState::kOk);
+}
+
+TEST_F(FaultManagerTest, HealthCheckWithOffFacetCreatesNoFault) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ auto config = std::make_unique<FaultManagerConfig>();
+ config->setIntensityForType(faultFacetType, HealthObserverIntensityEnum::kOff);
+ resetManager(std::move(config));
+
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
+
+ // Create another observer so that we don't skip the startup check state.
+ registerMockHealthObserver(FaultFacetType::kSystem, [] { return 0; });
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ advanceTime(Milliseconds(100));
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
+}
+
+TEST_F(FaultManagerTest, AllOffFacetsSkipStartupCheck) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ auto config = std::make_unique<FaultManagerConfig>();
+ config->setIntensityForType(faultFacetType, HealthObserverIntensityEnum::kOff);
+ resetManager(std::move(config));
+
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+}
+
+TEST_F(FaultManagerTest, HealthCheckWithOffFacetCreatesNoFaultInOk) {
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ const auto faultFacetType = FaultFacetType::kMock1;
+ auto config = std::make_unique<FaultManagerConfig>();
+ config->disablePeriodicChecksForTests();
+ auto configPtr = config.get();
+ resetManager(std::move(config));
+
+ registerMockHealthObserver(faultFacetType, [] { return 0; });
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+
+ ASSERT(manager().getFaultState() == FaultState::kStartupCheck);
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+ advanceTime(Milliseconds(100));
+ ASSERT(!hasFault());
+
+ assertSoon([this]() { return manager().getFaultState() == FaultState::kOk; });
+ ASSERT(initialHealthCheckFuture.isReady());
+
+ configPtr->setIntensityForType(faultFacetType, HealthObserverIntensityEnum::kOff);
+ manager().acceptTest(HealthCheckStatus(faultFacetType, 1.0, "error"));
+ ASSERT(manager().getFaultState() == FaultState::kOk);
}
} // namespace
diff --git a/src/mongo/db/process_health/health_check_status.h b/src/mongo/db/process_health/health_check_status.h
index 826b82b16f0..da5284db13c 100644
--- a/src/mongo/db/process_health/health_check_status.h
+++ b/src/mongo/db/process_health/health_check_status.h
@@ -61,7 +61,9 @@ public:
: _type(type), _severity(0), _description("resolved"_sd) {}
HealthCheckStatus(const HealthCheckStatus&) = default;
- HealthCheckStatus& operator=(const HealthCheckStatus&) = delete;
+ HealthCheckStatus& operator=(const HealthCheckStatus&) = default;
+ HealthCheckStatus(HealthCheckStatus&&) = default;
+ HealthCheckStatus& operator=(HealthCheckStatus&&) = default;
/**
* @return FaultFacetType of this status.
@@ -101,16 +103,20 @@ public:
static bool isActiveFault(double severity) {
// Range is inclusive.
- return severity >= kActiveFaultSeverity - kActiveFaultSeverityEpsilon;
+ return severity >= 1.0;
+ }
+
+ bool isActiveFault() const {
+ return isActiveFault(getSeverity());
}
private:
friend std::ostream& operator<<(std::ostream&, const HealthCheckStatus&);
friend StringBuilder& operator<<(StringBuilder& s, const HealthCheckStatus& hcs);
- const FaultFacetType _type;
- const double _severity;
- const std::string _description;
+ FaultFacetType _type;
+ double _severity;
+ std::string _description;
};
inline StringBuilder& operator<<(StringBuilder& s, const FaultFacetType& type) {
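
The severity convention these helpers encode: 0 is resolved, values in
(0, 1) indicate a transient (not yet active) fault, and anything >= 1.0 is
an active fault. A small sketch using the constructors visible above and in
the tests:

    void severityConventionSketch() {
        using namespace mongo::process_health;

        HealthCheckStatus resolved(FaultFacetType::kMock1);  // severity 0, "resolved"
        HealthCheckStatus transient(FaultFacetType::kMock1, 0.5, "degraded");
        HealthCheckStatus active(FaultFacetType::kMock1, 1.0, "unreachable");

        invariant(HealthCheckStatus::isResolved(resolved.getSeverity()));
        invariant(!transient.isActiveFault());
        invariant(active.isActiveFault());
    }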
diff --git a/src/mongo/db/process_health/health_observer.h b/src/mongo/db/process_health/health_observer.h
index 319876e4017..9912a624272 100644
--- a/src/mongo/db/process_health/health_observer.h
+++ b/src/mongo/db/process_health/health_observer.h
@@ -83,9 +83,10 @@ public:
*
* @param factory Interface to get or create the factory of facets container.
*/
- virtual void periodicCheck(FaultFacetsContainerFactory& factory,
- std::shared_ptr<executor::TaskExecutor> taskExecutor,
- CancellationToken token) = 0;
+ virtual SharedSemiFuture<HealthCheckStatus> periodicCheck(
+ FaultFacetsContainerFactory& factory,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ CancellationToken token) = 0;
virtual HealthObserverLivenessStats getStats() const = 0;
};
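
With periodicCheck() now returning a future, callers chain continuations
instead of being called back through the factory. A sketch of the
consumption pattern, modeled on FaultManager::healthCheck() earlier in this
commit (`observer`, `factory`, `executor`, and `token` are assumed to exist
in the caller's scope):

    auto checkDone =
        observer->periodicCheck(factory, executor, token)
            .thenRunOn(executor)
            .onCompletion([](mongo::StatusWith<HealthCheckStatus> result) {
                if (!result.isOK()) {
                    // The check itself failed; the manager treats this as a fault.
                    return;
                }
                // Otherwise feed result.getValue() into the state machine
                // (FaultManager::accept()).
            });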
diff --git a/src/mongo/db/process_health/health_observer_base.cpp b/src/mongo/db/process_health/health_observer_base.cpp
index dbe49da4ed4..a4faea9fc73 100644
--- a/src/mongo/db/process_health/health_observer_base.cpp
+++ b/src/mongo/db/process_health/health_observer_base.cpp
@@ -39,58 +39,46 @@ namespace process_health {
HealthObserverBase::HealthObserverBase(ServiceContext* svcCtx) : _svcCtx(svcCtx) {}
-void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
- std::shared_ptr<executor::TaskExecutor> taskExecutor,
- CancellationToken token) {
+SharedSemiFuture<HealthCheckStatus> HealthObserverBase::periodicCheck(
+ FaultFacetsContainerFactory& factory,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ CancellationToken token) {
// If we have reached here, the intensity of this health observer must not be off
{
auto lk = stdx::lock_guard(_mutex);
if (_currentlyRunningHealthCheck) {
- return;
+ return _periodicCheckPromise->getFuture();
}
+ LOGV2_DEBUG(6007902, 2, "Start periodic health check", "observerType"_attr = getType());
const auto now = _svcCtx->getPreciseClockSource()->now();
- if (now - _lastTimeTheCheckWasRun < minimalCheckInterval()) {
- LOGV2_DEBUG(6136802,
- 3,
- "Safety interval prevented new health check",
- "observerType"_attr = getType());
- return;
- }
_lastTimeTheCheckWasRun = now;
-
- LOGV2_DEBUG(6007902, 2, "Start periodic health check", "observerType"_attr = getType());
-
_currentlyRunningHealthCheck = true;
+ _periodicCheckPromise = std::make_unique<SharedPromise<HealthCheckStatus>>();
}
- // Do the health check.
- taskExecutor->schedule([this, &factory, token, taskExecutor](Status status) {
+ _periodicCheckPromise->setFrom(
periodicCheckImpl({token, taskExecutor})
- .then([this, &factory](HealthCheckStatus&& checkStatus) mutable {
- const auto severity = checkStatus.getSeverity();
- factory.updateWithCheckStatus(std::move(checkStatus));
-
- auto lk = stdx::lock_guard(_mutex);
- ++_completedChecksCount;
- if (!HealthCheckStatus::isResolved(severity)) {
- ++_completedChecksWithFaultCount;
- }
- })
- .onCompletion([this](Status status) {
+ .onCompletion([this](StatusWith<HealthCheckStatus> status) {
if (!status.isOK()) {
- // Health checkers should not throw, they should return FaultFacetPtr.
- LOGV2_ERROR(
- 6007901, "Unexpected failure during health check", "status"_attr = status);
+ return status;
}
+
+ auto healthStatus = status.getValue();
+
const auto now = _svcCtx->getPreciseClockSource()->now();
auto lk = stdx::lock_guard(_mutex);
+ ++_completedChecksCount;
invariant(_currentlyRunningHealthCheck);
+ if (!HealthCheckStatus::isResolved(healthStatus.getSeverity())) {
+ ++_completedChecksWithFaultCount;
+ }
_currentlyRunningHealthCheck = false;
_lastTimeCheckCompleted = now;
- })
- .getAsync([this](Status status) {});
- });
+ return status;
+ }));
+
+ return _periodicCheckPromise->getFuture();
}
HealthCheckStatus HealthObserverBase::makeHealthyStatus() const {
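
The promise bookkeeping above implements a "single check in flight" guard:
a caller that arrives while a check is running joins the existing
SharedPromise instead of starting a concurrent one. A generic, simplified
sketch of that shape (the real code also clears the running flag inside its
completion continuation, as onCompletion() does above):

    class SingleFlight {
    public:
        // Start the async work, or join the run already in progress.
        mongo::SharedSemiFuture<int> startOrJoin() {
            auto lk = stdx::lock_guard(_mutex);
            if (_running)
                return _promise->getFuture();
            _running = true;
            _promise = std::make_unique<mongo::SharedPromise<int>>();
            // _doWork() is a hypothetical async entry point; its completion
            // path must eventually reset _running to false.
            _promise->setFrom(_doWork());
            return _promise->getFuture();
        }

    private:
        mongo::Future<int> _doWork();

        mongo::Mutex _mutex = MONGO_MAKE_LATCH("SingleFlight::_mutex");
        bool _running = false;
        std::unique_ptr<mongo::SharedPromise<int>> _promise;
    };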
diff --git a/src/mongo/db/process_health/health_observer_base.h b/src/mongo/db/process_health/health_observer_base.h
index 99425808fab..f415a8e034a 100644
--- a/src/mongo/db/process_health/health_observer_base.h
+++ b/src/mongo/db/process_health/health_observer_base.h
@@ -66,9 +66,13 @@ public:
// Implements the common logic for periodic checks.
// Every observer should implement periodicCheckImpl() for specific tests.
- void periodicCheck(FaultFacetsContainerFactory& factory,
- std::shared_ptr<executor::TaskExecutor> taskExecutor,
- CancellationToken token) override;
+ SharedSemiFuture<HealthCheckStatus> periodicCheck(
+ FaultFacetsContainerFactory& factory,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ CancellationToken token) override;
+
+ HealthCheckStatus makeHealthyStatus() const;
+ HealthCheckStatus makeSimpleFailedStatus(double severity, std::vector<Status>&& failures) const;
HealthObserverLivenessStats getStats() const override;
@@ -88,12 +92,6 @@ protected:
virtual Future<HealthCheckStatus> periodicCheckImpl(
PeriodicHealthCheckContext&& periodicCheckContext) = 0;
- // Helper method to create a status without errors.
- HealthCheckStatus makeHealthyStatus() const;
-
- // Make a generic error status.
- HealthCheckStatus makeSimpleFailedStatus(double severity, std::vector<Status>&& failures) const;
-
HealthObserverLivenessStats getStatsLocked(WithLock) const;
ServiceContext* const _svcCtx;
@@ -103,6 +101,7 @@ protected:
// Indicates if there is any check running, to prevent running checks concurrently.
bool _currentlyRunningHealthCheck = false;
+ std::unique_ptr<SharedPromise<HealthCheckStatus>> _periodicCheckPromise;
// Enforces the safety interval.
Date_t _lastTimeTheCheckWasRun;
Date_t _lastTimeCheckCompleted;
diff --git a/src/mongo/db/process_health/health_observer_mock.h b/src/mongo/db/process_health/health_observer_mock.h
index 1270af28cc3..230145bedf2 100644
--- a/src/mongo/db/process_health/health_observer_mock.h
+++ b/src/mongo/db/process_health/health_observer_mock.h
@@ -58,19 +58,28 @@ protected:
Future<HealthCheckStatus> periodicCheckImpl(
PeriodicHealthCheckContext&& periodicCheckContext) override {
- // Detects mocked severity and handles it.
- const double severity = _getSeverityCallback();
auto completionPf = makePromiseFuture<HealthCheckStatus>();
- if (HealthCheckStatus::isResolved(severity)) {
- LOGV2(5936603, "Mock health observer returns a resolved severity");
- completionPf.promise.emplaceValue(HealthCheckStatus(getType()));
- } else {
- LOGV2(5936604,
- "Mock health observer returns a fault severity",
- "severity"_attr = severity);
- completionPf.promise.emplaceValue(HealthCheckStatus(getType(), severity, "failed"));
- }
+
+ auto cbHandle = periodicCheckContext.taskExecutor->scheduleWork(
+ [this, promise = std::move(completionPf.promise)](
+ const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
+ try {
+ auto severity = _getSeverityCallback();
+ if (HealthCheckStatus::isResolved(severity)) {
+ LOGV2(5936603, "Mock health observer returns a resolved severity");
+ promise.emplaceValue(HealthCheckStatus(getType()));
+ } else {
+ LOGV2(5936604,
+ "Mock health observer returns a fault severity",
+ "severity"_attr = severity);
+ promise.emplaceValue(HealthCheckStatus(getType(), severity, "failed"));
+ }
+ } catch (const DBException& e) {
+ promise.setError(e.toStatus());
+ }
+ });
+
return std::move(completionPf.future);
}
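
With this change the mock behaves like a real observer: the result is produced asynchronously on the task executor, and a DBException thrown by the severity callback is routed into the promise as an error Status instead of escaping the scheduler thread. A stripped-down analogue using std::thread and std::promise in place of the TaskExecutor API (asyncSeverity is an illustrative name, not part of the patch):

    #include <exception>
    #include <functional>
    #include <future>
    #include <thread>

    // Runs getSeverity() off-thread; a throw becomes the future's error
    // outcome, as the mock converts DBException into promise.setError().
    std::future<double> asyncSeverity(std::function<double()> getSeverity) {
        std::promise<double> promise;
        auto future = promise.get_future();
        std::thread([p = std::move(promise), getSeverity]() mutable {
            try {
                p.set_value(getSeverity());                 // Normal completion.
            } catch (...) {
                p.set_exception(std::current_exception());  // Error path.
            }
        }).detach();
        return future;
    }

This error path is what lets the new PeriodicHealthCheckOnErrorMakesBadHealthStatus test below observe a fault when its severity callback uasserts.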
diff --git a/src/mongo/db/process_health/health_observer_registration.cpp b/src/mongo/db/process_health/health_observer_registration.cpp
index a1da3e30f94..12b0ff028a7 100644
--- a/src/mongo/db/process_health/health_observer_registration.cpp
+++ b/src/mongo/db/process_health/health_observer_registration.cpp
@@ -56,7 +56,8 @@ void HealthObserverRegistration::registerObserverFactory(
std::vector<std::unique_ptr<HealthObserver>> HealthObserverRegistration::instantiateAllObservers(
ServiceContext* svcCtx) {
std::vector<std::unique_ptr<HealthObserver>> result;
- for (auto& cb : *getObserverFactories()) {
+ auto factories = *getObserverFactories();
+ for (auto& cb : factories) {
result.push_back(cb(svcCtx));
}
return result;
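
Taking a copy of the factory list before iterating presumably shields the loop from reentrant mutation of the container behind getObserverFactories(), e.g. a factory call that registers further observers. A small sketch of the hazard being avoided (hypothetical names):

    #include <functional>
    #include <vector>

    // Appending to a std::vector can reallocate and invalidate live
    // iterators, so snapshot the list before invoking callbacks that
    // might mutate it.
    void runAll(std::vector<std::function<void()>>& callbacks) {
        auto snapshot = callbacks;  // Copy first...
        for (auto& cb : snapshot) {
            cb();                   // ...cb() may now append to `callbacks` safely.
        }
    }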
diff --git a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp
index 820402994c6..55a9fa2c85e 100644
--- a/src/mongo/db/process_health/health_observer_test.cpp
+++ b/src/mongo/db/process_health/health_observer_test.cpp
@@ -55,164 +55,12 @@ TEST_F(FaultManagerTest, Registration) {
ASSERT_EQ(FaultFacetType::kMock1, allObservers[0]->getType());
}
-TEST_F(FaultManagerTest, HealthCheckCreatesObservers) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
- ASSERT_EQ(0, manager().getHealthObserversTest().size());
-
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- ASSERT_EQ(1, manager().getHealthObserversTest().size());
-}
-
-TEST_F(FaultManagerTest, HealthCheckCreatesFacetOnHealthCheckFoundFault) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
-
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- waitForTransitionIntoState(FaultState::kTransientFault);
- auto currentFault = manager().currentFault();
- ASSERT_TRUE(currentFault); // Is created.
-}
-
-TEST_F(FaultManagerTest, StateTransitionOnHealthCheckFoundFault) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
- ASSERT_EQ(FaultState::kStartupCheck, manager().getFaultState());
-
- waitForTransitionIntoState(FaultState::kTransientFault);
- ASSERT_EQ(FaultState::kTransientFault, manager().getFaultState());
-}
-
-TEST_F(FaultManagerTest, HealthCheckCreatesCorrectFacetOnHealthCheckFoundFault) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
- registerMockHealthObserver(FaultFacetType::kMock2, [] { return 0.0; });
- waitForTransitionIntoState(FaultState::kTransientFault);
-
- FaultInternal& internalFault = manager().getFault();
- ASSERT_TRUE(internalFault.getFaultFacet(FaultFacetType::kMock1));
- ASSERT_FALSE(internalFault.getFaultFacet(FaultFacetType::kMock2));
-}
-
-TEST_F(FaultManagerTest, SeverityIsMaxFromAllFacetsSeverity) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.8; });
- registerMockHealthObserver(FaultFacetType::kMock2, [] { return 0.5; });
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- do {
- waitForFaultBeingCreated();
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- } while (manager().getFault().getFacets().size() != 2); // Race between two facets.
- auto currentFault = manager().currentFault();
-
- ASSERT_APPROX_EQUAL(0.8, currentFault->getSeverity(), 0.001);
-}
-
-TEST_F(FaultManagerTest, HealthCheckCreatesFacetThenIsGarbageCollectedAndStateTransitioned) {
- AtomicDouble severity{0.1};
- registerMockHealthObserver(FaultFacetType::kMock1, [&severity] { return severity.load(); });
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- waitForFaultBeingCreated();
- ASSERT_TRUE(manager().currentFault()); // Is created.
-
- // Resolve and it should be garbage collected.
- severity.store(0.0);
-
- waitForTransitionIntoState(FaultState::kOk);
-
- assertSoonWithHealthCheck([this]() { return !hasFault(); });
-
- // State is transitioned.
- ASSERT_EQ(FaultState::kOk, manager().getFaultState());
- resetManager(); // Before atomic fields above go out of scope.
-}
-
-TEST_F(FaultManagerTest, HealthCheckCreates2FacetsThenIsGarbageCollected) {
- AtomicDouble severity1{0.1};
- AtomicDouble severity2{0.1};
- registerMockHealthObserver(FaultFacetType::kMock1, [&severity1] { return severity1.load(); });
- registerMockHealthObserver(FaultFacetType::kMock2, [&severity2] { return severity2.load(); });
- manager().healthCheckTest();
- waitForFaultBeingCreated();
-
- while (manager().getFault().getFacets().size() != 2) {
- sleepFor(Milliseconds(1));
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- }
-
- // Resolve one facet and it should be garbage collected.
- severity1.store(0.0);
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
-
- FaultInternal& internalFault = manager().getFault();
- while (internalFault.getFaultFacet(FaultFacetType::kMock1)) {
- sleepFor(Milliseconds(1));
- // Check is async, needs more turns for garbage collection to work.
- advanceTime(Milliseconds(100));
- manager().healthCheckTest();
- }
- ASSERT_FALSE(internalFault.getFaultFacet(FaultFacetType::kMock1));
- ASSERT_TRUE(internalFault.getFaultFacet(FaultFacetType::kMock2));
- resetManager(); // Before atomic fields above go out of scope.
-}
-
-TEST_F(FaultManagerTest, HealthCheckWithOffFacetCreatesNoFault) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.0; });
- manager().healthCheckTest();
- waitForFaultBeingResolved();
- auto currentFault = manager().currentFault();
- ASSERT_TRUE(!currentFault); // Is not created.
-}
-
-TEST_F(FaultManagerTest, DoesNotRestartCheckBeforeIntervalExpired) {
- AtomicDouble severity{0.0};
- registerMockHealthObserver(FaultFacetType::kMock1, [&severity] { return severity.load(); });
- manager().healthCheckTest();
- waitForFaultBeingResolved();
- auto currentFault = manager().currentFault();
- ASSERT_TRUE(!currentFault); // Is not created.
-
- severity.store(0.1);
- manager().healthCheckTest();
- currentFault = manager().currentFault();
- // The check did not run because the delay interval did not expire.
- ASSERT_TRUE(!currentFault);
-
- advanceTime(Milliseconds(100));
- assertSoonWithHealthCheck([this]() { return hasFault(); });
- currentFault = manager().currentFault();
- ASSERT_TRUE(currentFault); // The fault was created.
- resetManager(); // Before atomic fields above go out of scope.
-}
-
-TEST_F(FaultManagerTest, HealthCheckWithCriticalFacetCreatesFault) {
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; });
- manager().healthCheckTest();
- waitForFaultBeingCreated();
- auto currentFault = manager().currentFault();
- ASSERT_TRUE(currentFault);
-}
-
-TEST_F(FaultManagerTest, InitialHealthCheckDoesNotBlockIfTransitionToOkSucceeds) {
- resetManager();
- RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
-
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.0; });
- manager().healthCheckTest();
-
- auto currentFault = manager().currentFault();
- ASSERT_TRUE(!currentFault); // Is not created.
- ASSERT_TRUE(manager().getFaultState() == FaultState::kOk);
-}
-
TEST_F(FaultManagerTest, InitialHealthCheckDoesNotRunIfFeatureFlagNotEnabled) {
resetManager();
RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", false};
registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.0; });
- manager().startPeriodicHealthChecks();
+ static_cast<void>(manager().schedulePeriodicHealthCheckThreadTest());
auto currentFault = manager().currentFault();
ASSERT_TRUE(!currentFault); // Is not created.
@@ -220,18 +68,40 @@ TEST_F(FaultManagerTest, InitialHealthCheckDoesNotRunIfFeatureFlagNotEnabled) {
}
TEST_F(FaultManagerTest, Stats) {
- advanceTime(Milliseconds(100));
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
- waitForTransitionIntoState(FaultState::kTransientFault);
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto faultFacetType = FaultFacetType::kMock1;
+ registerMockHealthObserver(faultFacetType, [] { return 0.1; });
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
auto observer = manager().getHealthObserversTest()[0];
+ manager().healthCheckTest(observer, CancellationToken::uncancelable());
+
+ assertSoon([this] { return static_cast<bool>(manager().currentFault()); });
assertSoon([&observer] { return !observer->getStats().currentlyRunningHealthCheck; });
+
auto stats = observer->getStats();
ASSERT_TRUE(manager().getConfig().isHealthObserverEnabled(observer->getType()));
- ASSERT_TRUE(stats.lastTimeCheckStarted >= clockSource().now());
- ASSERT_TRUE(stats.lastTimeCheckCompleted >= stats.lastTimeCheckStarted);
- ASSERT_TRUE(stats.completedChecksCount >= 1);
- ASSERT_TRUE(stats.completedChecksWithFaultCount >= 1);
+ ASSERT_EQ(stats.lastTimeCheckStarted, clockSource().now());
+ ASSERT_EQ(stats.lastTimeCheckCompleted, stats.lastTimeCheckStarted);
+ ASSERT_GTE(stats.completedChecksCount, 1);
+ ASSERT_GTE(stats.completedChecksWithFaultCount, 1);
+
+ // Accept a healthy status to complete the initial health check.
+ manager().acceptTest(HealthCheckStatus(faultFacetType));
+
+ advanceTime(Milliseconds(200));
+ auto prevStats = stats;
+ do {
+ manager().healthCheckTest(observer, CancellationToken::uncancelable());
+ sleepmillis(1);
+ observer = manager().getHealthObserversTest()[0];
+ stats = observer->getStats();
+ } while (stats.completedChecksCount <= prevStats.completedChecksCount);
+
+ ASSERT_GT(stats.lastTimeCheckStarted, prevStats.lastTimeCheckStarted);
+ ASSERT_GT(stats.lastTimeCheckCompleted, prevStats.lastTimeCheckCompleted);
+ ASSERT_GTE(stats.completedChecksCount, 2);
+ ASSERT_GTE(stats.completedChecksWithFaultCount, 2);
}
TEST_F(FaultManagerTest, ProgressMonitorCheck) {
@@ -244,7 +114,11 @@ TEST_F(FaultManagerTest, ProgressMonitorCheck) {
});
// Health check should get stuck here.
- manager().healthCheckTest();
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ auto observer = manager().getHealthObserversTest()[0];
+ manager().healthCheckTest(observer, CancellationToken::uncancelable());
+
// Verify that the 'crash callback' is invoked after timeout.
bool crashTriggered = false;
std::function<void(std::string cause)> crashCb = [&crashTriggered](std::string) {
@@ -261,36 +135,39 @@ TEST_F(FaultManagerTest, ProgressMonitorCheck) {
resetManager(); // Before fields above go out of scope.
}
-TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeout) {
- auto config = test::getConfigWithDisabledPeriodicChecks();
- config->setActiveFaultDurationForTests(Milliseconds(10));
- resetManager(std::move(config));
- registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; });
- waitForTransitionIntoState(FaultState::kTransientFault);
- ASSERT_TRUE(manager().getFaultState() == FaultState::kTransientFault);
- advanceTime(Milliseconds(10));
- waitForTransitionIntoState(FaultState::kActiveFault);
+TEST_F(FaultManagerTest, HealthCheckRunsPeriodically) {
+ resetManager(std::make_unique<FaultManagerConfig>());
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto faultFacetType = FaultFacetType::kMock1;
+ int severity = 0;
+ registerMockHealthObserver(faultFacetType, [&severity] { return severity; });
+
+ assertSoon([this] { return (manager().getFaultState() == FaultState::kStartupCheck); });
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ assertSoon([this] { return (manager().getFaultState() == FaultState::kOk); });
+
+ severity = 1;
+ assertSoon([this] { return (manager().getFaultState() == FaultState::kTransientFault); });
+ resetManager(); // Before fields above go out of scope.
}
-TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) {
- const auto activeFaultDuration = manager().getConfigTest().getActiveFaultDuration();
- const auto start = clockSource().now();
+TEST_F(FaultManagerTest, PeriodicHealthCheckOnErrorMakesBadHealthStatus) {
+ resetManager(std::make_unique<FaultManagerConfig>());
+ RAIIServerParameterControllerForTest _controller{"featureFlagHealthMonitoring", true};
+ auto faultFacetType = FaultFacetType::kMock1;
- // Initially unhealthy; Transitions to healthy before the active fault timeout.
- registerMockHealthObserver(FaultFacetType::kMock1, [=] {
- auto now = clockSource().now();
- auto elapsed = now - start;
- auto quarterActiveFaultDuration =
- Milliseconds(durationCount<Milliseconds>(activeFaultDuration) / 4);
- if (elapsed < quarterActiveFaultDuration) {
- return 1.1;
- }
- return 0.0;
+ registerMockHealthObserver(faultFacetType, [] {
+ uassert(ErrorCodes::InternalError, "test exception", false);
+ return 0.1;
+ });
+
+ ASSERT_TRUE(manager().getFaultState() == FaultState::kStartupCheck);
+
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ assertSoon([this] {
+ return manager().currentFault() && manager().getFaultState() == FaultState::kStartupCheck;
});
- waitForTransitionIntoState(FaultState::kTransientFault);
- assertSoonWithHealthCheck([this]() { return manager().getFaultState() == FaultState::kOk; });
- advanceTime(activeFaultDuration);
- assertSoonWithHealthCheck([this]() { return manager().getFaultState() == FaultState::kOk; });
}
} // namespace
diff --git a/src/mongo/db/process_health/state_machine.h b/src/mongo/db/process_health/state_machine.h
index 7a658a434a8..3f5e09255d5 100644
--- a/src/mongo/db/process_health/state_machine.h
+++ b/src/mongo/db/process_health/state_machine.h
@@ -30,6 +30,7 @@
#include <vector>
+#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/functional.h"
@@ -52,7 +53,7 @@ public:
using StateCallback = unique_function<void(State, State, const OptionalMessageType&)>;
// State machine accepts InputMessage and optionally transitions to state in the return value
- using MessageHandler = unique_function<boost::optional<State>(InputMessage)>;
+ using MessageHandler = unique_function<boost::optional<State>(const OptionalMessageType&)>;
using TransitionsContainer = stdx::unordered_map<State, std::vector<State>>;
@@ -101,7 +102,8 @@ public:
auto& initialState = getContextOrFatal(_initial);
_current = &initialState;
auto& handler = initialState.stateHandler;
- handler->fireEnter(_current->state(), boost::none);
+ if (handler)
+ handler->fireEnter(_current->state(), boost::none);
}
// Define a valid transition.
@@ -123,9 +125,19 @@ public:
}
// Accept message m, transition the state machine, and return the resulting state.
- State accept(const InputMessage& m) {
+ // Upon transition to the new state, the state machine will call any registered hooks.
+ //
+ // To avoid deadlock while calling this function, authors should ensure that:
+ // 1. any recursive call into accept() occurs on the current thread (which the
+ // recursive mutex permits); or
+ // 2. no hooks run as a result of accepting this message make blocking calls on
+ // resources shared with another thread that may also call this function.
+ State accept(const OptionalMessageType& m) {
tassertStarted();
+ stdx::lock_guard<stdx::recursive_mutex> lk(_mutex);
+
auto& handler = _current->stateHandler;
+
auto result = handler->accept(m);
if (result) {
setState(*result, m);
@@ -160,7 +172,7 @@ public:
// Accepts input message m when state machine is in state _state. Optionally, the
// state machine transitions to the state specified in the return value. Entry and exit
// hooks will not fire if this method returns boost::none.
- virtual boost::optional<State> accept(const InputMessage& m) noexcept = 0;
+ virtual boost::optional<State> accept(const OptionalMessageType& message) noexcept = 0;
// The state this handler is defined for
State state() const {
@@ -187,6 +199,8 @@ public:
cb(_state, newState, message);
}
+ bool _isTransient = false;
+
protected:
// The state we are handling
const State _state;
@@ -204,7 +218,7 @@ public:
: StateHandler(state), _messageHandler(std::move(m)) {}
~LambdaStateHandler() override {}
- boost::optional<State> accept(const InputMessage& m) noexcept override {
+ boost::optional<State> accept(const OptionalMessageType& m) noexcept override {
return _messageHandler(m);
}
@@ -219,13 +233,22 @@ public:
return context.stateHandler.get();
}
- StateEventRegistryPtr registerHandler(State s, MessageHandler&& handler) {
+ StateEventRegistryPtr registerHandler(State s, MessageHandler&& handler, bool isTransient) {
tassertNotStarted();
+
auto& context = _states[s];
context.stateHandler = std::make_unique<LambdaStateHandler>(s, std::move(handler));
+ if (isTransient) {
+ context.stateHandler->_isTransient = true;
+ }
+
return context.stateHandler.get();
}
+ StateEventRegistryPtr registerHandler(State s, MessageHandler&& handler) {
+ return registerHandler(s, std::move(handler), false);
+ }
+
protected:
struct StateContext {
StateHandlerPtr stateHandler;
@@ -259,6 +282,10 @@ protected:
// fire entry hooks for new state
_current->stateHandler->fireEnter(previousContext.state(), message);
+
+ if (_current->stateHandler->_isTransient) {
+ accept(message);
+ }
}
StateHandler* getHandlerOrFatal(State s) {
@@ -281,6 +308,7 @@ protected:
return getHandlerOrFatal(s);
}
+ stdx::recursive_mutex _mutex;
bool _started;
State _initial;
StateContext* _current = nullptr;
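
Two of the additions above interact: accept() now serializes on a recursive_mutex, and entering a state registered with isTransient == true makes setState() re-invoke accept() with the same message from inside the lock, exactly the same-thread reentrancy the deadlock comment allows for. A minimal standalone machine (not the mongo StateMachine; names are illustrative) showing the pass-through rule:

    #include <functional>
    #include <map>
    #include <optional>
    #include <utility>

    template <typename State, typename Msg>
    class TinyMachine {
    public:
        using Handler = std::function<std::optional<State>(const Msg&)>;

        void registerHandler(State s, Handler h, bool isTransient = false) {
            _handlers[s] = {std::move(h), isTransient};
        }
        void start(State initial) {
            _current = initial;
        }

        // Accept a message; if the destination state is marked transient,
        // immediately re-feed the same message so the machine settles in a
        // stable state within a single top-level accept() call.
        State accept(const Msg& m) {
            if (auto next = _handlers.at(_current).first(m)) {
                _current = *next;
                if (_handlers.at(_current).second) {
                    return accept(m);  // Transient: pass straight through.
                }
            }
            return _current;
        }

    private:
        std::map<State, std::pair<Handler, bool>> _handlers;
        State _current{};
    };

With states A -> B (transient) -> C, a single accept() issued in A returns C, mirroring how the fault manager can traverse a short-lived state in one step.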
diff --git a/src/mongo/db/process_health/state_machine_test.cpp b/src/mongo/db/process_health/state_machine_test.cpp
index c7c3e48115f..f974a4adf8c 100644
--- a/src/mongo/db/process_health/state_machine_test.cpp
+++ b/src/mongo/db/process_health/state_machine_test.cpp
@@ -58,8 +58,8 @@ public:
public:
using StateMachineTest::StateHandler::StateHandler;
- boost::optional<MachineState> accept(const Message& m) noexcept override {
- return m.nextState;
+ boost::optional<MachineState> accept(const SM::OptionalMessageType& m) noexcept override {
+ return m->nextState;
}
};
@@ -95,7 +95,8 @@ TEST_F(StateMachineTestFixture, RegistersMessageHandlerAndStateHooksFluently) {
};
subject()
- ->registerHandler(MachineState::B, [](const Message& m) { return m.nextState; })
+ ->registerHandler(MachineState::B,
+ [](const SM::OptionalMessageType& m) { return m->nextState; })
// hooks registered for state B
->enter(hook())
->exit(hook());
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index 7b4f3e9351a..3d65ee6c0d4 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -758,7 +758,15 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
PeriodicTask::startRunningPeriodicTasks();
- process_health::FaultManager::get(serviceContext)->startPeriodicHealthChecks();
+ status =
+ process_health::FaultManager::get(serviceContext)->startPeriodicHealthChecks().getNoThrow();
+ if (!status.isOK()) {
+ LOGV2_ERROR(5936510,
+ "Error completing initial health check: {error}",
+ "Error completing initial health check",
+ "error"_attr = redact(status));
+ return EXIT_PROCESS_HEALTH_CHECK;
+ }
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsRemote));