summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp68
1 files changed, 53 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 22188054a6..c53d943e98 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -68,8 +68,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (parent != 0)
{
agent = broker_.getManagementAgent();
-
-
+
+
// TODO set last bool true if system connection
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
@@ -95,9 +95,11 @@ Connection::~Connection()
}
if (isLink)
links.notifyClosed(mgmtId);
-
+
if (heartbeatTimer)
heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
}
void Connection::received(framing::AMQFrame& frame) {
@@ -181,7 +183,9 @@ void Connection::close(connection::CloseCode code, const string& text)
{
QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
if (heartbeatTimer)
- heartbeatTimer->cancel();
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -192,7 +196,9 @@ void Connection::close(connection::CloseCode code, const string& text)
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
if (heartbeatTimer)
- heartbeatTimer->cancel();
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -203,7 +209,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
- while (!channels.empty())
+ while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
@@ -221,7 +227,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
-bool Connection::doOutput() {
+bool Connection::doOutput() {
try{
{
ScopedLock<Mutex> l(ioCallbackLock);
@@ -292,33 +298,65 @@ void Connection::setSecureConnection(SecureConnection* s)
struct ConnectionHeartbeatTask : public TimerTask {
Timer& timer;
Connection& connection;
- ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
TimerTask(Duration(hb*TIME_SEC)),
timer(t),
connection(c)
{}
-
+
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
// Setup next firing
reset();
timer.add(this);
-
+
// Send Heartbeat
connection.sendHeartbeat();
}
}
};
+struct ConnectionTimeoutTask : public TimerTask {
+ Timer& timer;
+ Connection& connection;
+ ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+ TimerTask(Duration(hb*2*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // If we get here then we've not received any traffic in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
+ }
+ }
+};
+
+void Connection::abort()
+{
+ out.abort();
+}
+
void Connection::setHeartbeatInterval(uint16_t heartbeat)
{
setHeartbeat(heartbeat);
if (heartbeat > 0) {
- heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
- timer.add(heartbeatTimer);
+ heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+ timer.add(heartbeatTimer);
+ timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this);
+ timer.add(timeoutTimer);
}
}
-}}
+void Connection::restartTimeout()
+{
+ if (timeoutTimer)
+ timeoutTimer->reset();
+}
+}}