diff options
author | Andrew Shuvalov <andrew.shuvalov@mongodb.com> | 2022-03-11 15:23:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-14 14:38:57 +0000 |
commit | 8fe804f3b383594441788ac995b6e50c9c340928 (patch) | |
tree | 6ec4c5b00db6c1a06b4ccf41ad8c1c4691d26ed5 | |
parent | dec53019d57fc35bc50f0f32af307932f4ff8df6 (diff) | |
download | mongo-8fe804f3b383594441788ac995b6e50c9c340928.tar.gz |
SERVER-64182 avoid duplicate health checks; cleanups
(cherry picked from commit e146ca34cbec7301ba1be1cc3c10c56a53a107fc)
5 files changed, 144 insertions, 64 deletions
diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp index ab1c653aa68..6518316b139 100644 --- a/src/mongo/db/process_health/fault_manager.cpp +++ b/src/mongo/db/process_health/fault_manager.cpp @@ -142,7 +142,8 @@ void FaultManager::healthMonitoringIntensitiesUpdated(HealthObserverIntensities if (auto* observer = manager->getHealthObserver(toFaultFacetType(setting.getType())); observer != nullptr) { - manager->healthCheck(observer, cancellationToken); + manager->scheduleNextHealthCheck( + observer, cancellationToken, true /* immediate */); } } else if (newIntensity == HealthObserverIntensityEnum::kOff) { // {critical, non-critical} -> off @@ -432,7 +433,7 @@ void FaultManager::schedulePeriodicHealthCheckThread() { setState(FaultState::kActiveFault, HealthCheckStatus(observer->getType())); return; } - healthCheck(observer, token); + scheduleNextHealthCheck(observer, token, true /* immediate */); } LOGV2(5936804, "Health observers started", "detail"_attr = listOfActiveObservers); } @@ -445,7 +446,7 @@ FaultManager::~FaultManager() { { stdx::lock_guard lock(_mutex); for (auto& pair : _healthCheckContexts) { - auto cbHandle = pair.second.resultStatus; + auto cbHandle = pair.second.callbackHandle; if (cbHandle) { _taskExecutor->cancel(cbHandle.get()); } @@ -519,52 +520,74 @@ FaultPtr FaultManager::getOrCreateFault() { return _fault; } -void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) { - auto schedulerCb = [this, observer, token] { - auto scheduledTime = _taskExecutor->now() + - _config->getPeriodicHealthCheckInterval(observer->getType()) + +void FaultManager::scheduleNextHealthCheck(HealthObserver* observer, + CancellationToken token, + bool immediately) { + stdx::lock_guard lock(_mutex); + + // Check that context callbackHandle is not set and if future exists, it is ready. + auto existingIt = _healthCheckContexts.find(observer->getType()); + if (existingIt != _healthCheckContexts.end()) { + if (existingIt->second.callbackHandle) { + LOGV2_WARNING(6418201, + "Cannot schedule health check while another one is in queue", + "observerType"_attr = str::stream() << observer->getType()); + return; + } + if (existingIt->second.result && !existingIt->second.result->isReady()) { + LOGV2_WARNING(6418202, + "Cannot schedule health check while another one is currently executing", + "observerType"_attr = str::stream() << observer->getType()); + return; + } + } + _healthCheckContexts.insert_or_assign(observer->getType(), + HealthCheckContext(nullptr, boost::none)); + + auto scheduledTime = immediately + ? _taskExecutor->now() + : _taskExecutor->now() + _config->getPeriodicHealthCheckInterval(observer->getType()) + std::min(observer->healthCheckJitter(), FaultManagerConfig::kPeriodicHealthCheckMaxJitter); - LOGV2_DEBUG(5939701, - 3, - "Schedule next health check", - "observerType"_attr = str::stream() << observer->getType(), - "scheduledTime"_attr = scheduledTime); - - auto periodicThreadCbHandleStatus = _taskExecutor->scheduleWorkAt( - scheduledTime, - [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - LOGV2_DEBUG(5939702, - 1, - "Fault manager received an error", - "status"_attr = cbData.status); - if (ErrorCodes::isA<ErrorCategory::CancellationError>(cbData.status.code())) { - return; - } - // continue health checking otherwise + LOGV2_DEBUG(5939701, + 3, + "Schedule next health check", + "observerType"_attr = str::stream() << observer->getType(), + "scheduledTime"_attr = scheduledTime); + + auto periodicThreadCbHandleStatus = _taskExecutor->scheduleWorkAt( + scheduledTime, + [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + LOGV2_DEBUG( + 5939702, 1, "Fault manager received an error", "status"_attr = cbData.status); + if (ErrorCodes::isA<ErrorCategory::CancellationError>(cbData.status.code())) { + return; } - healthCheck(observer, token); - }); - - if (!periodicThreadCbHandleStatus.isOK()) { - if (ErrorCodes::isA<ErrorCategory::ShutdownError>( - periodicThreadCbHandleStatus.getStatus().code())) { - return; + // continue health checking otherwise } + healthCheck(observer, token); + }); - uassert(5936101, - str::stream() << "Failed to schedule periodic health check for " - << observer->getType() << ": " - << periodicThreadCbHandleStatus.getStatus().codeString(), - periodicThreadCbHandleStatus.isOK()); + if (!periodicThreadCbHandleStatus.isOK()) { + if (ErrorCodes::isA<ErrorCategory::ShutdownError>( + periodicThreadCbHandleStatus.getStatus().code())) { + LOGV2_DEBUG(6418203, 1, "Not scheduling health check because of shutdown"); + return; } - stdx::lock_guard lock(_mutex); - _healthCheckContexts.at(observer->getType()).resultStatus = - std::move(periodicThreadCbHandleStatus.getValue()); - }; + uassert(5936101, + str::stream() << "Failed to schedule periodic health check for " + << observer->getType() << ": " + << periodicThreadCbHandleStatus.getStatus().codeString(), + periodicThreadCbHandleStatus.isOK()); + } + + _healthCheckContexts.at(observer->getType()).callbackHandle = + std::move(periodicThreadCbHandleStatus.getValue()); +} +void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) { auto acceptNotOKStatus = [this, observer](Status s) { auto healthCheckStatus = HealthCheckStatus(observer->getType(), Severity::kFailure, s.reason()); @@ -574,22 +597,34 @@ void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token return healthCheckStatus; }; - { - stdx::lock_guard lock(_mutex); - _healthCheckContexts.insert( - {observer->getType(), HealthCheckContext(nullptr, boost::none)}); - } // Run asynchronous health check. Send output to the state machine. Schedule next run. - auto healthCheckFuture = - observer->periodicCheck(_taskExecutor, token) - .thenRunOn(_taskExecutor) - .onCompletion([this, acceptNotOKStatus, schedulerCb, observer]( - StatusWith<HealthCheckStatus> status) { - ON_BLOCK_EXIT([this, schedulerCb, observer]() { + auto healthCheckFuture = observer->periodicCheck(_taskExecutor, token); + + stdx::lock_guard lock(_mutex); + auto contextIt = _healthCheckContexts.find(observer->getType()); + if (contextIt == _healthCheckContexts.end()) { + LOGV2_ERROR(6418204, "Unexpected failure during health check: context not found"); + return; + } + contextIt->second.result = + std::make_unique<SharedSemiFuture<HealthCheckStatus>>(std::move(healthCheckFuture)); + + contextIt->second.result->thenRunOn(_taskExecutor) + .onCompletion( + [this, acceptNotOKStatus, observer, token](StatusWith<HealthCheckStatus> status) { + ON_BLOCK_EXIT([this, observer, token]() { + { + stdx::lock_guard lock(_mutex); + // Rescheduling requires the previous handle to be cleaned. + auto contextIt = _healthCheckContexts.find(observer->getType()); + if (contextIt != _healthCheckContexts.end()) { + contextIt->second.callbackHandle = {}; + } + } if (!_config->periodicChecksDisabledForTests() && _config->isHealthObserverEnabled(observer->getType())) { - schedulerCb(); + scheduleNextHealthCheck(observer, token, false /* immediate */); } }); @@ -599,11 +634,8 @@ void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token accept(status.getValue()); return status.getValue(); - }); - - stdx::lock_guard lock(_mutex); - _healthCheckContexts.at(observer->getType()).result = - std::make_unique<ExecutorFuture<HealthCheckStatus>>(std::move(healthCheckFuture)); + }) + .getAsync([](StatusOrStatusWith<mongo::process_health::HealthCheckStatus>) {}); } void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) { diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h index 14234fc75d4..4e23c629a09 100644 --- a/src/mongo/db/process_health/fault_manager.h +++ b/src/mongo/db/process_health/fault_manager.h @@ -153,6 +153,10 @@ protected: void progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb); + void scheduleNextHealthCheck(HealthObserver* observer, + CancellationToken token, + bool immediately); + private: // One time init. void _init(); @@ -202,12 +206,17 @@ private: std::unique_ptr<ProgressMonitor> _progressMonitor; stdx::unordered_set<FaultFacetType> _healthyObservations; + + // The stages of health check context modifications: + // 1. Schedule and set callbackHandle + // 2. When scheduled check starts, reset callbackHandle and set result future + // 3. When result is ready, repeat struct HealthCheckContext { - std::unique_ptr<ExecutorFuture<HealthCheckStatus>> result; - boost::optional<executor::TaskExecutor::CallbackHandle> resultStatus; - HealthCheckContext(std::unique_ptr<ExecutorFuture<HealthCheckStatus>> future, + std::unique_ptr<SharedSemiFuture<HealthCheckStatus>> result; + boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle; + HealthCheckContext(std::unique_ptr<SharedSemiFuture<HealthCheckStatus>> future, boost::optional<executor::TaskExecutor::CallbackHandle> cbHandle) - : result(std::move(future)), resultStatus(cbHandle){}; + : result(std::move(future)), callbackHandle(cbHandle){}; }; stdx::unordered_map<FaultFacetType, HealthCheckContext> _healthCheckContexts; diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h index 584aad99e81..67e9f2d506c 100644 --- a/src/mongo/db/process_health/fault_manager_test_suite.h +++ b/src/mongo/db/process_health/fault_manager_test_suite.h @@ -114,6 +114,10 @@ public: FaultState acceptTest(const HealthCheckStatus& message) { return accept(message); } + + void scheduleNextImmediateCheckForTest(HealthObserver* observer) { + scheduleNextHealthCheck(observer, CancellationToken::uncancelable(), true); + } }; /** @@ -194,6 +198,12 @@ public: } template <typename Observer> + void scheduleNextImmediateCheck(FaultFacetType type) { + auto& obsrv = observer<Observer>(type); + manager().scheduleNextImmediateCheckForTest(&obsrv); + } + + template <typename Observer> void registerHealthObserver() { HealthObserverRegistration::registerObserverFactory( [](ServiceContext* svcCtx) { return std::make_unique<Observer>(svcCtx); }); diff --git a/src/mongo/db/process_health/health_monitoring_server_parameters.cpp b/src/mongo/db/process_health/health_monitoring_server_parameters.cpp index 010ecb3b64d..52dbf31a1c6 100644 --- a/src/mongo/db/process_health/health_monitoring_server_parameters.cpp +++ b/src/mongo/db/process_health/health_monitoring_server_parameters.cpp @@ -120,7 +120,7 @@ void HealthMonitoringProgressMonitorServerParameter::append(OperationContext*, Status PeriodicHealthCheckIntervalsServerParameter::setFromString(const std::string& value) { const auto oldValue = **_data; auto newValue = HealthObserverIntervals::parse( - IDLParserErrorContext("health monitoring liveness"), fromjson(value)); + IDLParserErrorContext("health monitoring interval"), fromjson(value)); newValue = mergeConfigValues(oldValue, newValue); **_data = newValue; return Status::OK(); @@ -129,7 +129,7 @@ Status PeriodicHealthCheckIntervalsServerParameter::setFromString(const std::str Status PeriodicHealthCheckIntervalsServerParameter::set(const BSONElement& newValueElement) { const auto oldValue = **_data; auto newValue = HealthObserverIntervals::parse( - IDLParserErrorContext("health monitoring liveness"), newValueElement.Obj()); + IDLParserErrorContext("health monitoring interval"), newValueElement.Obj()); newValue = mergeConfigValues(oldValue, newValue); **_data = newValue; return Status::OK(); diff --git a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp index 3cfe1dbe608..f10d2683967 100644 --- a/src/mongo/db/process_health/health_observer_test.cpp +++ b/src/mongo/db/process_health/health_observer_test.cpp @@ -221,6 +221,35 @@ TEST_F(FaultManagerTest, resetManager(); // Before fields above go out of scope. } +TEST_F(FaultManagerTest, SchedulingDuplicateHealthChecksRejected) { + static constexpr int kLoops = 1000; + resetManager(std::make_unique<FaultManagerConfig>()); + registerMockHealthObserver(FaultFacetType::kMock1, [] { return Severity::kOk; }); + auto initialHealthCheckFuture = manager().startPeriodicHealthChecks(); + waitForTransitionIntoState(FaultState::kOk); + + auto observer = manager().getHealthObserversTest()[0]; + auto initialStats = observer->getStats(); + + for (int i = 0; i < kLoops; ++i) { + // A check will not be scheduled if another check is running. + // Default interval is 1 sec so only 1/500 of checks will be scheduled. + scheduleNextImmediateCheck<HealthObserverMock>(FaultFacetType::kMock1); + sleepFor(Milliseconds(2)); + } + + // Sleep time here is not introducing flakiness - it only shows that even after + // waiting the total count of completed tests is lower than the total we scheduled. + sleepFor(Milliseconds(100)); + auto finalStats = observer->getStats(); + + const auto totalCompletedCount = + finalStats.completedChecksCount - initialStats.completedChecksCount; + ASSERT_LT(totalCompletedCount, kLoops); + ASSERT_GT(totalCompletedCount, 0); + LOGV2(6418205, "Total completed checks count", "count"_attr = totalCompletedCount); +} + } // namespace } // namespace process_health } // namespace mongo |