summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp46
1 files changed, 44 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index eb54ddfd56..66ee6281c6 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -38,7 +38,6 @@
using namespace qpid::sys;
using namespace qpid::framing;
-using namespace qpid::sys;
using qpid::ptr_map_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
@@ -57,7 +56,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtId(mgmtId_),
mgmtObject(0),
links(broker_.getLinks()),
- agent(0)
+ agent(0),
+ timer(broker_.getTimer())
{
Manageable* parent = broker.GetVhostObject();
@@ -92,6 +92,9 @@ Connection::~Connection()
}
if (isLink)
links.notifyClosed(mgmtId);
+
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
}
void Connection::received(framing::AMQFrame& frame) {
@@ -174,6 +177,8 @@ void Connection::setFederationLink(bool b)
void Connection::close(connection::CloseCode code, const string& text)
{
QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -183,6 +188,8 @@ void Connection::close(connection::CloseCode code, const string& text)
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -230,6 +237,10 @@ bool Connection::doOutput() {
return false;
}
+void Connection::sendHeartbeat() {
+ adapter.heartbeat();
+}
+
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
if (i != channels.end()) channels.erase(i);
@@ -272,5 +283,36 @@ void Connection::setSecureConnection(SecureConnection* s)
adapter.setSecureConnection(s);
}
+struct ConnectionHeartbeatTask : public TimerTask {
+ Timer& timer;
+ Connection& connection;
+ ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ TimerTask(Duration(hb*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // Setup next firing
+ reset();
+ timer.add(this);
+
+ // Send Heartbeat
+ connection.sendHeartbeat();
+ }
+ }
+};
+
+void Connection::setHeartbeatInterval(uint16_t heartbeat)
+{
+ setHeartbeat(heartbeat);
+ if (heartbeat > 0) {
+ heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+ timer.add(heartbeatTimer);
+ }
+}
+
}}