diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:15:31 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:15:31 +0000 |
commit | 766c218055567f4e387aab4678d8d1c840916465 (patch) | |
tree | a8caf1fc75a6505124b3e6d4ed3b36de5894f50c /cpp/src | |
parent | 5fec8f487c510e2309b3bc939fea70078a11af97 (diff) | |
download | qpid-python-766c218055567f4e387aab4678d8d1c840916465.tar.gz |
Implement heartbeat timeout on client:
- The client shuts down a connection if
it receives no traffic on it in 2 timeout periods
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734221 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionSettings.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 3 |
9 files changed, 83 insertions, 20 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index d5b3f2264b..d6d024cf3f 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -33,6 +33,13 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::framing::connection; using qpid::sys::SecurityLayer; +using qpid::sys::Duration; +using qpid::sys::TimerTask; +using qpid::sys::Timer; +using qpid::sys::AbsTime; +using qpid::sys::TIME_SEC; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace { const std::string OK("OK"); @@ -60,7 +67,7 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode) ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v) -{ +{ insist = true; ESTABLISHED.insert(FAILED); @@ -69,14 +76,18 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio FINISHED.insert(FAILED); FINISHED.insert(CLOSED); -} +} void ConnectionHandler::incoming(AMQFrame& frame) { if (getState() == CLOSED) { - throw Exception("Received frame on closed connection"); + throw Exception("Received frame on closed connection"); } + if (rcvTimeoutTask) { + // Received frame on connection so delay timeout + rcvTimeoutTask->restart(); + } AMQBody* body = frame.getBody(); try { @@ -86,18 +97,18 @@ void ConnectionHandler::incoming(AMQFrame& frame) in(frame); break; case CLOSING: - QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); + QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); break; default: throw Exception("Cannot receive frames on non-zero channel until connection is established."); } } }catch(std::exception& e){ - QPID_LOG(warning, "Closing connection due to " << e.what()); + QPID_LOG(warning, "Closing connection due to " << e.what()); setState(CLOSING); errorCode = CLOSE_CODE_FRAMING_ERROR; errorText = e.what(); - proxy.close(501, e.what()); + proxy.close(501, e.what()); } } @@ -135,9 +146,9 @@ void ConnectionHandler::close() void ConnectionHandler::heartbeat() { - // Do nothing - the purpose of heartbeats is just to make sure that there is some - // traffic on the connection within the heart beat interval, we check for the - // traffic and don't need to do anything in response to heartbeats + // Do nothing - the purpose of heartbeats is just to make sure that there is some + // traffic on the connection within the heart beat interval, we check for the + // traffic and don't need to do anything in response to heartbeats } void ConnectionHandler::checkState(STATES s, const std::string& msg) @@ -175,7 +186,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me if (i != mechanisms.begin()) mechlist += SPACE; mechlist += (*i)->get<std::string>(); } - } + } if (!chosenMechanismSupported) { fail("Selected mechanism not supported: " + mechanism); @@ -210,11 +221,10 @@ void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSize // Clip the requested heartbeat to the maximum/minimum offered uint16_t heartbeat = ConnectionSettings::heartbeat; heartbeat = heartbeat < heartbeatMin ? heartbeatMin : - heartbeat > heartbeatMax ? heartbeatMax : - heartbeat; + heartbeat > heartbeatMax ? heartbeatMax : + heartbeat; + ConnectionSettings::heartbeat = heartbeat; proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); - // TODO set connection timeout to be 2x heart beat interval - // TODO and set an alarm for it. setState(OPENING); proxy.open(virtualhost, capabilities, insist); } @@ -279,3 +289,8 @@ std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer() { return securityLayer; } + +void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t) +{ + rcvTimeoutTask = t; +} diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 58b540cfeb..b3b15e2f04 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -35,6 +35,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/InputHandler.h" #include "qpid/sys/SecurityLayer.h" +#include "qpid/sys/Timer.h" #include "qpid/Url.h" #include <memory> @@ -69,6 +70,7 @@ class ConnectionHandler : private StateManager, framing::FieldTable properties; std::auto_ptr<Sasl> sasl; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask; void checkState(STATES s, const std::string& msg); @@ -109,7 +111,8 @@ public: bool isClosed() const; bool isClosing() const; - std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(); + std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(); + void setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask>); CloseListener onClose; ErrorListener onError; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index aa9eeb7489..46ef9eda1e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -41,6 +41,31 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes +// Get timer singleton +Timer& theTimer() { + static Mutex timerInitLock; + ScopedLock<Mutex> l(timerInitLock); + + static qpid::sys::Timer t; + return t; +} + +class HeartbeatTask : public TimerTask { + TimeoutHandler& timeout; + + void fire() { + // If we ever get here then we have timed out + QPID_LOG(debug, "Traffic timeout"); + timeout.idleIn(); + } + +public: + HeartbeatTask(Duration p, TimeoutHandler& t) : + TimerTask(p), + timeout(t) + {} +}; + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -110,6 +135,16 @@ void ConnectionImpl::open() connector->connect(host, port); connector->init(); handler.waitForOpen(); + + // Enable heartbeat if requested + uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat; + if (heartbeat) { + // Set connection timeout to be 2x heart beat interval and setup timer + heartbeatTask = new HeartbeatTask(heartbeat * 2 * TIME_SEC, *this); + handler.setRcvTimeoutTask(heartbeatTask); + theTimer().add(heartbeatTask); + } + //enable security layer if one has been negotiated: std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer(); if (securityLayer.get()) { @@ -124,7 +159,7 @@ void ConnectionImpl::open() void ConnectionImpl::idleIn() { - close(); + connector->abort(); } void ConnectionImpl::idleOut() @@ -136,6 +171,9 @@ void ConnectionImpl::idleOut() void ConnectionImpl::close() { if (!handler.isOpen()) return; + if (heartbeatTask) { + heartbeatTask->cancel(); + } handler.close(); closed(CLOSE_CODE_NORMAL, "Closed by client"); } diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 55a4929028..9385687238 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -61,6 +61,8 @@ class ConnectionImpl : public Bounds, uint16_t nextChannel; sys::Mutex lock; + boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask; + template <class F> void closeInternal(const F&); void incoming(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/client/ConnectionSettings.h b/cpp/src/qpid/client/ConnectionSettings.h index c7725e19f0..f60b11a4ab 100644 --- a/cpp/src/qpid/client/ConnectionSettings.h +++ b/cpp/src/qpid/client/ConnectionSettings.h @@ -89,8 +89,7 @@ struct ConnectionSettings { */ std::string locale; /** - * Allows a heartbeat frequency to be specified (this feature is - * not yet implemented). + * Allows a heartbeat frequency to be specified */ uint16_t heartbeat; /** diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 5cdde723af..e6355601df 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -128,6 +128,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable void init(); void close(); void send(framing::AMQFrame& frame); + void abort(); void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); @@ -233,6 +234,10 @@ void TCPConnector::close() { closeInternal(); } +void TCPConnector::abort() { + aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); +} + void TCPConnector::setInputHandler(InputHandler* handler){ input = handler; } diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index e23fb8875b..2966166d28 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -64,6 +64,7 @@ class Connector : public framing::OutputHandler virtual void init() {}; virtual void close() = 0; virtual void send(framing::AMQFrame& frame) = 0; + virtual void abort() = 0; virtual void setInputHandler(framing::InputHandler* handler) = 0; virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0; diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 8e330448c9..ad85104f3a 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -105,6 +105,7 @@ using boost::str; void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); + void abort() {} // TODO: need to fix this for heartbeat timeouts to work void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 6dbdbd003e..a68af78354 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -122,6 +122,7 @@ class SslConnector : public Connector, private sys::Runnable void init(); void close(); void send(framing::AMQFrame& frame); + void abort() {} // TODO: Need to fix for heartbeat timeouts to work void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); @@ -372,8 +373,6 @@ void SslConnector::eof(SslIO&) { handleClosed(); } -// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing -// will never be called void SslConnector::run(){ // Keep the connection impl in memory until run() completes. boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); |