author     Andrew Shuvalov <andrew.shuvalov@mongodb.com>    2022-03-11 15:23:57 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-11 15:54:23 +0000
commit     e146ca34cbec7301ba1be1cc3c10c56a53a107fc (patch)
tree       b9240e07d0db7a42b5ce31f3216a34bd2327da32
parent     ea4409bb5186b84bff6651630aeb579f6d7f8588 (diff)
download   mongo-e146ca34cbec7301ba1be1cc3c10c56a53a107fc.tar.gz
SERVER-64182 avoid duplicate health checks; cleanups
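
The change below makes FaultManager::scheduleNextHealthCheck() refuse to queue a new health check for an observer while a previous one is still pending: either queued on the executor (callbackHandle set) or currently running (result future exists but is not ready). The standalone C++ sketch below is illustrative only and is not part of the commit; the CheckSlot type and tryScheduleCheck() helper are hypothetical stand-ins for HealthCheckContext and the real scheduling code.

    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>

    // Hypothetical stand-in for FaultManager's per-observer HealthCheckContext.
    struct CheckSlot {
        bool hasCallbackHandle = false;   // a check is queued on the executor
        std::optional<bool> resultReady;  // engaged once a check started; true when it finished
    };

    std::map<std::string, CheckSlot> gSlots;

    // Mirrors the guard added to scheduleNextHealthCheck(): skip scheduling when
    // another check for the same observer is already queued or still running.
    bool tryScheduleCheck(const std::string& observerType) {
        auto it = gSlots.find(observerType);
        if (it != gSlots.end()) {
            if (it->second.hasCallbackHandle) {
                std::cout << "skipped: a check is already queued for " << observerType << "\n";
                return false;
            }
            if (it->second.resultReady && !*it->second.resultReady) {
                std::cout << "skipped: a check is still running for " << observerType << "\n";
                return false;
            }
        }
        gSlots[observerType] = CheckSlot{true, std::nullopt};  // mark as queued
        std::cout << "scheduled a check for " << observerType << "\n";
        return true;
    }

    int main() {
        tryScheduleCheck("mock");                  // scheduled
        tryScheduleCheck("mock");                  // skipped: already queued
        gSlots["mock"] = CheckSlot{false, false};  // queued check started, result pending
        tryScheduleCheck("mock");                  // skipped: still running
        gSlots["mock"] = CheckSlot{false, true};   // result is ready
        tryScheduleCheck("mock");                  // scheduled again
        return 0;
    }
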
-rw-r--r--  src/mongo/db/process_health/fault_manager.cpp                       148
-rw-r--r--  src/mongo/db/process_health/fault_manager.h                          17
-rw-r--r--  src/mongo/db/process_health/fault_manager_test_suite.h               10
-rw-r--r--  src/mongo/db/process_health/health_monitoring_server_parameters.cpp   4
-rw-r--r--  src/mongo/db/process_health/health_observer_test.cpp                  29
5 files changed, 144 insertions, 64 deletions
diff --git a/src/mongo/db/process_health/fault_manager.cpp b/src/mongo/db/process_health/fault_manager.cpp
index ab1c653aa68..6518316b139 100644
--- a/src/mongo/db/process_health/fault_manager.cpp
+++ b/src/mongo/db/process_health/fault_manager.cpp
@@ -142,7 +142,8 @@ void FaultManager::healthMonitoringIntensitiesUpdated(HealthObserverIntensities
if (auto* observer =
manager->getHealthObserver(toFaultFacetType(setting.getType()));
observer != nullptr) {
- manager->healthCheck(observer, cancellationToken);
+ manager->scheduleNextHealthCheck(
+ observer, cancellationToken, true /* immediate */);
}
} else if (newIntensity == HealthObserverIntensityEnum::kOff) {
// {critical, non-critical} -> off
@@ -432,7 +433,7 @@ void FaultManager::schedulePeriodicHealthCheckThread() {
setState(FaultState::kActiveFault, HealthCheckStatus(observer->getType()));
return;
}
- healthCheck(observer, token);
+ scheduleNextHealthCheck(observer, token, true /* immediate */);
}
LOGV2(5936804, "Health observers started", "detail"_attr = listOfActiveObservers);
}
@@ -445,7 +446,7 @@ FaultManager::~FaultManager() {
{
stdx::lock_guard lock(_mutex);
for (auto& pair : _healthCheckContexts) {
- auto cbHandle = pair.second.resultStatus;
+ auto cbHandle = pair.second.callbackHandle;
if (cbHandle) {
_taskExecutor->cancel(cbHandle.get());
}
@@ -519,52 +520,74 @@ FaultPtr FaultManager::getOrCreateFault() {
return _fault;
}
-void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) {
- auto schedulerCb = [this, observer, token] {
- auto scheduledTime = _taskExecutor->now() +
- _config->getPeriodicHealthCheckInterval(observer->getType()) +
+void FaultManager::scheduleNextHealthCheck(HealthObserver* observer,
+ CancellationToken token,
+ bool immediately) {
+ stdx::lock_guard lock(_mutex);
+
+ // Check that the context's callbackHandle is not set and that the result future, if it exists, is ready.
+ auto existingIt = _healthCheckContexts.find(observer->getType());
+ if (existingIt != _healthCheckContexts.end()) {
+ if (existingIt->second.callbackHandle) {
+ LOGV2_WARNING(6418201,
+ "Cannot schedule health check while another one is in queue",
+ "observerType"_attr = str::stream() << observer->getType());
+ return;
+ }
+ if (existingIt->second.result && !existingIt->second.result->isReady()) {
+ LOGV2_WARNING(6418202,
+ "Cannot schedule health check while another one is currently executing",
+ "observerType"_attr = str::stream() << observer->getType());
+ return;
+ }
+ }
+ _healthCheckContexts.insert_or_assign(observer->getType(),
+ HealthCheckContext(nullptr, boost::none));
+
+ auto scheduledTime = immediately
+ ? _taskExecutor->now()
+ : _taskExecutor->now() + _config->getPeriodicHealthCheckInterval(observer->getType()) +
std::min(observer->healthCheckJitter(),
FaultManagerConfig::kPeriodicHealthCheckMaxJitter);
- LOGV2_DEBUG(5939701,
- 3,
- "Schedule next health check",
- "observerType"_attr = str::stream() << observer->getType(),
- "scheduledTime"_attr = scheduledTime);
-
- auto periodicThreadCbHandleStatus = _taskExecutor->scheduleWorkAt(
- scheduledTime,
- [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- LOGV2_DEBUG(5939702,
- 1,
- "Fault manager received an error",
- "status"_attr = cbData.status);
- if (ErrorCodes::isA<ErrorCategory::CancellationError>(cbData.status.code())) {
- return;
- }
- // continue health checking otherwise
+ LOGV2_DEBUG(5939701,
+ 3,
+ "Schedule next health check",
+ "observerType"_attr = str::stream() << observer->getType(),
+ "scheduledTime"_attr = scheduledTime);
+
+ auto periodicThreadCbHandleStatus = _taskExecutor->scheduleWorkAt(
+ scheduledTime,
+ [this, observer, token](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ LOGV2_DEBUG(
+ 5939702, 1, "Fault manager received an error", "status"_attr = cbData.status);
+ if (ErrorCodes::isA<ErrorCategory::CancellationError>(cbData.status.code())) {
+ return;
}
- healthCheck(observer, token);
- });
-
- if (!periodicThreadCbHandleStatus.isOK()) {
- if (ErrorCodes::isA<ErrorCategory::ShutdownError>(
- periodicThreadCbHandleStatus.getStatus().code())) {
- return;
+ // continue health checking otherwise
}
+ healthCheck(observer, token);
+ });
- uassert(5936101,
- str::stream() << "Failed to schedule periodic health check for "
- << observer->getType() << ": "
- << periodicThreadCbHandleStatus.getStatus().codeString(),
- periodicThreadCbHandleStatus.isOK());
+ if (!periodicThreadCbHandleStatus.isOK()) {
+ if (ErrorCodes::isA<ErrorCategory::ShutdownError>(
+ periodicThreadCbHandleStatus.getStatus().code())) {
+ LOGV2_DEBUG(6418203, 1, "Not scheduling health check because of shutdown");
+ return;
}
- stdx::lock_guard lock(_mutex);
- _healthCheckContexts.at(observer->getType()).resultStatus =
- std::move(periodicThreadCbHandleStatus.getValue());
- };
+ uassert(5936101,
+ str::stream() << "Failed to schedule periodic health check for "
+ << observer->getType() << ": "
+ << periodicThreadCbHandleStatus.getStatus().codeString(),
+ periodicThreadCbHandleStatus.isOK());
+ }
+
+ _healthCheckContexts.at(observer->getType()).callbackHandle =
+ std::move(periodicThreadCbHandleStatus.getValue());
+}
+void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token) {
auto acceptNotOKStatus = [this, observer](Status s) {
auto healthCheckStatus =
HealthCheckStatus(observer->getType(), Severity::kFailure, s.reason());
@@ -574,22 +597,34 @@ void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token
return healthCheckStatus;
};
- {
- stdx::lock_guard lock(_mutex);
- _healthCheckContexts.insert(
- {observer->getType(), HealthCheckContext(nullptr, boost::none)});
- }
// Run asynchronous health check. Send output to the state machine. Schedule next run.
- auto healthCheckFuture =
- observer->periodicCheck(_taskExecutor, token)
- .thenRunOn(_taskExecutor)
- .onCompletion([this, acceptNotOKStatus, schedulerCb, observer](
- StatusWith<HealthCheckStatus> status) {
- ON_BLOCK_EXIT([this, schedulerCb, observer]() {
+ auto healthCheckFuture = observer->periodicCheck(_taskExecutor, token);
+
+ stdx::lock_guard lock(_mutex);
+ auto contextIt = _healthCheckContexts.find(observer->getType());
+ if (contextIt == _healthCheckContexts.end()) {
+ LOGV2_ERROR(6418204, "Unexpected failure during health check: context not found");
+ return;
+ }
+ contextIt->second.result =
+ std::make_unique<SharedSemiFuture<HealthCheckStatus>>(std::move(healthCheckFuture));
+
+ contextIt->second.result->thenRunOn(_taskExecutor)
+ .onCompletion(
+ [this, acceptNotOKStatus, observer, token](StatusWith<HealthCheckStatus> status) {
+ ON_BLOCK_EXIT([this, observer, token]() {
+ {
+ stdx::lock_guard lock(_mutex);
+ // Rescheduling requires the previous handle to be cleaned.
+ auto contextIt = _healthCheckContexts.find(observer->getType());
+ if (contextIt != _healthCheckContexts.end()) {
+ contextIt->second.callbackHandle = {};
+ }
+ }
if (!_config->periodicChecksDisabledForTests() &&
_config->isHealthObserverEnabled(observer->getType())) {
- schedulerCb();
+ scheduleNextHealthCheck(observer, token, false /* immediate */);
}
});
@@ -599,11 +634,8 @@ void FaultManager::healthCheck(HealthObserver* observer, CancellationToken token
accept(status.getValue());
return status.getValue();
- });
-
- stdx::lock_guard lock(_mutex);
- _healthCheckContexts.at(observer->getType()).result =
- std::make_unique<ExecutorFuture<HealthCheckStatus>>(std::move(healthCheckFuture));
+ })
+ .getAsync([](StatusOrStatusWith<mongo::process_health::HealthCheckStatus>) {});
}
void FaultManager::updateWithCheckStatus(HealthCheckStatus&& checkStatus) {
diff --git a/src/mongo/db/process_health/fault_manager.h b/src/mongo/db/process_health/fault_manager.h
index 14234fc75d4..4e23c629a09 100644
--- a/src/mongo/db/process_health/fault_manager.h
+++ b/src/mongo/db/process_health/fault_manager.h
@@ -153,6 +153,10 @@ protected:
void progressMonitorCheckForTests(std::function<void(std::string cause)> crashCb);
+ void scheduleNextHealthCheck(HealthObserver* observer,
+ CancellationToken token,
+ bool immediately);
+
private:
// One time init.
void _init();
@@ -202,12 +206,17 @@ private:
std::unique_ptr<ProgressMonitor> _progressMonitor;
stdx::unordered_set<FaultFacetType> _healthyObservations;
+
+ // The stages of health check context modifications:
+ // 1. Schedule and set callbackHandle
+ // 2. When scheduled check starts, reset callbackHandle and set result future
+ // 3. When result is ready, repeat
struct HealthCheckContext {
- std::unique_ptr<ExecutorFuture<HealthCheckStatus>> result;
- boost::optional<executor::TaskExecutor::CallbackHandle> resultStatus;
- HealthCheckContext(std::unique_ptr<ExecutorFuture<HealthCheckStatus>> future,
+ std::unique_ptr<SharedSemiFuture<HealthCheckStatus>> result;
+ boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
+ HealthCheckContext(std::unique_ptr<SharedSemiFuture<HealthCheckStatus>> future,
boost::optional<executor::TaskExecutor::CallbackHandle> cbHandle)
- : result(std::move(future)), resultStatus(cbHandle){};
+ : result(std::move(future)), callbackHandle(cbHandle){};
};
stdx::unordered_map<FaultFacetType, HealthCheckContext> _healthCheckContexts;
diff --git a/src/mongo/db/process_health/fault_manager_test_suite.h b/src/mongo/db/process_health/fault_manager_test_suite.h
index 584aad99e81..67e9f2d506c 100644
--- a/src/mongo/db/process_health/fault_manager_test_suite.h
+++ b/src/mongo/db/process_health/fault_manager_test_suite.h
@@ -114,6 +114,10 @@ public:
FaultState acceptTest(const HealthCheckStatus& message) {
return accept(message);
}
+
+ void scheduleNextImmediateCheckForTest(HealthObserver* observer) {
+ scheduleNextHealthCheck(observer, CancellationToken::uncancelable(), true);
+ }
};
/**
@@ -194,6 +198,12 @@ public:
}
template <typename Observer>
+ void scheduleNextImmediateCheck(FaultFacetType type) {
+ auto& obsrv = observer<Observer>(type);
+ manager().scheduleNextImmediateCheckForTest(&obsrv);
+ }
+
+ template <typename Observer>
void registerHealthObserver() {
HealthObserverRegistration::registerObserverFactory(
[](ServiceContext* svcCtx) { return std::make_unique<Observer>(svcCtx); });
diff --git a/src/mongo/db/process_health/health_monitoring_server_parameters.cpp b/src/mongo/db/process_health/health_monitoring_server_parameters.cpp
index 010ecb3b64d..52dbf31a1c6 100644
--- a/src/mongo/db/process_health/health_monitoring_server_parameters.cpp
+++ b/src/mongo/db/process_health/health_monitoring_server_parameters.cpp
@@ -120,7 +120,7 @@ void HealthMonitoringProgressMonitorServerParameter::append(OperationContext*,
Status PeriodicHealthCheckIntervalsServerParameter::setFromString(const std::string& value) {
const auto oldValue = **_data;
auto newValue = HealthObserverIntervals::parse(
- IDLParserErrorContext("health monitoring liveness"), fromjson(value));
+ IDLParserErrorContext("health monitoring interval"), fromjson(value));
newValue = mergeConfigValues(oldValue, newValue);
**_data = newValue;
return Status::OK();
@@ -129,7 +129,7 @@ Status PeriodicHealthCheckIntervalsServerParameter::setFromString(const std::str
Status PeriodicHealthCheckIntervalsServerParameter::set(const BSONElement& newValueElement) {
const auto oldValue = **_data;
auto newValue = HealthObserverIntervals::parse(
- IDLParserErrorContext("health monitoring liveness"), newValueElement.Obj());
+ IDLParserErrorContext("health monitoring interval"), newValueElement.Obj());
newValue = mergeConfigValues(oldValue, newValue);
**_data = newValue;
return Status::OK();
diff --git a/src/mongo/db/process_health/health_observer_test.cpp b/src/mongo/db/process_health/health_observer_test.cpp
index 3cfe1dbe608..f10d2683967 100644
--- a/src/mongo/db/process_health/health_observer_test.cpp
+++ b/src/mongo/db/process_health/health_observer_test.cpp
@@ -221,6 +221,35 @@ TEST_F(FaultManagerTest,
resetManager(); // Before fields above go out of scope.
}
+TEST_F(FaultManagerTest, SchedulingDuplicateHealthChecksRejected) {
+ static constexpr int kLoops = 1000;
+ resetManager(std::make_unique<FaultManagerConfig>());
+ registerMockHealthObserver(FaultFacetType::kMock1, [] { return Severity::kOk; });
+ auto initialHealthCheckFuture = manager().startPeriodicHealthChecks();
+ waitForTransitionIntoState(FaultState::kOk);
+
+ auto observer = manager().getHealthObserversTest()[0];
+ auto initialStats = observer->getStats();
+
+ for (int i = 0; i < kLoops; ++i) {
+ // A check will not be scheduled if another check is running.
+ // Default interval is 1 sec so only 1/500 of checks will be scheduled.
+ scheduleNextImmediateCheck<HealthObserverMock>(FaultFacetType::kMock1);
+ sleepFor(Milliseconds(2));
+ }
+
+ // Sleeping here does not introduce flakiness - it only shows that even after
+ // waiting, the total count of completed checks is lower than the number we scheduled.
+ sleepFor(Milliseconds(100));
+ auto finalStats = observer->getStats();
+
+ const auto totalCompletedCount =
+ finalStats.completedChecksCount - initialStats.completedChecksCount;
+ ASSERT_LT(totalCompletedCount, kLoops);
+ ASSERT_GT(totalCompletedCount, 0);
+ LOGV2(6418205, "Total completed checks count", "count"_attr = totalCompletedCount);
+}
+
} // namespace
} // namespace process_health
} // namespace mongo