diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 474472719e..03ff3d5793 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -148,6 +148,9 @@ Connection::~Connection() heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); } @@ -300,6 +303,9 @@ void Connection::close(connection::CloseCode code, const string& text) heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -313,6 +319,9 @@ void Connection::sendClose() { heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -326,6 +335,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); @@ -435,6 +447,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask { } }; +class LinkHeartbeatTask : public qpid::sys::TimerTask { + sys::Timer& timer; + Connection& connection; + bool heartbeatSeen; + + void fire() { + if (!heartbeatSeen) { + QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection"); + connection.abort(); + } else { + heartbeatSeen = false; + // Setup next firing + setupNextFire(); + timer.add(this); + } + } + +public: + LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) : + TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {} + + void heartbeatReceived() { heartbeatSeen = true; } +}; + + void Connection::abort() { // Make sure that we don't try to send a heartbeat as we're @@ -460,10 +497,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat) } } +void Connection::startLinkHeartbeatTimeoutTask() { + if (!linkHeartbeatTimer && heartbeat > 0) { + linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); + timer.add(linkHeartbeatTimer); + } +} + void Connection::restartTimeout() { if (timeoutTimer) timeoutTimer->touch(); + + if (linkHeartbeatTimer) { + static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived(); + } } bool Connection::isOpen() { return adapter.isOpen(); } |
