summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-13 19:15:31 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-13 19:15:31 +0000
commit766c218055567f4e387aab4678d8d1c840916465 (patch)
treea8caf1fc75a6505124b3e6d4ed3b36de5894f50c /cpp/src
parent5fec8f487c510e2309b3bc939fea70078a11af97 (diff)
downloadqpid-python-766c218055567f4e387aab4678d8d1c840916465.tar.gz
Implement heartbeat timeout on client:
- The client shuts down a connection if it receives no traffic on it in 2 timeout periods git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734221 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp43
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h5
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp40
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h2
-rw-r--r--cpp/src/qpid/client/ConnectionSettings.h3
-rw-r--r--cpp/src/qpid/client/Connector.cpp5
-rw-r--r--cpp/src/qpid/client/Connector.h1
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp1
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp3
9 files changed, 83 insertions, 20 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index d5b3f2264b..d6d024cf3f 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -33,6 +33,13 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::framing::connection;
using qpid::sys::SecurityLayer;
+using qpid::sys::Duration;
+using qpid::sys::TimerTask;
+using qpid::sys::Timer;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_SEC;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
namespace {
const std::string OK("OK");
@@ -60,7 +67,7 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode)
ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
: StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
errorCode(CLOSE_CODE_NORMAL), version(v)
-{
+{
insist = true;
ESTABLISHED.insert(FAILED);
@@ -69,14 +76,18 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
-}
+}
void ConnectionHandler::incoming(AMQFrame& frame)
{
if (getState() == CLOSED) {
- throw Exception("Received frame on closed connection");
+ throw Exception("Received frame on closed connection");
}
+ if (rcvTimeoutTask) {
+ // Received frame on connection so delay timeout
+ rcvTimeoutTask->restart();
+ }
AMQBody* body = frame.getBody();
try {
@@ -86,18 +97,18 @@ void ConnectionHandler::incoming(AMQFrame& frame)
in(frame);
break;
case CLOSING:
- QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
+ QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
break;
default:
throw Exception("Cannot receive frames on non-zero channel until connection is established.");
}
}
}catch(std::exception& e){
- QPID_LOG(warning, "Closing connection due to " << e.what());
+ QPID_LOG(warning, "Closing connection due to " << e.what());
setState(CLOSING);
errorCode = CLOSE_CODE_FRAMING_ERROR;
errorText = e.what();
- proxy.close(501, e.what());
+ proxy.close(501, e.what());
}
}
@@ -135,9 +146,9 @@ void ConnectionHandler::close()
void ConnectionHandler::heartbeat()
{
- // Do nothing - the purpose of heartbeats is just to make sure that there is some
- // traffic on the connection within the heart beat interval, we check for the
- // traffic and don't need to do anything in response to heartbeats
+ // Do nothing - the purpose of heartbeats is just to make sure that there is some
+ // traffic on the connection within the heart beat interval, we check for the
+ // traffic and don't need to do anything in response to heartbeats
}
void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -175,7 +186,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
if (i != mechanisms.begin()) mechlist += SPACE;
mechlist += (*i)->get<std::string>();
}
- }
+ }
if (!chosenMechanismSupported) {
fail("Selected mechanism not supported: " + mechanism);
@@ -210,11 +221,10 @@ void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSize
// Clip the requested heartbeat to the maximum/minimum offered
uint16_t heartbeat = ConnectionSettings::heartbeat;
heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
- heartbeat > heartbeatMax ? heartbeatMax :
- heartbeat;
+ heartbeat > heartbeatMax ? heartbeatMax :
+ heartbeat;
+ ConnectionSettings::heartbeat = heartbeat;
proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
- // TODO set connection timeout to be 2x heart beat interval
- // TODO and set an alarm for it.
setState(OPENING);
proxy.open(virtualhost, capabilities, insist);
}
@@ -279,3 +289,8 @@ std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer()
{
return securityLayer;
}
+
+void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t)
+{
+ rcvTimeoutTask = t;
+}
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 58b540cfeb..b3b15e2f04 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -35,6 +35,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/InputHandler.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/Url.h"
#include <memory>
@@ -69,6 +70,7 @@ class ConnectionHandler : private StateManager,
framing::FieldTable properties;
std::auto_ptr<Sasl> sasl;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+ boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask;
void checkState(STATES s, const std::string& msg);
@@ -109,7 +111,8 @@ public:
bool isClosed() const;
bool isClosing() const;
- std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
+ std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
+ void setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask>);
CloseListener onClose;
ErrorListener onError;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index aa9eeb7489..46ef9eda1e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -41,6 +41,31 @@ using namespace qpid::framing::connection;
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
+// Get timer singleton
+Timer& theTimer() {
+ static Mutex timerInitLock;
+ ScopedLock<Mutex> l(timerInitLock);
+
+ static qpid::sys::Timer t;
+ return t;
+}
+
+class HeartbeatTask : public TimerTask {
+ TimeoutHandler& timeout;
+
+ void fire() {
+ // If we ever get here then we have timed out
+ QPID_LOG(debug, "Traffic timeout");
+ timeout.idleIn();
+ }
+
+public:
+ HeartbeatTask(Duration p, TimeoutHandler& t) :
+ TimerTask(p),
+ timeout(t)
+ {}
+};
+
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -110,6 +135,16 @@ void ConnectionImpl::open()
connector->connect(host, port);
connector->init();
handler.waitForOpen();
+
+ // Enable heartbeat if requested
+ uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
+ if (heartbeat) {
+ // Set connection timeout to be 2x heart beat interval and setup timer
+ heartbeatTask = new HeartbeatTask(heartbeat * 2 * TIME_SEC, *this);
+ handler.setRcvTimeoutTask(heartbeatTask);
+ theTimer().add(heartbeatTask);
+ }
+
//enable security layer if one has been negotiated:
std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
if (securityLayer.get()) {
@@ -124,7 +159,7 @@ void ConnectionImpl::open()
void ConnectionImpl::idleIn()
{
- close();
+ connector->abort();
}
void ConnectionImpl::idleOut()
@@ -136,6 +171,9 @@ void ConnectionImpl::idleOut()
void ConnectionImpl::close()
{
if (!handler.isOpen()) return;
+ if (heartbeatTask) {
+ heartbeatTask->cancel();
+ }
handler.close();
closed(CLOSE_CODE_NORMAL, "Closed by client");
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 55a4929028..9385687238 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -61,6 +61,8 @@ class ConnectionImpl : public Bounds,
uint16_t nextChannel;
sys::Mutex lock;
+ boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask;
+
template <class F> void closeInternal(const F&);
void incoming(framing::AMQFrame& frame);
diff --git a/cpp/src/qpid/client/ConnectionSettings.h b/cpp/src/qpid/client/ConnectionSettings.h
index c7725e19f0..f60b11a4ab 100644
--- a/cpp/src/qpid/client/ConnectionSettings.h
+++ b/cpp/src/qpid/client/ConnectionSettings.h
@@ -89,8 +89,7 @@ struct ConnectionSettings {
*/
std::string locale;
/**
- * Allows a heartbeat frequency to be specified (this feature is
- * not yet implemented).
+ * Allows a heartbeat frequency to be specified
*/
uint16_t heartbeat;
/**
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 5cdde723af..e6355601df 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -128,6 +128,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
void init();
void close();
void send(framing::AMQFrame& frame);
+ void abort();
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -233,6 +234,10 @@ void TCPConnector::close() {
closeInternal();
}
+void TCPConnector::abort() {
+ aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+}
+
void TCPConnector::setInputHandler(InputHandler* handler){
input = handler;
}
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index e23fb8875b..2966166d28 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -64,6 +64,7 @@ class Connector : public framing::OutputHandler
virtual void init() {};
virtual void close() = 0;
virtual void send(framing::AMQFrame& frame) = 0;
+ virtual void abort() = 0;
virtual void setInputHandler(framing::InputHandler* handler) = 0;
virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0;
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index 8e330448c9..ad85104f3a 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -105,6 +105,7 @@ using boost::str;
void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
+ void abort() {} // TODO: need to fix this for heartbeat timeouts to work
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index 6dbdbd003e..a68af78354 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -122,6 +122,7 @@ class SslConnector : public Connector, private sys::Runnable
void init();
void close();
void send(framing::AMQFrame& frame);
+ void abort() {} // TODO: Need to fix for heartbeat timeouts to work
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -372,8 +373,6 @@ void SslConnector::eof(SslIO&) {
handleClosed();
}
-// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
-// will never be called
void SslConnector::run(){
// Keep the connection impl in memory until run() completes.
boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();