summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/ssl/SslHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/ssl/SslHandler.cpp')
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp41
1 files changed, 37 insertions, 4 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
index 67bf4ea893..8613059f28 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp
@@ -19,9 +19,9 @@
*
*/
#include "qpid/sys/ssl/SslHandler.h"
-
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -42,6 +42,24 @@ struct Buff : public SslIO::BufferBase {
{ delete [] bytes;}
};
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ SslHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
identifier(id),
aio(0),
@@ -55,12 +73,18 @@ SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict
SslHandler::~SslHandler() {
if (codec)
codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
delete codec;
}
-void SslHandler::init(SslIO* a, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -80,8 +104,10 @@ void SslHandler::write(const framing::ProtocolInitiation& data)
}
void SslHandler::abort() {
- // TODO: can't implement currently as underlying functionality not implemented
- // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+ // Don't disconnect if we're already disconnecting
+ if (!readError) {
+ aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+ }
}
void SslHandler::activateOutput() {
aio->notifyPendingWrite();
@@ -109,6 +135,9 @@ void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
+ // We've just got the protocol negotiation so we can cancel the timeout for that
+ timeoutTimerTask->cancel();
+
decoded = in.getPosition();
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
@@ -169,6 +198,10 @@ void SslHandler::idle(SslIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, getSecuritySettings(aio));
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;