diff options
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 5233002850..3edc896724 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -23,6 +23,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" #include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" @@ -41,7 +42,25 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : +struct ProtocolTimeoutTask : public sys::TimerTask { + AsynchIOHandler& handler; + std::string id; + + ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) : + TimerTask(timeout, "ProtocolTimeout"), + handler(h), + id(i) + {} + + void fire() { + // If this fires it means that we didn't negotiate the connection in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection " << id << " No protocol received closing"); + handler.abort(); + } +}; + +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) : identifier(id), aio(0), factory(f), @@ -57,9 +76,13 @@ AsynchIOHandler::~AsynchIOHandler() { delete codec; } -void AsynchIOHandler::init(AsynchIO* a, int numBuffs) { +void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) { aio = a; + // Start timer for this connection + timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); + timer.add(timeoutTimerTask); + // Give connection some buffers to use for (int i = 0; i < numBuffs; i++) { aio->queueReadBuffer(new Buff); @@ -143,6 +166,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { decoded = in.getPosition(); + // We've just got the protocol negotiation so we can cancel the timeout for that + timeoutTimerTask->cancel(); + QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -202,6 +228,10 @@ void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, SecuritySettings()); write(framing::ProtocolInitiation(codec->getVersion())); + // We've just sent the protocol negotiation so we can cancel the timeout for that + // This is not ideal, because we've not received anything yet, but heartbeats will + // be active soon + timeoutTimerTask->cancel(); return; } if (codec == 0) return; |
