summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp43
1 files changed, 41 insertions, 2 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 5233002850..8a485db72d 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -23,6 +23,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -41,11 +42,30 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
codec(0),
+ reads(0),
readError(false),
isClient(false),
readCredit(InfiniteCredit)
@@ -54,12 +74,18 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
AsynchIOHandler::~AsynchIOHandler() {
if (codec)
codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
delete codec;
}
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -129,10 +155,18 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
}
}
+ ++reads;
size_t decoded = 0;
if (codec) { // Already initiated
try {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // When we've decoded 3 reads (probably frames) we will have authenticated and
+ // started heartbeats, if specified, in many (but not all) cases so now we will cancel
+ // the idle connection timeout - this is really hacky, and would be better implemented
+ // in the connection, but that isn't actually created until the first decode.
+ if (reads == 3) {
+ timeoutTimerTask->cancel();
+ }
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
@@ -143,6 +177,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
+
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -202,6 +237,10 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;