diff options
Diffstat (limited to 'src/components/connection_handler/src/heartbeat_monitor.cc')
-rw-r--r-- | src/components/connection_handler/src/heartbeat_monitor.cc | 122 |
1 files changed, 70 insertions, 52 deletions
diff --git a/src/components/connection_handler/src/heartbeat_monitor.cc b/src/components/connection_handler/src/heartbeat_monitor.cc index 6a5a9e723e..4dbafdd361 100644 --- a/src/components/connection_handler/src/heartbeat_monitor.cc +++ b/src/components/connection_handler/src/heartbeat_monitor.cc @@ -32,6 +32,7 @@ #include "connection_handler/heartbeat_monitor.h" #include <unistd.h> +#include <utility> #include "utils/logger.h" #include "connection_handler/connection.h" @@ -44,58 +45,41 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "HeartBeatMonitor") HeartBeatMonitor::HeartBeatMonitor(int32_t heartbeat_timeout_seconds, Connection *connection) - : heartbeat_timeout_seconds_(heartbeat_timeout_seconds), + : default_heartbeat_timeout_(heartbeat_timeout_seconds), connection_(connection), sessions_list_lock_(true), run_(true) { } -bool HeartBeatMonitor::HasTimeoutElapsed(const TimevalStruct& expiration) const { - TimevalStruct now = date_time::DateTime::getCurrentTime(); - return date_time::DateTime::Greater(now, expiration); -} - void HeartBeatMonitor::Process() { AutoLock auto_lock(sessions_list_lock_); SessionMap::iterator it = sessions_.begin(); while (it != sessions_.end()) { SessionState &state = it->second; - if (HasTimeoutElapsed(state.heartbeat_expiration)) { + if (state.HasTimeoutElapsed()) { const uint8_t session_id = it->first; - if (state.is_heartbeat_sent) { - LOG4CXX_DEBUG(logger_, - "Session with id " << static_cast<int32_t>(session_id) << " timed out, closing"); + if (state.IsReadyToClose()) { + LOG4CXX_DEBUG(logger_, "Will close session"); connection_->CloseSession(session_id); it = sessions_.begin(); continue; } else { LOG4CXX_DEBUG(logger_, "Send heart beat into session with id " << static_cast<int32_t>(session_id)); - RefreshExpiration(&state.heartbeat_expiration); + state.PrepareToClose(); connection_->SendHeartBeat(it->first); - state.is_heartbeat_sent = true; } } - ++it; } } -void HeartBeatMonitor::RefreshExpiration(TimevalStruct* expiration) const { - LOG4CXX_TRACE_ENTER(logger_); - sync_primitives::AutoLock locker(heartbeat_timeout_seconds_lock_); - DCHECK(expiration); - *expiration = date_time::DateTime::getCurrentTime(); - expiration->tv_sec += heartbeat_timeout_seconds_; - LOG4CXX_TRACE_EXIT(logger_); -} - void HeartBeatMonitor::threadMain() { AutoLock main_lock(main_thread_lock_); LOG4CXX_DEBUG( logger_, - "Start heart beat monitor. Timeout is " << heartbeat_timeout_seconds_); + "Start heart beat monitor. Timeout is " << default_heartbeat_timeout_); while (run_) { usleep(kDefaultCycleTimeout); Process(); @@ -111,23 +95,21 @@ void HeartBeatMonitor::AddSession(uint8_t session_id) { "Session with id " << static_cast<int32_t>(session_id) << " already exists"); return; } - SessionState session_state; - RefreshExpiration(&session_state.heartbeat_expiration); - session_state.is_heartbeat_sent = false; - sessions_[session_id] = session_state; - - LOG4CXX_INFO( - logger_, - "Start heartbeat for session " << static_cast<int32_t>(session_id)); + sessions_.insert(std::make_pair(session_id, + SessionState(default_heartbeat_timeout_))); + LOG4CXX_INFO(logger_, "Start heartbeat for session " << session_id); } void HeartBeatMonitor::RemoveSession(uint8_t session_id) { AutoLock auto_lock(sessions_list_lock_); - if (sessions_.end() != sessions_.find(session_id)) { + LOG4CXX_INFO(logger_, + "Remove session with id " << session_id); + + if (sessions_.erase(session_id) == 0) { LOG4CXX_INFO(logger_, - "Remove session with id " << static_cast<int32_t>(session_id)); - sessions_.erase(session_id); + "Remove session with id " << session_id << + " was unsuccessful"); } } @@ -135,35 +117,71 @@ void HeartBeatMonitor::KeepAlive(uint8_t session_id) { AutoLock auto_lock(sessions_list_lock_); if (sessions_.end() != sessions_.find(session_id)) { - LOG4CXX_INFO( - logger_, - "Resetting heart beat timer for session with id " << static_cast<int32_t>(session_id)); + LOG4CXX_INFO( logger_, "Resetting heart beat timer for session with id " << + static_cast<int32_t>(session_id)); - RefreshExpiration(&sessions_[session_id].heartbeat_expiration); - sessions_[session_id].is_heartbeat_sent = false; + sessions_[session_id].KeepAlive(); } } -bool HeartBeatMonitor::exitThreadMain() { - LOG4CXX_TRACE_ENTER(logger_); +void HeartBeatMonitor::exitThreadMain() { + // FIXME (dchmerev@luxoft.com): thread requested to stop should stop as soon as possible, + // not running one more iteration before actual stop + LOG4CXX_AUTO_TRACE(logger_); run_ = false; AutoLock main_lock(main_thread_lock_); - LOG4CXX_TRACE_EXIT(logger_); - return true; } -void HeartBeatMonitor::set_heartbeat_timeout_seconds(int32_t timeout) { - LOG4CXX_DEBUG(logger_, "Set new heart beat timeout " << timeout); - { - AutoLock locker(heartbeat_timeout_seconds_lock_); - heartbeat_timeout_seconds_ = timeout; - } +void HeartBeatMonitor::set_heartbeat_timeout_seconds(int32_t timeout, + uint8_t session_id) { + LOG4CXX_DEBUG(logger_, "Set new heart beat timeout " << timeout << + "For session: " << session_id); AutoLock session_locker(sessions_list_lock_); - for (SessionMap::iterator i = sessions_.begin(); i != sessions_.end(); ++i) { - SessionState& session_state = i->second; - RefreshExpiration(&session_state.heartbeat_expiration); + if (sessions_.end() != sessions_.find(session_id)) { + sessions_[session_id].UpdateTimeout(timeout); } } +HeartBeatMonitor::SessionState::SessionState(int32_t heartbeat_timeout_seconds) + : heartbeat_timeout_seconds_(heartbeat_timeout_seconds), + is_heartbeat_sent(false) { + LOG4CXX_DEBUG(logger_, "SessionState ctor."); + RefreshExpiration(); +} + +void HeartBeatMonitor::SessionState::RefreshExpiration () { + LOG4CXX_DEBUG(logger_, "Refresh expiration: " << heartbeat_timeout_seconds_); + heartbeat_expiration = date_time::DateTime::getCurrentTime(); + heartbeat_expiration.tv_sec += heartbeat_timeout_seconds_; +} + +void HeartBeatMonitor::SessionState::UpdateTimeout( + int32_t heartbeat_timeout_seconds) { + heartbeat_timeout_seconds_ = heartbeat_timeout_seconds; + LOG4CXX_DEBUG(logger_, "Update timout"); + RefreshExpiration(); +} + +void HeartBeatMonitor::SessionState::PrepareToClose() { + is_heartbeat_sent = true; + LOG4CXX_DEBUG(logger_, "Prepare to close"); + RefreshExpiration(); +} + +bool HeartBeatMonitor::SessionState::IsReadyToClose() const { + return is_heartbeat_sent; +} + +void HeartBeatMonitor::SessionState::KeepAlive() { + is_heartbeat_sent = false; + LOG4CXX_DEBUG(logger_, "keep alive"); + RefreshExpiration(); +} + +bool HeartBeatMonitor::SessionState::HasTimeoutElapsed() { + TimevalStruct now = date_time::DateTime::getCurrentTime(); + return date_time::DateTime::Greater(now, heartbeat_expiration); +} + } // namespace connection_handler |