diff options
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 56 |
1 files changed, 37 insertions, 19 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index c49deaa279..3a146592e6 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -30,8 +30,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" -#include "qpid/sys/ssl/SslIo.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" #include "qpid/sys/SecuritySettings.h" @@ -72,7 +73,8 @@ class SslConnector : public Connector sys::ssl::SslSocket socket; - sys::ssl::SslIO* aio; + sys::AsynchConnector* connector; + sys::AsynchIO* aio; std::string identifier; Poller::shared_ptr poller; SecuritySettings securitySettings; @@ -86,6 +88,8 @@ class SslConnector : public Connector void disconnected(AsynchIO&); void connect(const std::string& host, const std::string& port); + void connected(const sys::Socket&); + void connectFailed(const std::string& msg); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: Need to fix for heartbeat timeouts to work @@ -164,24 +168,28 @@ SslConnector::~SslConnector() { close(); } -void SslConnector::connect(const std::string& host, const std::string& port){ +void SslConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); - try { - socket.connect(SocketAddress(host, port)); - } catch (const std::exception& e) { - socket.close(); - throw TransportFailure(e.what()); - } - + connector = AsynchConnector::create( + socket, + host, port, + boost::bind(&SslConnector::connected, this, _1), + boost::bind(&SslConnector::connectFailed, this, _3)); closed = false; - aio = new SslIO(socket, - boost::bind(&SslConnector::readbuff, this, _1, _2), - boost::bind(&SslConnector::eof, this, _1), - boost::bind(&SslConnector::disconnected, this, _1), - boost::bind(&SslConnector::socketClosed, this, _1, _2), - 0, // nobuffs - boost::bind(&SslConnector::writebuff, this, _1)); + + connector->start(poller); +} + +void SslConnector::connected(const Socket&) { + connector = 0; + aio = AsynchIO::create(socket, + boost::bind(&SslConnector::readbuff, this, _1, _2), + boost::bind(&SslConnector::eof, this, _1), + boost::bind(&SslConnector::disconnected, this, _1), + boost::bind(&SslConnector::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslConnector::writebuff, this, _1)); aio->createBuffers(maxFrameSize); identifier = str(format("[%1%]") % socket.getFullAddress()); @@ -190,6 +198,16 @@ void SslConnector::connect(const std::string& host, const std::string& port){ aio->start(poller); } +void SslConnector::connectFailed(const std::string& msg) { + connector = 0; + QPID_LOG(warning, "Connect failed: " << msg); + socket.close(); + if (!closed) + closed = true; + if (shutdownHandler) + shutdownHandler->shutdown(); +} + void SslConnector::close() { Mutex::ScopedLock l(lock); if (!closed) { @@ -265,7 +283,7 @@ void SslConnector::writebuff(AsynchIO& /*aio*/) return; } - SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + AsynchIOBufferBase* buffer = aio->getQueuedBuffer(); if (buffer) { size_t encoded = encode(buffer->bytes, buffer->byteCount); @@ -343,7 +361,7 @@ size_t SslConnector::decode(const char* buffer, size_t size) } void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = aio->getQueuedBuffer(); + AsynchIOBufferBase* buff = aio->getQueuedBuffer(); assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); |