diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-06-08 14:35:01 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-06-08 14:35:01 +0000 |
commit | 58ee0f096ad8d487984fa19ccbfe6b274e989ade (patch) | |
tree | 34a089afc4335ec76d398fc187bda5d3f4395553 | |
parent | 322353742b90b8f133c6c8d0a654c714071bf77f (diff) | |
download | qpid-python-58ee0f096ad8d487984fa19ccbfe6b274e989ade.tar.gz |
- Added heartbeat generation to the client (actually echo back any
broker generated heartbeat)
- Broker now disconnects client if it receives no traffic in
2 heartbeat intervals (which is now the same as the client behvaiour)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@782651 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 68 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 20 |
4 files changed, 82 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 22188054a6..c53d943e98 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -68,8 +68,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { agent = broker_.getManagementAgent(); - - + + // TODO set last bool true if system connection if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); @@ -95,9 +95,11 @@ Connection::~Connection() } if (isLink) links.notifyClosed(mgmtId); - + if (heartbeatTimer) heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); } void Connection::received(framing::AMQFrame& frame) { @@ -181,7 +183,9 @@ void Connection::close(connection::CloseCode code, const string& text) { QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); if (heartbeatTimer) - heartbeatTimer->cancel(); + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -192,7 +196,9 @@ void Connection::close(connection::CloseCode code, const string& text) // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { if (heartbeatTimer) - heartbeatTimer->cancel(); + heartbeatTimer->cancel(); + if (timeoutTimer) + timeoutTimer->cancel(); adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -203,7 +209,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { - while (!channels.empty()) + while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); @@ -221,7 +227,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. bool Connection::hasOutput() { return outputTasks.hasOutput(); } -bool Connection::doOutput() { +bool Connection::doOutput() { try{ { ScopedLock<Mutex> l(ioCallbackLock); @@ -292,33 +298,65 @@ void Connection::setSecureConnection(SecureConnection* s) struct ConnectionHeartbeatTask : public TimerTask { Timer& timer; Connection& connection; - ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*TIME_SEC)), timer(t), connection(c) {} - + void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { // Setup next firing reset(); timer.add(this); - + // Send Heartbeat connection.sendHeartbeat(); } } }; +struct ConnectionTimeoutTask : public TimerTask { + Timer& timer; + Connection& connection; + ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) : + TimerTask(Duration(hb*2*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); + } + } +}; + +void Connection::abort() +{ + out.abort(); +} + void Connection::setHeartbeatInterval(uint16_t heartbeat) { setHeartbeat(heartbeat); if (heartbeat > 0) { - heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); - timer.add(heartbeatTimer); + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this); + timer.add(timeoutTimer); } } -}} +void Connection::restartTimeout() +{ + if (timeoutTimer) + timeoutTimer->reset(); +} +}} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 770bf2184f..df2c36c92e 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,7 +61,7 @@ namespace broker { class LinkRegistry; class SecureConnection; -class Connection : public sys::ConnectionInputHandler, +class Connection : public sys::ConnectionInputHandler, public ConnectionState, public RefCounted { @@ -115,9 +115,11 @@ class Connection : public sys::ConnectionInputHandler, /** Connection does not delete the listener. 0 resets. */ void setErrorListener(ErrorListener* l) { errorListener=l; } ErrorListener* getErrorListener() { return errorListener; } - + void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); + void restartTimeout(); + void abort(); template <class F> void eachSessionHandler(F f) { for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) @@ -143,6 +145,7 @@ class Connection : public sys::ConnectionInputHandler, management::ManagementAgent* agent; Timer& timer; boost::intrusive_ptr<TimerTask> heartbeatTimer; + boost::intrusive_ptr<TimerTask> timeoutTimer; ErrorListener* errorListener; public: diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 8b70836da0..d3e795ae06 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -63,6 +63,9 @@ void ConnectionHandler::heartbeat() void ConnectionHandler::handle(framing::AMQFrame& frame) { + // Received frame on connection so delay timeout + handler->connection.restartTimeout(); + AMQMethodBody* method=frame.getBody()->getMethod(); Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ @@ -186,7 +189,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, { std::vector<Url> urls = connection.broker.getKnownBrokers(); framing::Array array(0x95); // str16 array - for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) + for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); proxy.openOk(array); @@ -197,7 +200,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, } } - + void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyText) { if (replyCode != 200) { @@ -209,11 +212,11 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe proxy.closeOk(); connection.getOutput().close(); -} - +} + void ConnectionHandler::Handler::closeOk(){ connection.getOutput().close(); -} +} void ConnectionHandler::Handler::heartbeat(){ // Do nothing - the purpose of heartbeats is just to make sure that there is some diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 6efdb91e96..db113cdf80 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -70,8 +70,8 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode) } } -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v) { insist = true; @@ -82,7 +82,7 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio FINISHED.insert(FAILED); FINISHED.insert(CLOSED); - + properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER); properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName()); properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId()); @@ -125,7 +125,7 @@ void ConnectionHandler::incoming(AMQFrame& frame) void ConnectionHandler::outgoing(AMQFrame& frame) { - if (getState() == OPEN) + if (getState() == OPEN) out(frame); else throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText); @@ -160,6 +160,10 @@ void ConnectionHandler::heartbeat() // Do nothing - the purpose of heartbeats is just to make sure that there is some // traffic on the connection within the heart beat interval, we check for the // traffic and don't need to do anything in response to heartbeats + + // Although the above is still true we're now using a received heartbeat as a trigger + // to send out our own heartbeat + proxy.heartbeat(); } void ConnectionHandler::checkState(STATES s, const std::string& msg) @@ -223,13 +227,13 @@ void ConnectionHandler::secure(const std::string& challenge) } } -void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, +void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, uint16_t heartbeatMin, uint16_t heartbeatMax) { checkState(NEGOTIATING, INVALID_STATE_TUNE); maxChannels = std::min(maxChannels, maxChannelsProposed); maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); - // Clip the requested heartbeat to the maximum/minimum offered + // Clip the requested heartbeat to the maximum/minimum offered uint16_t heartbeat = ConnectionSettings::heartbeat; heartbeat = heartbeat < heartbeatMin ? heartbeatMin : heartbeat > heartbeatMax ? heartbeatMax : |