diff options
Diffstat (limited to 'cpp/src/qpid/sys/ssl/SslHandler.cpp')
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.cpp | 41 |
1 files changed, 37 insertions, 4 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp index 67bf4ea893..8613059f28 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -19,9 +19,9 @@ * */ #include "qpid/sys/ssl/SslHandler.h" - #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" @@ -42,6 +42,24 @@ struct Buff : public SslIO::BufferBase { { delete [] bytes;} }; +struct ProtocolTimeoutTask : public sys::TimerTask { + SslHandler& handler; + std::string id; + + ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) : + TimerTask(timeout, "ProtocolTimeout"), + handler(h), + id(i) + {} + + void fire() { + // If this fires it means that we didn't negotiate the connection in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection " << id << " No protocol received closing"); + handler.abort(); + } +}; + SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) : identifier(id), aio(0), @@ -55,12 +73,18 @@ SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict SslHandler::~SslHandler() { if (codec) codec->closed(); + if (timeoutTimerTask) + timeoutTimerTask->cancel(); delete codec; } -void SslHandler::init(SslIO* a, int numBuffs) { +void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { aio = a; + // Start timer for this connection + timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); + timer.add(timeoutTimerTask); + // Give connection some buffers to use for (int i = 0; i < numBuffs; i++) { aio->queueReadBuffer(new Buff); @@ -80,8 +104,10 @@ void SslHandler::write(const framing::ProtocolInitiation& data) } void SslHandler::abort() { - // TODO: can't implement currently as underlying functionality not implemented - // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1)); + // Don't disconnect if we're already disconnecting + if (!readError) { + aio->requestCallback(boost::bind(&SslHandler::eof, this, _1)); + } } void SslHandler::activateOutput() { aio->notifyPendingWrite(); @@ -109,6 +135,9 @@ void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { + // We've just got the protocol negotiation so we can cancel the timeout for that + timeoutTimerTask->cancel(); + decoded = in.getPosition(); QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { @@ -169,6 +198,10 @@ void SslHandler::idle(SslIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, getSecuritySettings(aio)); write(framing::ProtocolInitiation(codec->getVersion())); + // We've just sent the protocol negotiation so we can cancel the timeout for that + // This is not ideal, because we've not received anything yet, but heartbeats will + // be active soon + timeoutTimerTask->cancel(); return; } if (codec == 0) return; |