diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/DriverImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SslTransport.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SslTransport.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/TcpTransport.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/TcpTransport.h | 3 | ||||
-rw-r--r-- | cpp/src/ssl.cmake | 2 | ||||
-rw-r--r-- | cpp/src/ssl.mk | 6 |
7 files changed, 51 insertions, 42 deletions
diff --git a/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/cpp/src/qpid/messaging/amqp/DriverImpl.cpp index 0c119c87c8..16307b3c22 100644 --- a/cpp/src/qpid/messaging/amqp/DriverImpl.cpp +++ b/cpp/src/qpid/messaging/amqp/DriverImpl.cpp @@ -53,7 +53,7 @@ void DriverImpl::stop() boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection) { boost::shared_ptr<Transport> t(Transport::create(protocol, connection, poller)); - if (!t) throw new qpid::messaging::ConnectionError("No such transport: " + protocol); + if (!t) throw qpid::messaging::ConnectionError("No such transport: " + protocol); return t; } diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.cpp b/cpp/src/qpid/messaging/amqp/SslTransport.cpp index a02b0d7052..ea2375cb26 100644 --- a/cpp/src/qpid/messaging/amqp/SslTransport.cpp +++ b/cpp/src/qpid/messaging/amqp/SslTransport.cpp @@ -20,7 +20,8 @@ */ #include "SslTransport.h" #include "TransportContext.h" -#include "qpid/sys/ssl/SslIo.h" +#include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/Poller.h" #include "qpid/log/Statement.h" @@ -51,18 +52,19 @@ struct StaticInit } -SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), aio(0), poller(p) {} +SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {} void SslTransport::connect(const std::string& host, const std::string& port) { + assert(!connector); assert(!aio); - try { - socket.connect(host, port); - connected(socket); - } catch (const std::exception& e) { - failed(e.what()); - } + connector = AsynchConnector::create( + socket, + host, port, + boost::bind(&SslTransport::connected, this, _1), + boost::bind(&SslTransport::failed, this, _3)); + connector->start(poller); } void SslTransport::failed(const std::string& msg) @@ -72,22 +74,22 @@ void SslTransport::failed(const std::string& msg) context.closed(); } -void SslTransport::connected(const SslSocket&) +void SslTransport::connected(const Socket&) { context.opened(); - aio = new SslIO(socket, - boost::bind(&SslTransport::read, this, _1, _2), - boost::bind(&SslTransport::eof, this, _1), - boost::bind(&SslTransport::disconnected, this, _1), - boost::bind(&SslTransport::socketClosed, this, _1, _2), - 0, // nobuffs - boost::bind(&SslTransport::write, this, _1)); + aio = AsynchIO::create(socket, + boost::bind(&SslTransport::read, this, _1, _2), + boost::bind(&SslTransport::eof, this, _1), + boost::bind(&SslTransport::disconnected, this, _1), + boost::bind(&SslTransport::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslTransport::write, this, _1)); aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes id = boost::str(boost::format("[%1%]") % socket.getFullAddress()); aio->start(poller); } -void SslTransport::read(SslIO&, SslIO::BufferBase* buffer) +void SslTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer) { int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount); if (decoded < buffer->dataCount) { @@ -101,10 +103,10 @@ void SslTransport::read(SslIO&, SslIO::BufferBase* buffer) } } -void SslTransport::write(SslIO&) +void SslTransport::write(AsynchIO&) { if (context.getCodec().canEncode()) { - SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); if (buffer) { size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount); @@ -123,18 +125,18 @@ void SslTransport::close() aio->queueWriteClose(); } -void SslTransport::eof(SslIO&) +void SslTransport::eof(AsynchIO&) { close(); } -void SslTransport::disconnected(SslIO&) +void SslTransport::disconnected(AsynchIO&) { close(); socketClosed(*aio, socket); } -void SslTransport::socketClosed(SslIO&, const SslSocket&) +void SslTransport::socketClosed(AsynchIO&, const Socket&) { if (aio) aio->queueForDeletion(); diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.h b/cpp/src/qpid/messaging/amqp/SslTransport.h index e83c3346e6..120bd983c1 100644 --- a/cpp/src/qpid/messaging/amqp/SslTransport.h +++ b/cpp/src/qpid/messaging/amqp/SslTransport.h @@ -30,10 +30,9 @@ namespace qpid { namespace sys { class ConnectionCodec; class Poller; -namespace ssl { -class SslIO; -class SslIOBufferBase; -} +class AsynchConnector; +class AsynchIO; +class AsynchIOBufferBase; } namespace messaging { @@ -54,18 +53,19 @@ class SslTransport : public Transport private: qpid::sys::ssl::SslSocket socket; TransportContext& context; - qpid::sys::ssl::SslIO* aio; + qpid::sys::AsynchConnector* connector; + qpid::sys::AsynchIO* aio; boost::shared_ptr<qpid::sys::Poller> poller; bool closed; std::string id; - void connected(const qpid::sys::ssl::SslSocket&); + void connected(const qpid::sys::Socket&); void failed(const std::string& msg); - void read(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); - void write(qpid::sys::ssl::SslIO&); - void eof(qpid::sys::ssl::SslIO&); - void disconnected(qpid::sys::ssl::SslIO&); - void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); + void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void write(qpid::sys::AsynchIO&); + void eof(qpid::sys::AsynchIO&); + void disconnected(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); friend class DriverImpl; }; diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.cpp b/cpp/src/qpid/messaging/amqp/TcpTransport.cpp index 5105c9f384..98022d634c 100644 --- a/cpp/src/qpid/messaging/amqp/TcpTransport.cpp +++ b/cpp/src/qpid/messaging/amqp/TcpTransport.cpp @@ -48,14 +48,14 @@ struct StaticInit } init; } -TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {} +TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {} void TcpTransport::connect(const std::string& host, const std::string& port) { assert(!connector); assert(!aio); connector = AsynchConnector::create( - socket, + *socket, host, port, boost::bind(&TcpTransport::connected, this, _1), boost::bind(&TcpTransport::failed, this, _3)); @@ -67,7 +67,7 @@ void TcpTransport::failed(const std::string& msg) { QPID_LOG(debug, "Failed to connect: " << msg); connector = 0; - socket.close(); + socket->close(); context.closed(); } @@ -75,7 +75,7 @@ void TcpTransport::connected(const Socket&) { context.opened(); connector = 0; - aio = AsynchIO::create(socket, + aio = AsynchIO::create(*socket, boost::bind(&TcpTransport::read, this, _1, _2), boost::bind(&TcpTransport::eof, this, _1), boost::bind(&TcpTransport::disconnected, this, _1), @@ -83,7 +83,7 @@ void TcpTransport::connected(const Socket&) 0, // nobuffs boost::bind(&TcpTransport::write, this, _1)); aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes - id = boost::str(boost::format("[%1%]") % socket.getFullAddress()); + id = boost::str(boost::format("[%1%]") % socket->getFullAddress()); aio->start(poller); } @@ -131,7 +131,7 @@ void TcpTransport::eof(AsynchIO&) void TcpTransport::disconnected(AsynchIO&) { close(); - socketClosed(*aio, socket); + socketClosed(*aio, *socket); } void TcpTransport::socketClosed(AsynchIO&, const Socket&) diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.h b/cpp/src/qpid/messaging/amqp/TcpTransport.h index 142b36ba8c..b9031dcee2 100644 --- a/cpp/src/qpid/messaging/amqp/TcpTransport.h +++ b/cpp/src/qpid/messaging/amqp/TcpTransport.h @@ -24,6 +24,7 @@ #include "qpid/messaging/amqp/Transport.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" +#include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> namespace qpid { @@ -51,7 +52,7 @@ class TcpTransport : public Transport void giveReadCredit(int32_t) {} private: - qpid::sys::Socket socket; + boost::scoped_ptr<qpid::sys::Socket> socket; TransportContext& context; qpid::sys::AsynchConnector* connector; qpid::sys::AsynchIO* aio; diff --git a/cpp/src/ssl.cmake b/cpp/src/ssl.cmake index 8e9e270c00..b1a1ba9fa3 100644 --- a/cpp/src/ssl.cmake +++ b/cpp/src/ssl.cmake @@ -100,7 +100,7 @@ if (BUILD_SSL) DESTINATION ${QPIDD_MODULE_DIR} COMPONENT ${QPID_COMPONENT_BROKER}) - add_library (sslconnector MODULE qpid/client/SslConnector.cpp) + add_library (sslconnector MODULE qpid/client/SslConnector.cpp qpid/messaging/amqp/SslTransport.cpp) target_link_libraries (sslconnector qpidclient sslcommon) set_target_properties (sslconnector PROPERTIES PREFIX "" diff --git a/cpp/src/ssl.mk b/cpp/src/ssl.mk index 89e7ed8049..24ba8f585e 100644 --- a/cpp/src/ssl.mk +++ b/cpp/src/ssl.mk @@ -48,6 +48,12 @@ dmoduleexec_LTLIBRARIES += ssl.la sslconnector_la_SOURCES = \ qpid/client/SslConnector.cpp +if HAVE_PROTON +sslconnector_la_SOURCES += \ + qpid/messaging/amqp/SslTransport.cpp +endif #HAVE_PROTON + + sslconnector_la_LIBADD = \ libqpidclient.la \ libsslcommon.la |