diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-06-03 13:52:51 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-06-03 13:52:51 +0000 |
commit | eaa1184eaa80a1095daf784260aa8b2434aa710a (patch) | |
tree | bbb17863d02201b998572bdc2cec6b2254a50c29 /qpid/cpp/src | |
parent | 0e4541fb7ce30f66025aa3371ad6887066f3f80d (diff) | |
download | qpid-python-eaa1184eaa80a1095daf784260aa8b2434aa710a.tar.gz |
Revert "QPID-1879 Don't use a thread for every new client Connection"
This reverts commit b54680d4b3341fa280a237a6d80952b9830ae3c5.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 92 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.cpp | 96 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.h | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 61 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SslConnector.cpp | 82 |
5 files changed, 181 insertions, 183 deletions
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index ccaa8c0b87..6639f92324 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,13 +18,7 @@ * under the License. * */ - #include "ConnectionImpl.h" - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -34,16 +28,11 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/Options.h" #include <boost/bind.hpp> #include <boost/format.hpp> -#include <boost/shared_ptr.hpp> #include <limits> -#include <vector> namespace qpid { namespace client { @@ -53,10 +42,7 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -namespace { -// Maybe should amalgamate the singletons into a single client singleton - -// Get timer singleton +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock<Mutex> l(timerInitLock); @@ -65,73 +51,6 @@ Timer& theTimer() { return t; } -struct IOThreadOptions : public qpid::Options { - int maxIOThreads; - - IOThreadOptions(int c) : - Options("IO threading options"), - maxIOThreads(c) - { - addOptions() - ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); - } -}; - -// IO threads -class IOThread { - int maxIOThreads; - int ioThreads; - int connections; - Mutex threadLock; - std::vector<Thread> t; - Poller::shared_ptr poller_; - -public: - void add() { - ScopedLock<Mutex> l(threadLock); - ++connections; - if (ioThreads < maxIOThreads) { - QPID_LOG(debug, "Created IO thread: " << ioThreads); - ++ioThreads; - t.push_back( Thread(poller_.get()) ); - } - } - - void sub() { - ScopedLock<Mutex> l(threadLock); - --connections; - } - - Poller::shared_ptr poller() const { - return poller_; - } - - // Here is where the maximum number of threads is set - IOThread(int c) : - ioThreads(0), - connections(0), - poller_(new Poller) - { - IOThreadOptions options(c); - options.parse(0, 0, QPIDC_CONF_FILE, true); - maxIOThreads = (options.maxIOThreads != -1) ? - options.maxIOThreads : 1; - } - - // We can't destroy threads one-by-one as the only - // control we have is to shutdown the whole lot - // and we can't do that before we're unloaded as we can't - // restart the Poller after shutting it down - ~IOThread() { - poller_->shutdown(); - for (int i=0; i<ioThreads; ++i) { - t[i].join(); - } - } -}; - -static IOThread io(SystemInfo::concurrency()); - class HeartbeatTask : public TimerTask { TimeoutHandler& timeout; @@ -148,8 +67,6 @@ public: {} }; -} - ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -173,7 +90,6 @@ ConnectionImpl::~ConnectionImpl() { // is running. failover.reset(); if (connector) connector->close(); - io.sub(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) @@ -210,6 +126,7 @@ bool ConnectionImpl::isOpen() const return handler.isOpen(); } + void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -217,8 +134,7 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - io.add(); - connector.reset(Connector::create(protocol, io.poller(), version, handler, this)); + connector.reset(Connector::create(protocol, version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); @@ -322,7 +238,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() { return handler; } - + std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index 946bf0138d..1558f292aa 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -27,11 +27,9 @@ #include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" -#include "qpid/sys/Socket.h" #include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" @@ -53,23 +51,21 @@ using boost::str; // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } -Connector* Connector::create(const std::string& proto, - Poller::shared_ptr p, - framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); if (i==theProtocolRegistry().end()) { throw Exception(QPID_MSG("Unknown protocol: " << proto)); } - return (i->second)(p, v, s, c); + return (i->second)(v, s, c); } void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) @@ -85,7 +81,7 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) { } -class TCPConnector : public Connector, public sys::Codec +class TCPConnector : public Connector, public sys::Codec, private sys::Runnable { typedef std::deque<framing::AMQFrame> Frames; struct Buff; @@ -97,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -108,25 +104,28 @@ class TCPConnector : public Connector, public sys::Codec framing::InitiationHandler* initialiser; framing::OutputHandler* output; + sys::Thread receiver; + sys::Socket socket; sys::AsynchIO* aio; std::string identifier; - Poller::shared_ptr poller; + boost::shared_ptr<sys::Poller> poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~TCPConnector(); + void run(); void handleClosed(); bool closeInternal(); - + bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -143,23 +142,18 @@ class TCPConnector : public Connector, public sys::Codec size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); + public: - TCPConnector(Poller::shared_ptr, - framing::ProtocolVersion pVersion, - const ConnectionSettings&, + TCPConnector(framing::ProtocolVersion pVersion, + const ConnectionSettings&, ConnectionImpl*); }; -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(p, v, s, c); + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(v, s, c); } struct StaticInit { @@ -169,21 +163,19 @@ namespace { } init; } -TCPConnector::TCPConnector(Poller::shared_ptr p, - ProtocolVersion ver, +TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), - version(ver), + version(ver), initiated(false), closed(true), joined(true), shutdownHandler(0), aio(0), - poller(p), impl(cimpl->shared_from_this()) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); @@ -205,6 +197,7 @@ void TCPConnector::connect(const std::string& host, int port){ } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + poller = Poller::shared_ptr(new Poller); aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -221,24 +214,28 @@ void TCPConnector::init(){ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); + receiver = Thread(this); } bool TCPConnector::closeInternal() { + bool ret; + { Mutex::ScopedLock l(lock); - bool ret = !closed; + ret = !closed; if (!closed) { closed = true; aio->queueForDeletion(); - socket.close(); + poller->shutdown(); + } + if (joined || receiver.id() == Thread::current().id()) { + return ret; + } + joined = true; } + receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ -288,13 +285,18 @@ void TCPConnector::handleClosed() { shutdownHandler->shutdown(); } +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -380,6 +382,28 @@ void TCPConnector::eof(AsynchIO&) { handleClosed(); } +void TCPConnector::run() { + // Keep the connection impl in memory until run() completes. + boost::shared_ptr<ConnectionImpl> protect = impl.lock(); + assert(protect); + try { + Dispatcher d(poller); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + + aio->start(poller); + d.run(); + } catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); + handleClosed(); + } + try { + socket.close(); + } catch (const std::exception&) {} +} + void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index 880c81affe..78ddaa33cd 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -22,24 +22,27 @@ #define _Connector_ +#include "qpid/framing/InputHandler.h" #include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" - +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/Time.h" + +#include <queue> +#include <boost/weak_ptr.hpp> #include <boost/shared_ptr.hpp> -#include <string> - namespace qpid { namespace sys { -class ShutdownHandler; class SecurityLayer; -class Poller; -} - -namespace framing { -class InputHandler; -class AMQFrame; } namespace client { @@ -49,14 +52,11 @@ class ConnectionImpl; ///@internal class Connector : public framing::OutputHandler -{ +{ public: // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future) - typedef Connector* Factory(boost::shared_ptr<qpid::sys::Poller>, - framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); - static Connector* create(const std::string& proto, - boost::shared_ptr<qpid::sys::Poller>, - framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); static void registerFactory(const std::string& proto, Factory* connectorFactory); virtual ~Connector() {}; @@ -73,6 +73,7 @@ class Connector : public framing::OutputHandler virtual const std::string& getIdentifier() const = 0; virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + }; }} diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index f6bedf63f5..ad85104f3a 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -26,7 +26,6 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -49,7 +48,7 @@ using namespace qpid::framing; using boost::format; using boost::str; - class RdmaConnector : public Connector, public sys::Codec + class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable { struct Buff; @@ -61,12 +60,13 @@ using boost::str; Frames frames; size_t lastEof; // Position after last EOF in frames uint64_t currentSize; - Bounds* bounds; - + Bounds* bounds; + + framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; + sys::Mutex pollingLock; bool polling; bool joined; @@ -75,12 +75,15 @@ using boost::str; framing::InitiationHandler* initialiser; framing::OutputHandler* output; + sys::Thread receiver; + Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); + void run(); void handleClosed(); bool closeInternal(); @@ -98,7 +101,7 @@ using boost::str; std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); @@ -116,16 +119,15 @@ using boost::str; bool canEncode(); public: - RdmaConnector(Poller::shared_ptr, - framing::ProtocolVersion pVersion, + RdmaConnector(framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; // Static constructor which registers connector here namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new RdmaConnector(p, v, s, c); + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(v, s, c); } struct StaticInit { @@ -137,8 +139,7 @@ namespace { } -RdmaConnector::RdmaConnector(Poller::shared_ptr p, - ProtocolVersion ver, +RdmaConnector::RdmaConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -151,7 +152,6 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, joined(true), shutdownHandler(0), aio(0), - poller(p), impl(cimpl) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -165,6 +165,7 @@ void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); assert(joined); + poller = Poller::shared_ptr(new Poller); // This stuff needs to abstracted out of here to a platform specific file ::addrinfo *res; @@ -189,6 +190,7 @@ void RdmaConnector::connect(const std::string& host, int port){ polling = true; joined = false; + receiver = Thread(this); } // The following only gets run when connected @@ -224,14 +226,23 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv bool RdmaConnector::closeInternal() { bool ret; + { Mutex::ScopedLock l(pollingLock); ret = polling; if (polling) { polling = false; + poller->shutdown(); } + if (joined || receiver.id() == Thread::current().id()) { + return ret; + } + joined = true; + } + + receiver.join(); return ret; } - + void RdmaConnector::close() { closeInternal(); } @@ -355,6 +366,28 @@ void RdmaConnector::eof(Rdma::AsynchIO&) { handleClosed(); } +void RdmaConnector::run(){ + // Keep the connection impl in memory until run() completes. + //GRS: currently the ConnectionImpls destructor is where the Io thread is joined + //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + //assert(protect); + try { + Dispatcher d(poller); + + //aio->start(poller); + d.run(); + //aio->queueForDeletion(); + } catch (const std::exception& e) { + { + // We're no longer polling + Mutex::ScopedLock l(pollingLock); + polling = false; + } + QPID_LOG(error, e.what()); + handleClosed(); + } +} + void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp index 8194371b8a..7b0bcc6f1e 100644 --- a/qpid/cpp/src/qpid/client/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/SslConnector.cpp @@ -28,7 +28,6 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" @@ -51,7 +50,7 @@ using boost::format; using boost::str; -class SslConnector : public Connector +class SslConnector : public Connector, private sys::Runnable { struct Buff; @@ -69,25 +68,25 @@ class SslConnector : public Connector framing::Buffer encode; size_t framesEncoded; std::string identifier; - Bounds* bounds; - + Bounds* bounds; + void writeOne(); void newBuffer(); public: - + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::ssl::SslIO*); void handle(framing::AMQFrame&); void write(sys::ssl::SslIO&); }; - + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; bool joined; @@ -97,17 +96,20 @@ class SslConnector : public Connector framing::OutputHandler* output; Writer writer; + + sys::Thread receiver; sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; - Poller::shared_ptr poller; + boost::shared_ptr<sys::Poller> poller; ~SslConnector(); + void run(); void handleClosed(); bool closeInternal(); - + void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -116,7 +118,7 @@ class SslConnector : public Connector std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -130,20 +132,15 @@ class SslConnector : public Connector const std::string& getIdentifier() const; public: - SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, + SslConnector(framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(p, v, s, c); + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new SslConnector(v, s, c); } struct StaticInit { @@ -152,9 +149,9 @@ namespace { SslOptions options; options.parse (0, 0, QPIDC_CONF_FILE, true); if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); + QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { - initNSS(options); + initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { @@ -166,8 +163,7 @@ namespace { } init; } -SslConnector::SslConnector(Poller::shared_ptr p, - ProtocolVersion ver, +SslConnector::SslConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -178,7 +174,6 @@ SslConnector::SslConnector(Poller::shared_ptr p, shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), - poller(p), impl(cimpl) { QPID_LOG(debug, "SslConnector created for " << version.toString()); @@ -202,6 +197,7 @@ void SslConnector::connect(const std::string& host, int port){ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; + poller = Poller::shared_ptr(new Poller); aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), @@ -218,10 +214,7 @@ void SslConnector::init(){ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - aio->start(poller); + receiver = Thread(this); } bool SslConnector::closeInternal() { @@ -230,11 +223,16 @@ bool SslConnector::closeInternal() { if (!closed) { closed = true; aio->queueForDeletion(); - socket.close(); + poller->shutdown(); + } + if (!joined && receiver.id() != Thread::current().id()) { + joined = true; + Mutex::ScopedUnlock u(closedLock); + receiver.join(); } return ret; } - + void SslConnector::close() { closeInternal(); } @@ -268,6 +266,11 @@ void SslConnector::handleClosed() { shutdownHandler->shutdown(); } +struct SslConnector::Buff : public SslIO::BufferBase { + Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -372,4 +375,25 @@ void SslConnector::eof(SslIO&) { handleClosed(); } +void SslConnector::run(){ + // Keep the connection impl in memory until run() completes. + boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + assert(protect); + try { + Dispatcher d(poller); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + + aio->start(poller); + d.run(); + socket.close(); + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); + handleClosed(); + } +} + + }} // namespace qpid::client |