From e146ca34cbec7301ba1be1cc3c10c56a53a107fc Mon Sep 17 00:00:00 2001 From: Andrew Shuvalov Date: Fri, 11 Mar 2022 15:23:57 +0000 Subject: SERVER-64182 avoid duplicate health checks; cleanups --- src/mongo/db/process_health/fault_manager.cpp | 148 +++++++++++++-------- src/mongo/db/process_health/fault_manager.h | 17 ++- .../db/process_health/fault_manager_test_suite.h | 10 ++ .../health_monitoring_server_parameters.cpp | 4 +- .../db/process_health/health_observer_test.cpp | 29 ++++ 5 files changed, 144 insertions(+), 64 deletions(-) diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp index ab1c653aa68..6518316b139 100644 --- a/src/mongo/db/process_health/fault_manager.cpp +++ b/src/mongo/db/process_health/fault_manager.cpp @@ -142,7 +142,8 @@ void FaultManager::healthMonitoringIntensitiesUpdated(HealthObserverIntensities if (auto* observer = manager->getHealthObserver(toFaultFacetType(setting.getType())); observer != nullptr) { - manager->healthCheck(observer, cancellationToken); + manager->scheduleNextHealthCheck( + observer, cancellationToken, true /* immediate */); } } else if (newIntensity == HealthObserverIntensityEnum::kOff) { // {critical, non-critical} -> off @@ -432,7 +433,7 @@ void FaultManager::schedulePeriodicHealthCheckThread() { setState(FaultState::kActiveFault, HealthCheckStatus(observer->getType())); return; } - healthCheck(observer, token); + scheduleNextHealthCheck(observer, token, true /* immediate */); } LOGV2(5936804, "Health observers started", "detail"_attr = listOfActiveObservers); } @@ -445,7 +446,7 @@ FaultManager::~FaultManager() { { stdx::lock_guard lock(_mutex); for (auto& pair : _healthCheckContexts) { - auto cbHandle = pair.second.resultStatus; + auto cbHandle = pair.second.callbackHandle; if (cbHandle) { _taskExecutor->cancel(cbHandle.get()); } @@ -519,52 +520,74 @@ FaultPtr FaultManager::getOrCreateFault() { return _fault; } -void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) { - auto schedulerCb = [this, observer, token] { - auto scheduledTime = _taskExecutor->now() + - _config->getPeriodicHealthCheckInterval(observer->getType()) + +void FaultManager::scheduleNextHealthCheck(HealthObserver* observer, + CancellationToken token, + bool immediately) { + stdx::lock_guard lock(_mutex); + + // Check that context callbackHandle is not set and if future exists, it is ready. + auto existingIt = _healthCheckContexts.find(observer->getType()); + if (existingIt != _healthCheckContexts.end()) { + if (existingIt->second.callbackHandle) { + LOGV2_WARNING(6418201, + "Cannot schedule health check while another one is in queue", + "observerType"_attr = str::stream() << observer->getType()); + return; + } + if (existingIt->second.result && !existingIt->second.result->isReady()) { + LOGV2_WARNING(6418202, + "Cannot schedule health check while another one is currently executing", + "observerType"_attr = str::stream() << observer->getType()); + return; + } + } + _healthCheckContexts.insert_or_assign(observer->getType(), + HealthCheckContext(nullptr, boost::none)); + + auto scheduledTime = immediately + ? _taskExecutor->now() + : _taskExecutor->now() + _config->getPeriodicHealthCheckInterval(observer->getType()) + std::min(observer->healthCheckJitter(), FaultManagerConfig::kPeriodicHealthCheckMaxJitter); - LOGV2_DEBUG(5939701, - 3, - "Schedule next health check", - "observerType"_attr = str::stream() << observer->getType(), - "scheduledTime"_attr = scheduledTime); - - auto periodicThreadCbHandleStatus = _taskExecutor->scheduleWorkAt( - scheduledTime, - [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - LOGV2_DEBUG(5939702, - 1, - "Fault manager received an error", - "status"_attr = cbData.status); - if (ErrorCodes::isA(cbData.status.code())) { - return; - } - // continue health checking otherwise + LOGV2_DEBUG(5939701, + 3, + "Schedule next health check", + "observerType"_attr = str::stream() << observer->getType(), + "scheduledTime"_attr = scheduledTime); + + auto periodicThreadCbHandleStatus = _taskExecutor->scheduleWorkAt( + scheduledTime, + [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + LOGV2_DEBUG( + 5939702, 1, "Fault manager received an error", "status"_attr = cbData.status); + if (ErrorCodes::isA(cbData.status.code())) { + return; } - healthCheck(observer, token); - }); - - if (!periodicThreadCbHandleStatus.isOK()) { - if (ErrorCodes::isA( - periodicThreadCbHandleStatus.getStatus().code())) { - return; + // continue health checking otherwise } + healthCheck(observer, token); + }); - uassert(5936101, - str::stream() << "Failed to schedule periodic health check for " - << observer->getType() << ": " - << periodicThreadCbHandleStatus.getStatus().codeString(), - periodicThreadCbHandleStatus.isOK()); + if (!periodicThreadCbHandleStatus.isOK()) { + if (ErrorCodes::isA( + periodicThreadCbHandleStatus.getStatus().code())) { + LOGV2_DEBUG(6418203, 1, "Not scheduling health check because of shutdown"); + return; } - stdx::lock_guard lock(_mutex); - _healthCheckContexts.at(observer->getType()).resultStatus = - std::move(periodicThreadCbHandleStatus.getValue()); - }; + uassert(5936101, + str::stream() << "Failed to schedule periodic health check for " + << observer->getType() << ": " + << periodicThreadCbHandleStatus.getStatus().codeString(), + periodicThreadCbHandleStatus.isOK()); + } + + _healthCheckContexts.at(observer->getType()).callbackHandle = + std::move(periodicThreadCbHandleStatus.getValue()); +} +void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) { auto acceptNotOKStatus = [this, observer](Status s) { auto healthCheckStatus = HealthCheckStatus(observer->getType(), Severity::kFailure, s.reason()); @@ -574,22 +597,34 @@ void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token return healthCheckStatus; }; - { - stdx::lock_guard lock(_mutex); - _healthCheckContexts.insert( - {observer->getType(), HealthCheckContext(nullptr, boost::none)}); - } // Run asynchronous health check. Send output to the state machine. Schedule next run. - auto healthCheckFuture = - observer->periodicCheck(_taskExecutor, token) - .thenRunOn(_taskExecutor) - .onCompletion([this, acceptNotOKStatus, schedulerCb, observer]( - StatusWith status) { - ON_BLOCK_EXIT([this, schedulerCb, observer]() { + auto healthCheckFuture = observer->periodicCheck(_taskExecutor, token); + + stdx::lock_guard lock(_mutex); + auto contextIt = _healthCheckContexts.find(observer->getType()); + if (contextIt == _healthCheckContexts.end()) { + LOGV2_ERROR(6418204, "Unexpected failure during health check: context not found"); + return; + } + contextIt->second.result = + std::make_unique>(std::move(healthCheckFuture)); + + contextIt->second.result->thenRunOn(_taskExecutor) + .onCompletion( + [this, acceptNotOKStatus, observer, token](StatusWith status) { + ON_BLOCK_EXIT([this, observer, token]() { + { + stdx::lock_guard lock(_mutex); + // Rescheduling requires the previous handle to be cleaned. + auto contextIt = _healthCheckContexts.find(observer->getType()); + if (contextIt != _healthCheckContexts.end()) { + contextIt->second.callbackHandle = {}; + } + } if (!_config->periodicChecksDisabledForTests() && _config->isHealthObserverEnabled(observer->getType())) { - schedulerCb(); + scheduleNextHealthCheck(observer, token, false /* immediate */); } }); @@ -599,11 +634,8 @@ void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token accept(status.getValue()); return status.getValue(); - }); - - stdx::lock_guard lock(_mutex); - _healthCheckContexts.at(observer->getType()).result = - std::make_unique>(std::move(healthCheckFuture)); + }) + .getAsync([](StatusOrStatusWith) {}); } void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) { diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h index 14234fc75d4..4e23c629a09 100644 --- a/src/mongo/db/process_health/fault_manager.h +++ b/src/mongo/db/process_health/fault_manager.h @@ -153,6 +153,10 @@ protected: void progressMonitorCheckForTests(std::function crashCb); + void scheduleNextHealthCheck(HealthObserver* observer, + CancellationToken token, + bool immediately); + private: // One time init. void _init(); @@ -202,12 +206,17 @@ private: std::unique_ptr _progressMonitor; stdx::unordered_set _healthyObservations; + + // The stages of health check context modifications: + // 1. Schedule and set callbackHandle + // 2. When scheduled check starts, reset callbackHandle and set result future + // 3. When result is ready, repeat struct HealthCheckContext { - std::unique_ptr> result; - boost::optional resultStatus; - HealthCheckContext(std::unique_ptr> future, + std::unique_ptr> result; + boost::optional callbackHandle; + HealthCheckContext(std::unique_ptr> future, boost::optional cbHandle) - : result(std::move(future)), resultStatus(cbHandle){}; + : result(std::move(future)), callbackHandle(cbHandle){}; }; stdx::unordered_map _healthCheckContexts; diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h index 584aad99e81..67e9f2d506c 100644 --- a/src/mongo/db/process_health/fault_manager_test_suite.h +++ b/src/mongo/db/process_health/fault_manager_test_suite.h @@ -114,6 +114,10 @@ public: FaultState acceptTest(const HealthCheckStatus& message) { return accept(message); } + + void scheduleNextImmediateCheckForTest(HealthObserver* observer) { + scheduleNextHealthCheck(observer, CancellationToken::uncancelable(), true); + } }; /** @@ -193,6 +197,12 @@ public: registerMockHealthObserver(mockType, getSeverityCallback, Milliseconds(Seconds(30))); } + template + void scheduleNextImmediateCheck(FaultFacetType type) { + auto& obsrv = observer(type); + manager().scheduleNextImmediateCheckForTest(&obsrv); + } + template void registerHealthObserver() { HealthObserverRegistration::registerObserverFactory( diff --git a/src/mongo/db/process_health/health_monitoring_server_parameters.cpp b/src/mongo/db/process_health/health_monitoring_server_parameters.cpp index 010ecb3b64d..52dbf31a1c6 100644 --- a/src/mongo/db/process_health/health_monitoring_server_parameters.cpp +++ b/src/mongo/db/process_health/health_monitoring_server_parameters.cpp @@ -120,7 +120,7 @@ void HealthMonitoringProgressMonitorServerParameter::append(OperationContext*, Status PeriodicHealthCheckIntervalsServerParameter::setFromString(const std::string& value) { const auto oldValue = **_data; auto newValue = HealthObserverIntervals::parse( - IDLParserErrorContext("health monitoring liveness"), fromjson(value)); + IDLParserErrorContext("health monitoring interval"), fromjson(value)); newValue = mergeConfigValues(oldValue, newValue); **_data = newValue; return Status::OK(); @@ -129,7 +129,7 @@ Status PeriodicHealthCheckIntervalsServerParameter::setFromString(const std::str Status PeriodicHealthCheckIntervalsServerParameter::set(const BSONElement& newValueElement) { const auto oldValue = **_data; auto newValue = HealthObserverIntervals::parse( - IDLParserErrorContext("health monitoring liveness"), newValueElement.Obj()); + IDLParserErrorContext("health monitoring interval"), newValueElement.Obj()); newValue = mergeConfigValues(oldValue, newValue); **_data = newValue; return Status::OK(); diff --git a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp index 3cfe1dbe608..f10d2683967 100644 --- a/src/mongo/db/process_health/health_observer_test.cpp +++ b/src/mongo/db/process_health/health_observer_test.cpp @@ -221,6 +221,35 @@ TEST_F(FaultManagerTest, resetManager(); // Before fields above go out of scope. } +TEST_F(FaultManagerTest, SchedulingDuplicateHealthChecksRejected) { + static constexpr int kLoops = 1000; + resetManager(std::make_unique()); + registerMockHealthObserver(FaultFacetType::kMock1, [] { return Severity::kOk; }); + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + waitForTransitionIntoState(FaultState::kOk); + + auto observer = manager().getHealthObserversTest()[0]; + auto initialStats = observer->getStats(); + + for (int i = 0; i < kLoops; ++i) { + // A check will not be scheduled if another check is running. + // Default interval is 1 sec so only 1/500 of checks will be scheduled. + scheduleNextImmediateCheck(FaultFacetType::kMock1); + sleepFor(Milliseconds(2)); + } + + // Sleep time here is not introducing flakiness - it only shows that even after + // waiting the total count of completed tests is lower than the total we scheduled. + sleepFor(Milliseconds(100)); + auto finalStats = observer->getStats(); + + const auto totalCompletedCount = + finalStats.completedChecksCount - initialStats.completedChecksCount; + ASSERT_LT(totalCompletedCount, kLoops); + ASSERT_GT(totalCompletedCount, 0); + LOGV2(6418205, "Total completed checks count", "count"_attr = totalCompletedCount); +} + } // namespace } // namespace process_health } // namespace mongo -- cgit v1.2.1