diff options
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 82 |
1 files changed, 53 insertions, 29 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 8194371b8a..7b0bcc6f1e 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -28,7 +28,6 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" @@ -51,7 +50,7 @@ using boost::format; using boost::str; -class SslConnector : public Connector +class SslConnector : public Connector, private sys::Runnable { struct Buff; @@ -69,25 +68,25 @@ class SslConnector : public Connector framing::Buffer encode; size_t framesEncoded; std::string identifier; - Bounds* bounds; - + Bounds* bounds; + void writeOne(); void newBuffer(); public: - + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::ssl::SslIO*); void handle(framing::AMQFrame&); void write(sys::ssl::SslIO&); }; - + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; bool joined; @@ -97,17 +96,20 @@ class SslConnector : public Connector framing::OutputHandler* output; Writer writer; + + sys::Thread receiver; sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; - Poller::shared_ptr poller; + boost::shared_ptr<sys::Poller> poller; ~SslConnector(); + void run(); void handleClosed(); bool closeInternal(); - + void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -116,7 +118,7 @@ class SslConnector : public Connector std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -130,20 +132,15 @@ class SslConnector : public Connector const std::string& getIdentifier() const; public: - SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, + SslConnector(framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(p, v, s, c); + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new SslConnector(v, s, c); } struct StaticInit { @@ -152,9 +149,9 @@ namespace { SslOptions options; options.parse (0, 0, QPIDC_CONF_FILE, true); if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); + QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { - initNSS(options); + initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { @@ -166,8 +163,7 @@ namespace { } init; } -SslConnector::SslConnector(Poller::shared_ptr p, - ProtocolVersion ver, +SslConnector::SslConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -178,7 +174,6 @@ SslConnector::SslConnector(Poller::shared_ptr p, shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), - poller(p), impl(cimpl) { QPID_LOG(debug, "SslConnector created for " << version.toString()); @@ -202,6 +197,7 @@ void SslConnector::connect(const std::string& host, int port){ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; + poller = Poller::shared_ptr(new Poller); aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), @@ -218,10 +214,7 @@ void SslConnector::init(){ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - aio->start(poller); + receiver = Thread(this); } bool SslConnector::closeInternal() { @@ -230,11 +223,16 @@ bool SslConnector::closeInternal() { if (!closed) { closed = true; aio->queueForDeletion(); - socket.close(); + poller->shutdown(); + } + if (!joined && receiver.id() != Thread::current().id()) { + joined = true; + Mutex::ScopedUnlock u(closedLock); + receiver.join(); } return ret; } - + void SslConnector::close() { closeInternal(); } @@ -268,6 +266,11 @@ void SslConnector::handleClosed() { shutdownHandler->shutdown(); } +struct SslConnector::Buff : public SslIO::BufferBase { + Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -372,4 +375,25 @@ void SslConnector::eof(SslIO&) { handleClosed(); } +void SslConnector::run(){ + // Keep the connection impl in memory until run() completes. + boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + assert(protect); + try { + Dispatcher d(poller); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + + aio->start(poller); + d.run(); + socket.close(); + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); + handleClosed(); + } +} + + }} // namespace qpid::client |