summaryrefslogtreecommitdiff
path: root/src/components/connection_handler/src/heartbeat_monitor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/connection_handler/src/heartbeat_monitor.cc')
-rw-r--r--src/components/connection_handler/src/heartbeat_monitor.cc122
1 files changed, 70 insertions, 52 deletions
diff --git a/src/components/connection_handler/src/heartbeat_monitor.cc b/src/components/connection_handler/src/heartbeat_monitor.cc
index 6a5a9e723..4dbafdd36 100644
--- a/src/components/connection_handler/src/heartbeat_monitor.cc
+++ b/src/components/connection_handler/src/heartbeat_monitor.cc
@@ -32,6 +32,7 @@
#include "connection_handler/heartbeat_monitor.h"
#include <unistd.h>
+#include <utility>
#include "utils/logger.h"
#include "connection_handler/connection.h"
@@ -44,58 +45,41 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "HeartBeatMonitor")
HeartBeatMonitor::HeartBeatMonitor(int32_t heartbeat_timeout_seconds,
Connection *connection)
- : heartbeat_timeout_seconds_(heartbeat_timeout_seconds),
+ : default_heartbeat_timeout_(heartbeat_timeout_seconds),
connection_(connection),
sessions_list_lock_(true),
run_(true) {
}
-bool HeartBeatMonitor::HasTimeoutElapsed(const TimevalStruct& expiration) const {
- TimevalStruct now = date_time::DateTime::getCurrentTime();
- return date_time::DateTime::Greater(now, expiration);
-}
-
void HeartBeatMonitor::Process() {
AutoLock auto_lock(sessions_list_lock_);
SessionMap::iterator it = sessions_.begin();
while (it != sessions_.end()) {
SessionState &state = it->second;
- if (HasTimeoutElapsed(state.heartbeat_expiration)) {
+ if (state.HasTimeoutElapsed()) {
const uint8_t session_id = it->first;
- if (state.is_heartbeat_sent) {
- LOG4CXX_DEBUG(logger_,
- "Session with id " << static_cast<int32_t>(session_id) << " timed out, closing");
+ if (state.IsReadyToClose()) {
+ LOG4CXX_DEBUG(logger_, "Will close session");
connection_->CloseSession(session_id);
it = sessions_.begin();
continue;
} else {
LOG4CXX_DEBUG(logger_,
"Send heart beat into session with id " << static_cast<int32_t>(session_id));
- RefreshExpiration(&state.heartbeat_expiration);
+ state.PrepareToClose();
connection_->SendHeartBeat(it->first);
- state.is_heartbeat_sent = true;
}
}
-
++it;
}
}
-void HeartBeatMonitor::RefreshExpiration(TimevalStruct* expiration) const {
- LOG4CXX_TRACE_ENTER(logger_);
- sync_primitives::AutoLock locker(heartbeat_timeout_seconds_lock_);
- DCHECK(expiration);
- *expiration = date_time::DateTime::getCurrentTime();
- expiration->tv_sec += heartbeat_timeout_seconds_;
- LOG4CXX_TRACE_EXIT(logger_);
-}
-
void HeartBeatMonitor::threadMain() {
AutoLock main_lock(main_thread_lock_);
LOG4CXX_DEBUG(
logger_,
- "Start heart beat monitor. Timeout is " << heartbeat_timeout_seconds_);
+ "Start heart beat monitor. Timeout is " << default_heartbeat_timeout_);
while (run_) {
usleep(kDefaultCycleTimeout);
Process();
@@ -111,23 +95,21 @@ void HeartBeatMonitor::AddSession(uint8_t session_id) {
"Session with id " << static_cast<int32_t>(session_id) << " already exists");
return;
}
- SessionState session_state;
- RefreshExpiration(&session_state.heartbeat_expiration);
- session_state.is_heartbeat_sent = false;
- sessions_[session_id] = session_state;
-
- LOG4CXX_INFO(
- logger_,
- "Start heartbeat for session " << static_cast<int32_t>(session_id));
+ sessions_.insert(std::make_pair(session_id,
+ SessionState(default_heartbeat_timeout_)));
+ LOG4CXX_INFO(logger_, "Start heartbeat for session " << session_id);
}
void HeartBeatMonitor::RemoveSession(uint8_t session_id) {
AutoLock auto_lock(sessions_list_lock_);
- if (sessions_.end() != sessions_.find(session_id)) {
+ LOG4CXX_INFO(logger_,
+ "Remove session with id " << session_id);
+
+ if (sessions_.erase(session_id) == 0) {
LOG4CXX_INFO(logger_,
- "Remove session with id " << static_cast<int32_t>(session_id));
- sessions_.erase(session_id);
+ "Remove session with id " << session_id <<
+ " was unsuccessful");
}
}
@@ -135,35 +117,71 @@ void HeartBeatMonitor::KeepAlive(uint8_t session_id) {
AutoLock auto_lock(sessions_list_lock_);
if (sessions_.end() != sessions_.find(session_id)) {
- LOG4CXX_INFO(
- logger_,
- "Resetting heart beat timer for session with id " << static_cast<int32_t>(session_id));
+ LOG4CXX_INFO( logger_, "Resetting heart beat timer for session with id " <<
+ static_cast<int32_t>(session_id));
- RefreshExpiration(&sessions_[session_id].heartbeat_expiration);
- sessions_[session_id].is_heartbeat_sent = false;
+ sessions_[session_id].KeepAlive();
}
}
-bool HeartBeatMonitor::exitThreadMain() {
- LOG4CXX_TRACE_ENTER(logger_);
+void HeartBeatMonitor::exitThreadMain() {
+ // FIXME (dchmerev@luxoft.com): thread requested to stop should stop as soon as possible,
+ // not running one more iteration before actual stop
+ LOG4CXX_AUTO_TRACE(logger_);
run_ = false;
AutoLock main_lock(main_thread_lock_);
- LOG4CXX_TRACE_EXIT(logger_);
- return true;
}
-void HeartBeatMonitor::set_heartbeat_timeout_seconds(int32_t timeout) {
- LOG4CXX_DEBUG(logger_, "Set new heart beat timeout " << timeout);
- {
- AutoLock locker(heartbeat_timeout_seconds_lock_);
- heartbeat_timeout_seconds_ = timeout;
- }
+void HeartBeatMonitor::set_heartbeat_timeout_seconds(int32_t timeout,
+ uint8_t session_id) {
+ LOG4CXX_DEBUG(logger_, "Set new heart beat timeout " << timeout <<
+ "For session: " << session_id);
AutoLock session_locker(sessions_list_lock_);
- for (SessionMap::iterator i = sessions_.begin(); i != sessions_.end(); ++i) {
- SessionState& session_state = i->second;
- RefreshExpiration(&session_state.heartbeat_expiration);
+ if (sessions_.end() != sessions_.find(session_id)) {
+ sessions_[session_id].UpdateTimeout(timeout);
}
}
+HeartBeatMonitor::SessionState::SessionState(int32_t heartbeat_timeout_seconds)
+ : heartbeat_timeout_seconds_(heartbeat_timeout_seconds),
+ is_heartbeat_sent(false) {
+ LOG4CXX_DEBUG(logger_, "SessionState ctor.");
+ RefreshExpiration();
+}
+
+void HeartBeatMonitor::SessionState::RefreshExpiration () {
+ LOG4CXX_DEBUG(logger_, "Refresh expiration: " << heartbeat_timeout_seconds_);
+ heartbeat_expiration = date_time::DateTime::getCurrentTime();
+ heartbeat_expiration.tv_sec += heartbeat_timeout_seconds_;
+}
+
+void HeartBeatMonitor::SessionState::UpdateTimeout(
+ int32_t heartbeat_timeout_seconds) {
+ heartbeat_timeout_seconds_ = heartbeat_timeout_seconds;
+ LOG4CXX_DEBUG(logger_, "Update timout");
+ RefreshExpiration();
+}
+
+void HeartBeatMonitor::SessionState::PrepareToClose() {
+ is_heartbeat_sent = true;
+ LOG4CXX_DEBUG(logger_, "Prepare to close");
+ RefreshExpiration();
+}
+
+bool HeartBeatMonitor::SessionState::IsReadyToClose() const {
+ return is_heartbeat_sent;
+}
+
+void HeartBeatMonitor::SessionState::KeepAlive() {
+ is_heartbeat_sent = false;
+ LOG4CXX_DEBUG(logger_, "keep alive");
+ RefreshExpiration();
+}
+
+bool HeartBeatMonitor::SessionState::HasTimeoutElapsed() {
+ TimevalStruct now = date_time::DateTime::getCurrentTime();
+ return date_time::DateTime::Greater(now, heartbeat_expiration);
+}
+
} // namespace connection_handler