diff options
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 5233002850..8a485db72d 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -23,6 +23,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" #include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" @@ -41,11 +42,30 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : +struct ProtocolTimeoutTask : public sys::TimerTask { + AsynchIOHandler& handler; + std::string id; + + ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) : + TimerTask(timeout, "ProtocolTimeout"), + handler(h), + id(i) + {} + + void fire() { + // If this fires it means that we didn't negotiate the connection in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection " << id << " No protocol received closing"); + handler.abort(); + } +}; + +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) : identifier(id), aio(0), factory(f), codec(0), + reads(0), readError(false), isClient(false), readCredit(InfiniteCredit) @@ -54,12 +74,18 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : AsynchIOHandler::~AsynchIOHandler() { if (codec) codec->closed(); + if (timeoutTimerTask) + timeoutTimerTask->cancel(); delete codec; } -void AsynchIOHandler::init(AsynchIO* a, int numBuffs) { +void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) { aio = a; + // Start timer for this connection + timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); + timer.add(timeoutTimerTask); + // Give connection some buffers to use for (int i = 0; i < numBuffs; i++) { aio->queueReadBuffer(new Buff); @@ -129,10 +155,18 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { } } + ++reads; size_t decoded = 0; if (codec) { // Already initiated try { decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + // When we've decoded 3 reads (probably frames) we will have authenticated and + // started heartbeats, if specified, in many (but not all) cases so now we will cancel + // the idle connection timeout - this is really hacky, and would be better implemented + // in the connection, but that isn't actually created until the first decode. + if (reads == 3) { + timeoutTimerTask->cancel(); + } }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; @@ -143,6 +177,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -202,6 +237,10 @@ void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, SecuritySettings()); write(framing::ProtocolInitiation(codec->getVersion())); + // We've just sent the protocol negotiation so we can cancel the timeout for that + // This is not ideal, because we've not received anything yet, but heartbeats will + // be active soon + timeoutTimerTask->cancel(); return; } if (codec == 0) return; |