diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index eb54ddfd56..66ee6281c6 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -38,7 +38,6 @@ using namespace qpid::sys; using namespace qpid::framing; -using namespace qpid::sys; using qpid::ptr_map_ptr; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; @@ -57,7 +56,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtId(mgmtId_), mgmtObject(0), links(broker_.getLinks()), - agent(0) + agent(0), + timer(broker_.getTimer()) { Manageable* parent = broker.GetVhostObject(); @@ -92,6 +92,9 @@ Connection::~Connection() } if (isLink) links.notifyClosed(mgmtId); + + if (heartbeatTimer) + heartbeatTimer->cancel(); } void Connection::received(framing::AMQFrame& frame) { @@ -174,6 +177,8 @@ void Connection::setFederationLink(bool b) void Connection::close(connection::CloseCode code, const string& text) { QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); + if (heartbeatTimer) + heartbeatTimer->cancel(); adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -183,6 +188,8 @@ void Connection::close(connection::CloseCode code, const string& text) // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { + if (heartbeatTimer) + heartbeatTimer->cancel(); adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -230,6 +237,10 @@ bool Connection::doOutput() { return false; } +void Connection::sendHeartbeat() { + adapter.heartbeat(); +} + void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); if (i != channels.end()) channels.erase(i); @@ -272,5 +283,36 @@ void Connection::setSecureConnection(SecureConnection* s) adapter.setSecureConnection(s); } +struct ConnectionHeartbeatTask : public TimerTask { + Timer& timer; + Connection& connection; + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + TimerTask(Duration(hb*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Setup next firing + reset(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); + } + } +}; + +void Connection::setHeartbeatInterval(uint16_t heartbeat) +{ + setHeartbeat(heartbeat); + if (heartbeat > 0) { + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + } +} + }} |