summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SslConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp56
1 files changed, 37 insertions, 19 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index c49deaa279..3a146592e6 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -30,8 +30,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/SecuritySettings.h"
@@ -72,7 +73,8 @@ class SslConnector : public Connector
sys::ssl::SslSocket socket;
- sys::ssl::SslIO* aio;
+ sys::AsynchConnector* connector;
+ sys::AsynchIO* aio;
std::string identifier;
Poller::shared_ptr poller;
SecuritySettings securitySettings;
@@ -86,6 +88,8 @@ class SslConnector : public Connector
void disconnected(AsynchIO&);
void connect(const std::string& host, const std::string& port);
+ void connected(const sys::Socket&);
+ void connectFailed(const std::string& msg);
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -164,24 +168,28 @@ SslConnector::~SslConnector() {
close();
}
-void SslConnector::connect(const std::string& host, const std::string& port){
+void SslConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
- try {
- socket.connect(SocketAddress(host, port));
- } catch (const std::exception& e) {
- socket.close();
- throw TransportFailure(e.what());
- }
-
+ connector = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&SslConnector::connected, this, _1),
+ boost::bind(&SslConnector::connectFailed, this, _3));
closed = false;
- aio = new SslIO(socket,
- boost::bind(&SslConnector::readbuff, this, _1, _2),
- boost::bind(&SslConnector::eof, this, _1),
- boost::bind(&SslConnector::disconnected, this, _1),
- boost::bind(&SslConnector::socketClosed, this, _1, _2),
- 0, // nobuffs
- boost::bind(&SslConnector::writebuff, this, _1));
+
+ connector->start(poller);
+}
+
+void SslConnector::connected(const Socket&) {
+ connector = 0;
+ aio = AsynchIO::create(socket,
+ boost::bind(&SslConnector::readbuff, this, _1, _2),
+ boost::bind(&SslConnector::eof, this, _1),
+ boost::bind(&SslConnector::disconnected, this, _1),
+ boost::bind(&SslConnector::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&SslConnector::writebuff, this, _1));
aio->createBuffers(maxFrameSize);
identifier = str(format("[%1%]") % socket.getFullAddress());
@@ -190,6 +198,16 @@ void SslConnector::connect(const std::string& host, const std::string& port){
aio->start(poller);
}
+void SslConnector::connectFailed(const std::string& msg) {
+ connector = 0;
+ QPID_LOG(warning, "Connect failed: " << msg);
+ socket.close();
+ if (!closed)
+ closed = true;
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
void SslConnector::close() {
Mutex::ScopedLock l(lock);
if (!closed) {
@@ -265,7 +283,7 @@ void SslConnector::writebuff(AsynchIO& /*aio*/)
return;
}
- SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+ AsynchIOBufferBase* buffer = aio->getQueuedBuffer();
if (buffer) {
size_t encoded = encode(buffer->bytes, buffer->byteCount);
@@ -343,7 +361,7 @@ size_t SslConnector::decode(const char* buffer, size_t size)
}
void SslConnector::writeDataBlock(const AMQDataBlock& data) {
- SslIO::BufferBase* buff = aio->getQueuedBuffer();
+ AsynchIOBufferBase* buff = aio->getQueuedBuffer();
assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);