author     Andrew Shuvalov <andrew.shuvalov@mongodb.com>     2021-11-03 19:18:11 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-11-03 20:21:00 +0000
commit     45c2f1f18f5704872a65d395d8085fb9385d59d8 (patch)
tree       53bad7125915584255bbd88b711b5f25ea3d2fce
parent     582e4151293d773c2de9fde20cc4726f79829ec0 (diff)
SERVER-59366 Progress monitor for periodic health check
-rw-r--r--   src/mongo/db/process_health/SConscript                           1
-rw-r--r--   src/mongo/db/process_health/fault_manager.cpp                   62
-rw-r--r--   src/mongo/db/process_health/fault_manager.h                     39
-rw-r--r--   src/mongo/db/process_health/fault_manager_config.h              15
-rw-r--r--   src/mongo/db/process_health/fault_manager_test.cpp               4
-rw-r--r--   src/mongo/db/process_health/fault_manager_test_suite.h          32
-rw-r--r--   src/mongo/db/process_health/health_observer.h                   28
-rw-r--r--   src/mongo/db/process_health/health_observer_base.cpp            42
-rw-r--r--   src/mongo/db/process_health/health_observer_base.h              34
-rw-r--r--   src/mongo/db/process_health/health_observer_mock.h              11
-rw-r--r--   src/mongo/db/process_health/health_observer_registration.cpp    11
-rw-r--r--   src/mongo/db/process_health/health_observer_registration.h       5
-rw-r--r--   src/mongo/db/process_health/health_observer_test.cpp            54
-rw-r--r--   src/mongo/db/process_health/progress_monitor.cpp               122
-rw-r--r--   src/mongo/db/process_health/progress_monitor.h                   75
-rw-r--r--   src/mongo/util/exit_code.h                                        5
16 files changed, 466 insertions(+), 74 deletions(-)
diff --git a/src/mongo/db/process_health/SConscript b/src/mongo/db/process_health/SConscript
index 61395478ea9..8d9d5b807f0 100644
--- a/src/mongo/db/process_health/SConscript
+++ b/src/mongo/db/process_health/SConscript
@@ -15,6 +15,7 @@ env.Library(
'health_monitoring_server_parameters.cpp',
'health_observer_base.cpp',
'health_observer_registration.cpp',
+ 'progress_monitor.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp
index 0f7833c9f8a..c237e27773b 100644
--- a/src/mongo/db/process_health/fault_manager.cpp
+++ b/src/mongo/db/process_health/fault_manager.cpp
@@ -44,6 +44,7 @@
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/exit_code.h"
namespace mongo {
@@ -70,9 +71,6 @@ ServiceContext::ConstructorActionRegisterer faultManagerRegisterer{
} // namespace
-
-static constexpr auto kPeriodicHealthCheckInterval{Milliseconds(50)};
-
FaultManager* FaultManager::get(ServiceContext* svcCtx) {
return sFaultManager(svcCtx).get();
}
@@ -107,17 +105,34 @@ FaultManager::TransientFaultDeadline::~TransientFaultDeadline() {
FaultManager::FaultManager(ServiceContext* svcCtx,
std::shared_ptr<executor::TaskExecutor> taskExecutor,
std::unique_ptr<FaultManagerConfig> config)
- : _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor) {
+ : _config(std::move(config)),
+ _svcCtx(svcCtx),
+ _taskExecutor(taskExecutor),
+ _crashCb([](std::string cause) {
+ LOGV2_ERROR(5936605,
+ "Fault manager progress monitor is terminating the server",
+ "cause"_attr = cause);
+ // This calls the exit_group syscall on Linux
+ ::_exit(ExitCode::EXIT_PROCESS_HEALTH_CHECK);
+ }) {
invariant(_svcCtx);
invariant(_svcCtx->getFastClockSource());
+ invariant(_svcCtx->getPreciseClockSource());
}
+FaultManager::FaultManager(ServiceContext* svcCtx,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ std::unique_ptr<FaultManagerConfig> config,
+ std::function<void(std::string cause)> crashCb)
+ : _config(std::move(config)), _svcCtx(svcCtx), _taskExecutor(taskExecutor), _crashCb(crashCb) {}
+
void FaultManager::schedulePeriodicHealthCheckThread(bool immediately) {
if (!feature_flags::gFeatureFlagHealthMonitoring.isEnabled(
serverGlobalParams.featureCompatibility)) {
return;
}
+ const auto kPeriodicHealthCheckInterval = getConfig().getPeriodicHealthCheckInterval();
auto lk = stdx::lock_guard(_mutex);
const auto cb = [this](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
@@ -139,12 +154,23 @@ void FaultManager::schedulePeriodicHealthCheckThread(bool immediately) {
}
FaultManager::~FaultManager() {
+ _managerShuttingDownCancellationSource.cancel();
_taskExecutor->shutdown();
+
+ LOGV2(5936601, "Shutting down periodic health checks");
if (_periodicHealthCheckCbHandle) {
_taskExecutor->cancel(*_periodicHealthCheckCbHandle);
}
+ // All health checks must use the _taskExecutor, so joining it
+ // should guarantee that health checks are done. However, if a health
+ // check is stuck in some blocking call, the _progressMonitorThread will
+ // kill the process after the timeout.
_taskExecutor->join();
+ // Must be joined after _taskExecutor.
+ if (_progressMonitor) {
+ _progressMonitor->join();
+ }
if (!_initialHealthCheckCompletedPromise.getFuture().isReady()) {
_initialHealthCheckCompletedPromise.emplaceValue();
@@ -194,15 +220,16 @@ FaultFacetsContainerPtr FaultManager::getOrCreateFaultFacetsContainer() {
void FaultManager::healthCheck() {
// One time init.
- _initHealthObserversIfNeeded();
+ _firstTimeInitIfNeeded();
ON_BLOCK_EXIT([this] { schedulePeriodicHealthCheckThread(); });
std::vector<HealthObserver*> observers = FaultManager::getHealthObservers();
// Start checks outside of lock.
+ auto token = _managerShuttingDownCancellationSource.token();
for (auto observer : observers) {
- observer->periodicCheck(*this, _taskExecutor);
+ observer->periodicCheck(*this, _taskExecutor, token);
}
// Garbage collect all resolved fault facets.
@@ -267,6 +294,11 @@ bool FaultManager::hasCriticalFacet(const FaultInternal* fault) const {
return false;
}
+FaultManagerConfig FaultManager::getConfig() const {
+ auto lk = stdx::lock_guard(_mutex);
+ return *_config;
+}
+
void FaultManager::processFaultExistsEvent() {
FaultState currentState = getFaultState();
@@ -341,20 +373,19 @@ void FaultManager::transitionToState(FaultState newState) {
_currentState = newState;
}
-void FaultManager::_initHealthObserversIfNeeded() {
- if (_initializedAllHealthObservers.load()) {
+void FaultManager::_firstTimeInitIfNeeded() {
+ if (_firstTimeInitExecuted.load()) {
return;
}
auto lk = stdx::lock_guard(_mutex);
// One more time under lock to avoid race.
- if (_initializedAllHealthObservers.load()) {
+ if (_firstTimeInitExecuted.load()) {
return;
}
- _initializedAllHealthObservers.store(true);
+ _firstTimeInitExecuted.store(true);
- _observers = HealthObserverRegistration::instantiateAllObservers(_svcCtx->getFastClockSource(),
- _svcCtx->getTickSource());
+ _observers = HealthObserverRegistration::instantiateAllObservers(_svcCtx);
// Verify that all observer types are unique.
std::set<FaultFacetType> allTypes;
@@ -363,6 +394,9 @@ void FaultManager::_initHealthObserversIfNeeded() {
}
invariant(allTypes.size() == _observers.size());
+ // Start the monitor thread after all observers are initialized.
+ _progressMonitor = std::make_unique<ProgressMonitor>(this, _svcCtx, _crashCb);
+
auto lk2 = stdx::lock_guard(_stateMutex);
LOGV2(5956701,
"Instantiated health observers, periodic health checking starts",
@@ -381,5 +415,9 @@ std::vector<HealthObserver*> FaultManager::getHealthObservers() {
return result;
}
+void FaultManager::progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb) {
+ _progressMonitor->progressMonitorCheck(crashCb);
+}
+
} // namespace process_health
} // namespace mongo
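The renamed _firstTimeInitIfNeeded() above keeps the double-checked pattern for one-time initialization: the atomic flag is read once without the lock, then re-read under the lock before the one-time work runs. A minimal standalone sketch of that pattern (names here are illustrative, not from the tree):

    AtomicWord<bool> initDone{false};
    Mutex mutex = MONGO_MAKE_LATCH("Example::mutex");

    void initIfNeeded() {
        if (initDone.load())
            return;  // fast path: already initialized, no lock taken
        auto lk = stdx::lock_guard(mutex);
        if (initDone.load())
            return;  // lost the race: another thread initialized meanwhile
        initDone.store(true);
        // ... one-time initialization work goes here ...
    }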
diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h
index 2bf518da975..15b95353dce 100644
--- a/src/mongo/db/process_health/fault_manager.h
+++ b/src/mongo/db/process_health/fault_manager.h
@@ -36,6 +36,7 @@
#include "mongo/db/process_health/fault_manager_config.h"
#include "mongo/db/process_health/health_monitoring_server_parameters_gen.h"
#include "mongo/db/process_health/health_observer.h"
+#include "mongo/db/process_health/progress_monitor.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
@@ -60,9 +61,14 @@ class FaultManager : protected FaultFacetsContainerFactory {
public:
// The taskExecutor provided should not be already started.
- explicit FaultManager(ServiceContext* svcCtx,
- std::shared_ptr<executor::TaskExecutor> taskExecutor,
- std::unique_ptr<FaultManagerConfig> config);
+ FaultManager(ServiceContext* svcCtx,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ std::unique_ptr<FaultManagerConfig> config);
+ // Variant with explicit crash callback for tests.
+ FaultManager(ServiceContext* svcCtx,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ std::unique_ptr<FaultManagerConfig> config,
+ std::function<void(std::string cause)> crashCb);
virtual ~FaultManager();
// Start periodic health checks, invoke it once during server startup.
@@ -84,15 +90,18 @@ public:
// Returns the current fault, if any. Otherwise returns an empty pointer.
FaultConstPtr currentFault() const;
+ // All observers remain valid for the manager's lifetime, so returning
+ // raw pointers is safe as long as they are used while the manager exists.
+ std::vector<HealthObserver*> getHealthObservers();
+
+ // Returns the aggregate configuration for the process health environment.
+ FaultManagerConfig getConfig() const;
+
protected:
// Starts the health check sequence and updates the internal state on completion.
// This is invoked by the internal timer.
virtual void healthCheck();
- // All observers remain valid for the manager lifetime, thus returning
- // just pointers is safe, as long as they are used while manager exists.
- std::vector<HealthObserver*> getHealthObservers();
-
// Protected interface FaultFacetsContainerFactory implementation.
// The interface FaultFacetsContainerFactory is implemented by the member '_fault'.
@@ -119,23 +128,29 @@ protected:
// TODO: move this into fault class; refactor to remove FaultInternal
bool hasCriticalFacet(const FaultInternal* fault) const;
- std::unique_ptr<FaultManagerConfig> _config;
+ void progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb);
private:
// One time init.
- void _initHealthObserversIfNeeded();
+ void _firstTimeInitIfNeeded();
+ std::unique_ptr<FaultManagerConfig> _config;
ServiceContext* const _svcCtx;
+ std::shared_ptr<executor::TaskExecutor> _taskExecutor;
+ // Callback used to crash the server.
+ std::function<void(std::string cause)> _crashCb;
mutable Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(5), "FaultManager::_mutex");
std::shared_ptr<FaultInternal> _fault;
// We lazily init all health observers.
- AtomicWord<bool> _initializedAllHealthObservers{false};
+ AtomicWord<bool> _firstTimeInitExecuted{false};
+ // This source is canceled before the _taskExecutor shutdown(). It
+ // can be used to check for the start of the shutdown sequence.
+ CancellationSource _managerShuttingDownCancellationSource;
// Manager owns all observer instances.
std::vector<std::unique_ptr<HealthObserver>> _observers;
- std::shared_ptr<executor::TaskExecutor> _taskExecutor;
boost::optional<executor::TaskExecutor::CallbackHandle> _periodicHealthCheckCbHandle;
SharedPromise<void> _initialHealthCheckCompletedPromise;
@@ -162,6 +177,8 @@ private:
ExecutorFuture<void> activeFaultTransition;
};
std::unique_ptr<TransientFaultDeadline> _transientFaultDeadline;
+
+ std::unique_ptr<ProgressMonitor> _progressMonitor;
};
} // namespace process_health
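The new _managerShuttingDownCancellationSource follows the usual CancellationSource/CancellationToken pairing from mongo/util/cancellation.h: shutdown cancels the source once, and any number of in-flight checks can poll their copy of the token. A hedged sketch of that flow, outside of any real class:

    CancellationSource source;
    CancellationToken token = source.token();  // a copy is handed to each periodicCheck()

    // Inside long-running work:
    if (token.isCanceled())
        return;  // shutdown has started, bail out early

    // In the shutdown path (the FaultManager destructor above):
    source.cancel();  // flips every outstanding token exactly once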
diff --git a/src/mongo/db/process_health/fault_manager_config.h b/src/mongo/db/process_health/fault_manager_config.h
index d127abc20a4..2c64565018f 100644
--- a/src/mongo/db/process_health/fault_manager_config.h
+++ b/src/mongo/db/process_health/fault_manager_config.h
@@ -98,10 +98,23 @@ public:
MONGO_UNREACHABLE;
}
}
- Milliseconds getActiveFaultDuration() {
+
+ Milliseconds getActiveFaultDuration() const {
return kActiveFaultDuration;
}
+ Milliseconds getPeriodicHealthCheckInterval() const {
+ return Milliseconds(50);
+ }
+
+ Milliseconds getPeriodicLivenessCheckInterval() const {
+ return Milliseconds(50);
+ }
+
+ Seconds getPeriodicLivenessDeadline() const {
+ return Seconds(300);
+ }
+
protected:
// If the server persists in TransientFault for more than this duration
// it will move to the ActiveFault state and terminate.
diff --git a/src/mongo/db/process_health/fault_manager_test.cpp b/src/mongo/db/process_health/fault_manager_test.cpp
index 50cb1de29f8..d216206c505 100644
--- a/src/mongo/db/process_health/fault_manager_test.cpp
+++ b/src/mongo/db/process_health/fault_manager_test.cpp
@@ -49,9 +49,9 @@ TEST(FaultManagerTest, Registration) {
// Tests that the default health observer intensity is non-critical
TEST_F(FaultManagerTest, GetHealthObserverIntensity) {
auto config = manager().getConfig();
- ASSERT(config->getHealthObserverIntensity(FaultFacetType::kLdap) ==
+ ASSERT(config.getHealthObserverIntensity(FaultFacetType::kLdap) ==
HealthObserverIntensityEnum::kNonCritical);
- ASSERT(config->getHealthObserverIntensity(FaultFacetType::kDns) ==
+ ASSERT(config.getHealthObserverIntensity(FaultFacetType::kDns) ==
HealthObserverIntensityEnum::kNonCritical);
}
diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h
index 2258168ebeb..5f7a59ca538 100644
--- a/src/mongo/db/process_health/fault_manager_test_suite.h
+++ b/src/mongo/db/process_health/fault_manager_test_suite.h
@@ -56,7 +56,13 @@ class FaultManagerTestImpl : public FaultManager {
public:
FaultManagerTestImpl(ServiceContext* svcCtx,
std::shared_ptr<executor::TaskExecutor> taskExecutor)
- : FaultManager(svcCtx, taskExecutor, std::make_unique<FaultManagerConfig>()) {}
+ : FaultManager(
+ svcCtx, taskExecutor, std::make_unique<FaultManagerConfig>(), [](std::string cause) {
+ // In tests, do not crash.
+ LOGV2(5936606,
+ "Fault manager progress monitor triggered the termination",
+ "cause"_attr = cause);
+ }) {}
void transitionStateTest(FaultState newState) {
transitionToState(newState);
@@ -88,8 +94,12 @@ public:
return *(static_cast<FaultInternal*>(fault.get()));
}
- FaultManagerConfig* getConfig() {
- return _config.get();
+ void progressMonitorCheckTest(std::function<void(std::string cause)> crashCb) {
+ progressMonitorCheckForTests(crashCb);
+ }
+
+ FaultManagerConfig getConfigTest() {
+ return getConfig();
}
};
@@ -104,6 +114,7 @@ public:
_svcCtx = ServiceContext::make();
_svcCtx->setFastClockSource(std::make_unique<ClockSourceMock>());
+ _svcCtx->setPreciseClockSource(std::make_unique<ClockSourceMock>());
_svcCtx->setTickSource(std::make_unique<TickSourceMock<Milliseconds>>());
resetManager();
@@ -130,9 +141,8 @@ public:
void registerMockHealthObserver(FaultFacetType mockType,
std::function<double()> getSeverityCallback) {
HealthObserverRegistration::registerObserverFactory(
- [mockType, getSeverityCallback](ClockSource* clockSource, TickSource* tickSource) {
- return std::make_unique<HealthObserverMock>(
- mockType, clockSource, tickSource, getSeverityCallback);
+ [mockType, getSeverityCallback](ServiceContext* svcCtx) {
+ return std::make_unique<HealthObserverMock>(mockType, svcCtx, getSeverityCallback);
});
}
@@ -144,6 +154,10 @@ public:
return *static_cast<ClockSourceMock*>(_svcCtx->getFastClockSource());
}
+ ServiceContext* svcCtx() const {
+ return _svcCtx.get();
+ }
+
TickSourceMock<Milliseconds>& tickSource() {
return *static_cast<TickSourceMock<Milliseconds>*>(_svcCtx->getTickSource());
}
@@ -152,7 +166,13 @@ public:
void advanceTime(Duration d) {
executor::NetworkInterfaceMock::InNetworkGuard guard(_net);
_net->advanceTime(_net->now() + d);
+ advanceClockSourcesTime(d);
+ }
+
+ template <typename Duration>
+ void advanceClockSourcesTime(Duration d) {
clockSource().advance(d);
+ static_cast<ClockSourceMock*>(_svcCtx->getPreciseClockSource())->advance(d);
tickSource().advance(d);
}
diff --git a/src/mongo/db/process_health/health_observer.h b/src/mongo/db/process_health/health_observer.h
index c4e775be70c..40c9d379db7 100644
--- a/src/mongo/db/process_health/health_observer.h
+++ b/src/mongo/db/process_health/health_observer.h
@@ -32,11 +32,32 @@
#include "mongo/db/process_health/fault_facets_container.h"
#include "mongo/db/process_health/fault_manager_config.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/util/future.h"
namespace mongo {
namespace process_health {
/**
+ * Liveness data and stats.
+ */
+struct HealthObserverLivenessStats {
+ // True if this observer is enabled.
+ bool isEnabled = false;
+ // True if this observer is currently running a health check.
+ bool currentlyRunningHealthCheck = false;
+ // When the last or current check started, depending on whether one is
+ // currently running.
+ Date_t lastTimeCheckStarted = Date_t::max();
+ // When the last check completed (not the current one).
+ Date_t lastTimeCheckCompleted = Date_t::max();
+ // Incremented when a check is done.
+ int completedChecksCount = 0;
+ // Incremented when a check completes with a fault.
+ // This doesn't take into account critical vs non-critical.
+ int completedChecksWithFaultCount = 0;
+};
+
+/**
* Interface to conduct periodic health checks.
* Every instance of health observer is wired internally to update the state of the FaultManager
* when a problem is detected.
@@ -65,12 +86,15 @@ public:
* @param factory Interface to get or create the factory of facets container.
*/
virtual void periodicCheck(FaultFacetsContainerFactory& factory,
- std::shared_ptr<executor::TaskExecutor> taskExecutor) = 0;
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ CancellationToken token) = 0;
/**
* @return HealthObserverIntensity
*/
- virtual HealthObserverIntensity getIntensity() = 0;
+ virtual HealthObserverIntensity getIntensity() const = 0;
+
+ virtual HealthObserverLivenessStats getStats() const = 0;
};
} // namespace process_health
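A concrete observer now receives the token and executor through PeriodicHealthCheckContext (declared in health_observer_base.h below). A minimal sketch of an implementation that honors the token; MyObserver and its facet type are hypothetical:

    class MyObserver : public HealthObserverBase {
    public:
        explicit MyObserver(ServiceContext* svcCtx) : HealthObserverBase(svcCtx) {}
        FaultFacetType getType() const override { return FaultFacetType::kMock1; }

    protected:
        Future<HealthCheckStatus> periodicCheckImpl(
            PeriodicHealthCheckContext&& context) override {
            if (context.cancellationToken.isCanceled()) {
                // Shutdown already started; report healthy and finish fast.
                return Future<HealthCheckStatus>::makeReady(makeHealthyStatus());
            }
            // ... perform the real check, possibly via context.taskExecutor ...
            return Future<HealthCheckStatus>::makeReady(makeHealthyStatus());
        }
    };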
diff --git a/src/mongo/db/process_health/health_observer_base.cpp b/src/mongo/db/process_health/health_observer_base.cpp
index 918093b34b4..d3f6dd03c30 100644
--- a/src/mongo/db/process_health/health_observer_base.cpp
+++ b/src/mongo/db/process_health/health_observer_base.cpp
@@ -37,11 +37,11 @@
namespace mongo {
namespace process_health {
-HealthObserverBase::HealthObserverBase(ClockSource* clockSource, TickSource* tickSource)
- : _clockSource(clockSource), _tickSource(tickSource) {}
+HealthObserverBase::HealthObserverBase(ServiceContext* svcCtx) : _svcCtx(svcCtx) {}
void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
- std::shared_ptr<executor::TaskExecutor> taskExecutor) {
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ CancellationToken token) {
// TODO(SERVER-59368): fix this for runtime options support.
if (getIntensity() == HealthObserverIntensity::kOff) {
return;
@@ -53,10 +53,11 @@ void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
return;
}
- if (_clockSource->now() - _lastTimeTheCheckWasRun < minimalCheckInterval()) {
+ const auto now = _svcCtx->getPreciseClockSource()->now();
+ if (now - _lastTimeTheCheckWasRun < minimalCheckInterval()) {
return;
}
- _lastTimeTheCheckWasRun = _clockSource->now();
+ _lastTimeTheCheckWasRun = now;
LOGV2_DEBUG(6007902, 2, "Start periodic health check", "observerType"_attr = getType());
@@ -64,10 +65,17 @@ void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
}
// Do the health check.
- taskExecutor->schedule([this, &factory](Status status) {
- periodicCheckImpl({})
+ taskExecutor->schedule([this, &factory, token, taskExecutor](Status status) {
+ periodicCheckImpl({token, taskExecutor})
.then([this, &factory](HealthCheckStatus&& checkStatus) mutable {
+ const auto severity = checkStatus.getSeverity();
factory.updateWithCheckStatus(std::move(checkStatus));
+
+ auto lk = stdx::lock_guard(_mutex);
+ ++_completedChecksCount;
+ if (!HealthCheckStatus::isResolved(severity)) {
+ ++_completedChecksWithFaultCount;
+ }
})
.onCompletion([this](Status status) {
if (!status.isOK()) {
@@ -75,15 +83,17 @@ void HealthObserverBase::periodicCheck(FaultFacetsContainerFactory& factory,
LOGV2_ERROR(
6007901, "Unexpected failure during health check", "status"_attr = status);
}
+ const auto now = _svcCtx->getPreciseClockSource()->now();
auto lk = stdx::lock_guard(_mutex);
invariant(_currentlyRunningHealthCheck);
_currentlyRunningHealthCheck = false;
+ _lastTimeCheckCompleted = now;
})
.getAsync([this](Status status) {});
});
}
-HealthObserverIntensity HealthObserverBase::getIntensity() {
+HealthObserverIntensity HealthObserverBase::getIntensity() const {
return _intensity;
}
@@ -107,5 +117,21 @@ HealthCheckStatus HealthObserverBase::makeSimpleFailedStatus(double severity,
return HealthCheckStatus(getType(), severity, sb.stringData());
}
+HealthObserverLivenessStats HealthObserverBase::getStats() const {
+ auto lk = stdx::lock_guard(_mutex);
+ return getStatsLocked(lk);
+}
+
+HealthObserverLivenessStats HealthObserverBase::getStatsLocked(WithLock) const {
+ HealthObserverLivenessStats stats;
+ stats.isEnabled = _intensity != HealthObserverIntensity::kOff;
+ stats.currentlyRunningHealthCheck = _currentlyRunningHealthCheck;
+ stats.lastTimeCheckStarted = _lastTimeTheCheckWasRun;
+ stats.lastTimeCheckCompleted = _lastTimeCheckCompleted;
+ stats.completedChecksCount = _completedChecksCount;
+ stats.completedChecksWithFaultCount = _completedChecksWithFaultCount;
+ return stats;
+}
+
} // namespace process_health
} // namespace mongo
diff --git a/src/mongo/db/process_health/health_observer_base.h b/src/mongo/db/process_health/health_observer_base.h
index 5c227e6355c..382f237c6fa 100644
--- a/src/mongo/db/process_health/health_observer_base.h
+++ b/src/mongo/db/process_health/health_observer_base.h
@@ -42,15 +42,19 @@ namespace process_health {
*/
class HealthObserverBase : public HealthObserver {
public:
- HealthObserverBase(ClockSource* clockSource, TickSource* tickSource);
+ explicit HealthObserverBase(ServiceContext* svcCtx);
virtual ~HealthObserverBase() = default;
ClockSource* clockSource() const {
- return _clockSource;
+ return _svcCtx->getPreciseClockSource();
}
TickSource* tickSource() const {
- return _tickSource;
+ return _svcCtx->getTickSource();
+ }
+
+ ServiceContext* svcCtx() const {
+ return _svcCtx;
}
/**
@@ -63,10 +67,16 @@ public:
// Implements the common logic for periodic checks.
// Every observer should implement periodicCheckImpl() for specific tests.
void periodicCheck(FaultFacetsContainerFactory& factory,
- std::shared_ptr<executor::TaskExecutor> taskExecutor) override;
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ CancellationToken token) override;
+
+ HealthObserverLivenessStats getStats() const override;
protected:
- struct PeriodicHealthCheckContext {};
+ struct PeriodicHealthCheckContext {
+ CancellationToken cancellationToken;
+ std::shared_ptr<executor::TaskExecutor> taskExecutor;
+ };
/**
* The main method every health observer should implement for a particular
@@ -77,7 +87,7 @@ protected:
virtual Future<HealthCheckStatus> periodicCheckImpl(
PeriodicHealthCheckContext&& periodicCheckContext) = 0;
- HealthObserverIntensity getIntensity() override;
+ HealthObserverIntensity getIntensity() const override;
// Helper method to create a status without errors.
HealthCheckStatus makeHealthyStatus() const;
@@ -85,17 +95,21 @@ protected:
// Make a generic error status.
HealthCheckStatus makeSimpleFailedStatus(double severity, std::vector<Status>&& failures) const;
- ClockSource* const _clockSource;
- TickSource* const _tickSource;
+ HealthObserverLivenessStats getStatsLocked(WithLock) const;
+
+ ServiceContext* const _svcCtx;
mutable Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "HealthObserverBase::_mutex");
- // Initially disable all observers until enabled by config options.
- HealthObserverIntensity _intensity = HealthObserverIntensity::kOff;
+ // TODO: remove this field, should be done in config.
+ HealthObserverIntensity _intensity = HealthObserverIntensity::kNonCritical;
// Indicates if there is a check running, to prevent running checks concurrently.
bool _currentlyRunningHealthCheck = false;
// Enforces the safety interval.
Date_t _lastTimeTheCheckWasRun;
+ Date_t _lastTimeCheckCompleted;
+ int _completedChecksCount = 0;
+ int _completedChecksWithFaultCount = 0;
};
} // namespace process_health
diff --git a/src/mongo/db/process_health/health_observer_mock.h b/src/mongo/db/process_health/health_observer_mock.h
index 96c984960ae..17516f13c98 100644
--- a/src/mongo/db/process_health/health_observer_mock.h
+++ b/src/mongo/db/process_health/health_observer_mock.h
@@ -43,10 +43,9 @@ namespace process_health {
class HealthObserverMock : public HealthObserverBase {
public:
HealthObserverMock(FaultFacetType mockType,
- ClockSource* clockSource,
- TickSource* tickSource,
+ ServiceContext* svcCtx,
std::function<double()> getSeverityCallback)
- : HealthObserverBase(clockSource, tickSource),
+ : HealthObserverBase(svcCtx),
_mockType(mockType),
_getSeverityCallback(getSeverityCallback) {}
@@ -64,14 +63,18 @@ protected:
auto completionPf = makePromiseFuture<HealthCheckStatus>();
if (HealthCheckStatus::isResolved(severity)) {
+ LOGV2(5936603, "Mock health observer returns a resolved severity");
completionPf.promise.emplaceValue(HealthCheckStatus(getType()));
} else {
+ LOGV2(5936604,
+ "Mock health observer returns a fault severity",
+ "severity"_attr = severity);
completionPf.promise.emplaceValue(HealthCheckStatus(getType(), severity, "failed"));
}
return std::move(completionPf.future);
}
- HealthObserverIntensity getIntensity() override {
+ HealthObserverIntensity getIntensity() const override {
return HealthObserverIntensity::kNonCritical;
}
diff --git a/src/mongo/db/process_health/health_observer_registration.cpp b/src/mongo/db/process_health/health_observer_registration.cpp
index f59aa3b3155..a1da3e30f94 100644
--- a/src/mongo/db/process_health/health_observer_registration.cpp
+++ b/src/mongo/db/process_health/health_observer_registration.cpp
@@ -34,8 +34,8 @@ namespace process_health {
namespace {
-using HealthObserverFactoryCallback = std::function<std::unique_ptr<HealthObserver>(
- ClockSource* clockSource, TickSource* tickSource)>;
+using HealthObserverFactoryCallback =
+ std::function<std::unique_ptr<HealthObserver>(ServiceContext* svcCtx)>;
// Returns static vector of all registrations.
// No synchronization is required as all the factories are registered during
@@ -49,16 +49,15 @@ std::vector<HealthObserverFactoryCallback>* getObserverFactories() {
} // namespace
void HealthObserverRegistration::registerObserverFactory(
- std::function<std::unique_ptr<HealthObserver>(ClockSource* clockSource, TickSource* tickSource)>
- factoryCallback) {
+ std::function<std::unique_ptr<HealthObserver>(ServiceContext* svcCtx)> factoryCallback) {
getObserverFactories()->push_back(std::move(factoryCallback));
}
std::vector<std::unique_ptr<HealthObserver>> HealthObserverRegistration::instantiateAllObservers(
- ClockSource* clockSource, TickSource* tickSource) {
+ ServiceContext* svcCtx) {
std::vector<std::unique_ptr<HealthObserver>> result;
for (auto& cb : *getObserverFactories()) {
- result.push_back(cb(clockSource, tickSource));
+ result.push_back(cb(svcCtx));
}
return result;
}
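With the new signature, a factory registration reduces to a single lambda taking the ServiceContext, e.g. (MyObserver hypothetical):

    HealthObserverRegistration::registerObserverFactory(
        [](ServiceContext* svcCtx) -> std::unique_ptr<HealthObserver> {
            return std::make_unique<MyObserver>(svcCtx);
        });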
diff --git a/src/mongo/db/process_health/health_observer_registration.h b/src/mongo/db/process_health/health_observer_registration.h
index b7c72bd2b7a..6606e13f05a 100644
--- a/src/mongo/db/process_health/health_observer_registration.h
+++ b/src/mongo/db/process_health/health_observer_registration.h
@@ -52,15 +52,14 @@ public:
* @param factoryCallback creates observer instance when invoked.
*/
static void registerObserverFactory(
- std::function<std::unique_ptr<HealthObserver>(ClockSource* clockSource,
- TickSource* tickSource)> factoryCallback);
+ std::function<std::unique_ptr<HealthObserver>(ServiceContext* svcCtx)> factoryCallback);
/**
* Invokes all registered factories and returns new instances.
* The ownership of all observers is transferred to the invoker.
*/
static std::vector<std::unique_ptr<HealthObserver>> instantiateAllObservers(
- ClockSource* clockSource, TickSource* tickSource);
+ ServiceContext* svcCtx);
/**
* Test-only method to cleanup the list of registered factories.
diff --git a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp
index 630f2f8d4a1..f75dfc5eb14 100644
--- a/src/mongo/db/process_health/health_observer_test.cpp
+++ b/src/mongo/db/process_health/health_observer_test.cpp
@@ -50,8 +50,7 @@ namespace {
// by the instantiate method below will be greater than expected.
TEST_F(FaultManagerTest, Registration) {
registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0; });
- auto allObservers =
- HealthObserverRegistration::instantiateAllObservers(&clockSource(), &tickSource());
+ auto allObservers = HealthObserverRegistration::instantiateAllObservers(svcCtx());
ASSERT_EQ(1, allObservers.size());
ASSERT_EQ(FaultFacetType::kMock1, allObservers[0]->getType());
}
@@ -70,7 +69,7 @@ TEST_F(FaultManagerTest, HealthCheckCreatesFacetOnHealthCheckFoundFault) {
advanceTime(Milliseconds(100));
manager().healthCheckTest();
- waitForFaultBeingCreated();
+ waitForTransitionIntoState(FaultState::kTransientFault);
auto currentFault = manager().currentFault();
ASSERT_TRUE(currentFault); // Is created.
}
@@ -221,16 +220,58 @@ TEST_F(FaultManagerTest, InitialHealthCheckDoesNotRunIfFeatureFlagNotEnabled) {
ASSERT_TRUE(manager().getFaultState() == FaultState::kStartupCheck);
}
+TEST_F(FaultManagerTest, Stats) {
+ advanceTime(Milliseconds(100));
+ registerMockHealthObserver(FaultFacetType::kMock1, [] { return 0.1; });
+ waitForTransitionIntoState(FaultState::kTransientFault);
+
+ auto observer = manager().getHealthObserversTest()[0];
+ auto stats = observer->getStats();
+ ASSERT_TRUE(stats.isEnabled);
+ ASSERT_FALSE(stats.currentlyRunningHealthCheck);
+ ASSERT_TRUE(stats.lastTimeCheckStarted >= clockSource().now());
+ ASSERT_TRUE(stats.lastTimeCheckCompleted >= stats.lastTimeCheckStarted);
+ ASSERT_TRUE(stats.completedChecksCount >= 1);
+ ASSERT_TRUE(stats.completedChecksWithFaultCount >= 1);
+}
+
+TEST_F(FaultManagerTest, ProgressMonitorCheck) {
+ AtomicWord<bool> shouldBlock{true};
+ registerMockHealthObserver(FaultFacetType::kMock1, [this, &shouldBlock] {
+ while (shouldBlock.load()) {
+ sleepFor(Milliseconds(1));
+ }
+ return 0.1;
+ });
+
+ // Health check should get stuck here.
+ manager().healthCheckTest();
+ // Verify that the 'crash callback' is invoked after timeout.
+ bool crashTriggered = false;
+ std::function<void(std::string cause)> crashCb = [&crashTriggered](std::string) {
+ crashTriggered = true;
+ };
+ manager().progressMonitorCheckTest(crashCb);
+ // The progress check passed because the simulated time did not advance.
+ ASSERT_FALSE(crashTriggered);
+ advanceClockSourcesTime(manager().getConfig().getPeriodicLivenessDeadline() + Seconds(1));
+ manager().progressMonitorCheckTest(crashCb);
+ // The progress check simulated a crash.
+ ASSERT_TRUE(crashTriggered);
+ shouldBlock.store(false);
+ resetManager(); // Before fields above go out of scope.
+}
+
TEST_F(FaultManagerTest, TransitionsToActiveFaultAfterTimeout) {
registerMockHealthObserver(FaultFacetType::kMock1, [] { return 1.1; });
waitForTransitionIntoState(FaultState::kTransientFault);
ASSERT_TRUE(manager().getFaultState() == FaultState::kTransientFault);
- advanceTime(manager().getConfig()->getActiveFaultDuration() + Milliseconds(1));
+ advanceTime(manager().getConfig().getActiveFaultDuration() + Milliseconds(1));
waitForTransitionIntoState(FaultState::kActiveFault);
}
TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) {
- const auto activeFaultDuration = manager().getConfig()->getActiveFaultDuration();
+ const auto activeFaultDuration = manager().getConfigTest().getActiveFaultDuration();
const auto start = clockSource().now();
// Initially unhealthy; Transitions to healthy before the active fault timeout.
@@ -241,9 +282,8 @@ TEST_F(FaultManagerTest, DoesNotTransitionToActiveFaultIfResolved) {
Milliseconds(durationCount<Milliseconds>(activeFaultDuration) / 4);
if (elapsed < quarterActiveFaultDuration) {
return 1.1;
- } else {
- return 0.0;
}
+ return 0.0;
});
waitForTransitionIntoState(FaultState::kTransientFault);
assertSoonWithHealthCheck([this]() { return manager().getFaultState() == FaultState::kOk; });
diff --git a/src/mongo/db/process_health/progress_monitor.cpp b/src/mongo/db/process_health/progress_monitor.cpp
new file mode 100644
index 00000000000..75fdd4f93c1
--- /dev/null
+++ b/src/mongo/db/process_health/progress_monitor.cpp
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kProcessHealth
+
+#include "mongo/db/process_health/progress_monitor.h"
+
+#include "mongo/db/process_health/fault_manager.h"
+#include "mongo/db/process_health/health_observer.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+namespace process_health {
+
+ProgressMonitor::ProgressMonitor(FaultManager* faultManager,
+ ServiceContext* svcCtx,
+ std::function<void(std::string cause)> crashCb)
+ : _faultManager(faultManager), _svcCtx(svcCtx), _crashCb(crashCb) {
+ // Start the monitor thread; this should happen after all observers are initialized.
+ _progressMonitorThread = stdx::thread([this] { _progressMonitorLoop(); });
+}
+
+void ProgressMonitor::join() {
+ _terminate.store(true);
+
+ // The _progressMonitorThread watches for the _taskExecutor join() to
+ // complete and thus can be joined only after the _taskExecutor finishes.
+ LOGV2(5936602, "Stopping the periodic health checks liveness monitor");
+ if (_progressMonitorThread.joinable()) {
+ _progressMonitorThread.join();
+ }
+}
+
+void ProgressMonitor::progressMonitorCheck(std::function<void(std::string cause)> crashCb) {
+ std::vector<HealthObserver*> observers = _faultManager->getHealthObservers();
+ const auto now = _svcCtx->getPreciseClockSource()->now();
+ std::vector<HealthObserver*> secondPass;
+
+ // Check the liveness of every health observer.
+ for (auto observer : observers) {
+ const auto stats = observer->getStats();
+ if (!stats.isEnabled) {
+ continue;
+ }
+
+ // Special case: if the health observer is enabled but has not run for a
+ // very long time, it could be a race; check it again in a second pass below.
+ if (!stats.currentlyRunningHealthCheck &&
+ now - stats.lastTimeCheckStarted >
+ _faultManager->getConfig().getPeriodicLivenessDeadline() * 2) {
+ secondPass.push_back(observer);
+ continue;
+ }
+
+ if (stats.currentlyRunningHealthCheck &&
+ now - stats.lastTimeCheckStarted >
+ _faultManager->getConfig().getPeriodicLivenessDeadline()) {
+ // Crash because this health checker is running for too long.
+ crashCb(str::stream() << "Health observer " << observer->getType()
+ << " is still running since "
+ << stats.lastTimeCheckStarted.toString());
+ }
+ }
+
+ if (secondPass.empty()) {
+ return;
+ }
+ // The observer is enabled but did not run for a while. Sleep two cycles
+ // and check again. Note: this should be rare.
+ sleepFor(_faultManager->getConfig().getPeriodicHealthCheckInterval() * 2);
+ for (auto observer : secondPass) {
+ const auto stats = observer->getStats();
+ if (stats.isEnabled && !stats.currentlyRunningHealthCheck &&
+ now - stats.lastTimeCheckStarted >
+ _faultManager->getConfig().getPeriodicLivenessDeadline() * 2) {
+ // Crash because this health checker was never started.
+ crashCb(str::stream() << "Health observer " << observer->getType()
+ << " did not run since "
+ << stats.lastTimeCheckStarted.toString());
+ continue;
+ }
+ }
+}
+
+void ProgressMonitor::_progressMonitorLoop() {
+ Client::initThread("Health checks progress monitor"_sd, _svcCtx, nullptr);
+
+ while (!_terminate.load()) {
+ progressMonitorCheck(_crashCb);
+
+ sleepFor(_faultManager->getConfig().getPeriodicLivenessCheckInterval());
+ }
+}
+
+} // namespace process_health
+} // namespace mongo
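To make the two-pass logic above concrete with the defaults from fault_manager_config.h: a check still running 300 seconds after it started crashes on the first pass, while an enabled observer whose last start is more than 600 seconds old is re-examined after a 100 ms sleep (two 50 ms check intervals) and only then reported as never started. Tests substitute a recording callback for the default ::_exit, roughly:

    bool crashTriggered = false;
    manager().progressMonitorCheckTest([&crashTriggered](std::string cause) {
        crashTriggered = true;  // record the liveness failure instead of exiting
    });  // see the ProgressMonitorCheck test above for the full flow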
diff --git a/src/mongo/db/process_health/progress_monitor.h b/src/mongo/db/process_health/progress_monitor.h
new file mode 100644
index 00000000000..9b7e1f79b4e
--- /dev/null
+++ b/src/mongo/db/process_health/progress_monitor.h
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include <functional>
+#include <string>
+#include <thread>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+namespace process_health {
+
+class FaultManager;
+
+/**
+ * Tracks that the health checks are invoked regularly and make progress.
+ */
+class ProgressMonitor {
+public:
+ ProgressMonitor(FaultManager* faultManager,
+ ServiceContext* svcCtx,
+ std::function<void(std::string cause)> crashCb);
+
+ // Signals that the monitoring can stop and blocks until the thread is joined.
+ // Invoked to signal that the task executor is joined.
+ void join();
+
+ // Checks that the health checks are invoked and are not stuck forever. Invokes the callback
+ // after the timeout configured by the options.
+ void progressMonitorCheck(std::function<void(std::string cause)> crashCb);
+
+private:
+ // Checks that the periodic health checks actually make progress.
+ void _progressMonitorLoop();
+
+ FaultManager* const _faultManager;
+ ServiceContext* const _svcCtx;
+ // Callback used to crash the server.
+ const std::function<void(std::string cause)> _crashCb;
+ // This flag is set after the _taskExecutor join() returns, so we know
+ // that no health checks are still running.
+ AtomicWord<bool> _terminate{false};
+ stdx::thread _progressMonitorThread;
+};
+
+} // namespace process_health
+} // namespace mongo
diff --git a/src/mongo/util/exit_code.h b/src/mongo/util/exit_code.h
index 1b5473d6ce8..79024f32f86 100644
--- a/src/mongo/util/exit_code.h
+++ b/src/mongo/util/exit_code.h
@@ -56,8 +56,9 @@ enum ExitCode : int {
EXIT_WATCHDOG = 61, // Internal Watchdog has terminated mongod
EXIT_NEED_DOWNGRADE =
62, // The current binary version is not appropriate to run on the existing datafiles.
- EXIT_THREAD_SANITIZER = 66, // Default Exit code for Thread Sanitizer failures
- EXIT_UNCAUGHT = 100, // top level exception that wasn't caught
+ EXIT_THREAD_SANITIZER = 66, // Default Exit code for Thread Sanitizer failures
+ EXIT_PROCESS_HEALTH_CHECK = 67, // Process health check triggered the crash.
+ EXIT_UNCAUGHT = 100, // top level exception that wasn't caught
EXIT_TEST = 101
};