summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp34
1 files changed, 32 insertions, 2 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 5233002850..3edc896724 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -23,6 +23,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -41,7 +42,25 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
@@ -57,9 +76,13 @@ AsynchIOHandler::~AsynchIOHandler() {
delete codec;
}
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -143,6 +166,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
+ // We've just got the protocol negotiation so we can cancel the timeout for that
+ timeoutTimerTask->cancel();
+
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -202,6 +228,10 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;