diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 68 |
1 files changed, 53 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 22188054a6..c53d943e98 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -68,8 +68,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { agent = broker_.getManagementAgent(); - - + + // TODO set last bool true if system connection if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); @@ -95,9 +95,11 @@ Connection::~Connection() } if (isLink) links.notifyClosed(mgmtId); - + if (heartbeatTimer) heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); } void Connection::received(framing::AMQFrame& frame) { @@ -181,7 +183,9 @@ void Connection::close(connection::CloseCode code, const string& text) { QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); if (heartbeatTimer) - heartbeatTimer->cancel(); + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -192,7 +196,9 @@ void Connection::close(connection::CloseCode code, const string& text) // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { if (heartbeatTimer) - heartbeatTimer->cancel(); + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -203,7 +209,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { - while (!channels.empty()) + while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); @@ -221,7 +227,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. bool Connection::hasOutput() { return outputTasks.hasOutput(); } -bool Connection::doOutput() { +bool Connection::doOutput() { try{ { ScopedLock<Mutex> l(ioCallbackLock); @@ -292,33 +298,65 @@ void Connection::setSecureConnection(SecureConnection* s) struct ConnectionHeartbeatTask : public TimerTask { Timer& timer; Connection& connection; - ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*TIME_SEC)), timer(t), connection(c) {} - + void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { // Setup next firing reset(); timer.add(this); - + // Send Heartbeat connection.sendHeartbeat(); } } }; +struct ConnectionTimeoutTask : public TimerTask { + Timer& timer; + Connection& connection; + ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) : + TimerTask(Duration(hb*2*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); + } + } +}; + +void Connection::abort() +{ + out.abort(); +} + void Connection::setHeartbeatInterval(uint16_t heartbeat) { setHeartbeat(heartbeat); if (heartbeat > 0) { - heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); - timer.add(heartbeatTimer); + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this); + timer.add(timeoutTimer); } } -}} +void Connection::restartTimeout() +{ + if (timeoutTimer) + timeoutTimer->reset(); +} +}} |