diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 68 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 20 |
4 files changed, 82 insertions, 34 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 22188054a6..c53d943e98 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -68,8 +68,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { agent = broker_.getManagementAgent(); - - + + // TODO set last bool true if system connection if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); @@ -95,9 +95,11 @@ Connection::~Connection() } if (isLink) links.notifyClosed(mgmtId); - + if (heartbeatTimer) heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); } void Connection::received(framing::AMQFrame& frame) { @@ -181,7 +183,9 @@ void Connection::close(connection::CloseCode code, const string& text) { QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); if (heartbeatTimer) - heartbeatTimer->cancel(); + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -192,7 +196,9 @@ void Connection::close(connection::CloseCode code, const string& text) // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { if (heartbeatTimer) - heartbeatTimer->cancel(); + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -203,7 +209,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { - while (!channels.empty()) + while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); @@ -221,7 +227,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. bool Connection::hasOutput() { return outputTasks.hasOutput(); } -bool Connection::doOutput() { +bool Connection::doOutput() { try{ { ScopedLock<Mutex> l(ioCallbackLock); @@ -292,33 +298,65 @@ void Connection::setSecureConnection(SecureConnection* s) struct ConnectionHeartbeatTask : public TimerTask { Timer& timer; Connection& connection; - ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*TIME_SEC)), timer(t), connection(c) {} - + void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { // Setup next firing reset(); timer.add(this); - + // Send Heartbeat connection.sendHeartbeat(); } } }; +struct ConnectionTimeoutTask : public TimerTask { + Timer& timer; + Connection& connection; + ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) : + TimerTask(Duration(hb*2*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); + } + } +}; + +void Connection::abort() +{ + out.abort(); +} + void Connection::setHeartbeatInterval(uint16_t heartbeat) { setHeartbeat(heartbeat); if (heartbeat > 0) { - heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); - timer.add(heartbeatTimer); + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this); + timer.add(timeoutTimer); } } -}} +void Connection::restartTimeout() +{ + if (timeoutTimer) + timeoutTimer->reset(); +} +}} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 770bf2184f..df2c36c92e 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,7 +61,7 @@ namespace broker { class LinkRegistry; class SecureConnection; -class Connection : public sys::ConnectionInputHandler, +class Connection : public sys::ConnectionInputHandler, public ConnectionState, public RefCounted { @@ -115,9 +115,11 @@ class Connection : public sys::ConnectionInputHandler, /** Connection does not delete the listener. 0 resets. */ void setErrorListener(ErrorListener* l) { errorListener=l; } ErrorListener* getErrorListener() { return errorListener; } - + void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); + void restartTimeout(); + void abort(); template <class F> void eachSessionHandler(F f) { for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) @@ -143,6 +145,7 @@ class Connection : public sys::ConnectionInputHandler, management::ManagementAgent* agent; Timer& timer; boost::intrusive_ptr<TimerTask> heartbeatTimer; + boost::intrusive_ptr<TimerTask> timeoutTimer; ErrorListener* errorListener; public: diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 8b70836da0..d3e795ae06 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -63,6 +63,9 @@ void ConnectionHandler::heartbeat() void ConnectionHandler::handle(framing::AMQFrame& frame) { + // Received frame on connection so delay timeout + handler->connection.restartTimeout(); + AMQMethodBody* method=frame.getBody()->getMethod(); Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ @@ -186,7 +189,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, { std::vector<Url> urls = connection.broker.getKnownBrokers(); framing::Array array(0x95); // str16 array - for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) + for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); proxy.openOk(array); @@ -197,7 +200,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, } } - + void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyText) { if (replyCode != 200) { @@ -209,11 +212,11 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe proxy.closeOk(); connection.getOutput().close(); -} - +} + void ConnectionHandler::Handler::closeOk(){ connection.getOutput().close(); -} +} void ConnectionHandler::Handler::heartbeat(){ // Do nothing - the purpose of heartbeats is just to make sure that there is some diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 6efdb91e96..db113cdf80 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -70,8 +70,8 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode) } } -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v) { insist = true; @@ -82,7 +82,7 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio FINISHED.insert(FAILED); FINISHED.insert(CLOSED); - + properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER); properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName()); properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId()); @@ -125,7 +125,7 @@ void ConnectionHandler::incoming(AMQFrame& frame) void ConnectionHandler::outgoing(AMQFrame& frame) { - if (getState() == OPEN) + if (getState() == OPEN) out(frame); else throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText); @@ -160,6 +160,10 @@ void ConnectionHandler::heartbeat() // Do nothing - the purpose of heartbeats is just to make sure that there is some // traffic on the connection within the heart beat interval, we check for the // traffic and don't need to do anything in response to heartbeats + + // Although the above is still true we're now using a received heartbeat as a trigger + // to send out our own heartbeat + proxy.heartbeat(); } void ConnectionHandler::checkState(STATES s, const std::string& msg) @@ -223,13 +227,13 @@ void ConnectionHandler::secure(const std::string& challenge) } } -void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, +void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, uint16_t heartbeatMin, uint16_t heartbeatMax) { checkState(NEGOTIATING, INVALID_STATE_TUNE); maxChannels = std::min(maxChannels, maxChannelsProposed); maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); - // Clip the requested heartbeat to the maximum/minimum offered + // Clip the requested heartbeat to the maximum/minimum offered uint16_t heartbeat = ConnectionSettings::heartbeat; heartbeat = heartbeat < heartbeatMin ? heartbeatMin : heartbeat > heartbeatMax ? heartbeatMax : |