diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-01-21 06:17:10 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-01-21 06:17:10 +0000 |
commit | df3fe9778d87dd256a2d4c08146d86830ac1e8be (patch) | |
tree | 23f64b58e3ec94c6024368d1b90910db9d711c84 /cpp/src | |
parent | 66266d1f34066c5960ae1eb4f28b8c7758cb46c9 (diff) | |
download | qpid-python-df3fe9778d87dd256a2d4c08146d86830ac1e8be.tar.gz |
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
variable or the config file
- The client threads are initialised (via a singleton) when first
used in a Connection::open()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901550 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 33 | ||||
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 83 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 22 |
7 files changed, 196 insertions, 220 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index cede7f7310..f348493fd0 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,7 +18,9 @@ * under the License. * */ + #include "qpid/client/ConnectionImpl.h" + #include "qpid/client/Connector.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/client/SessionImpl.h" @@ -27,11 +29,20 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Options.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/shared_ptr.hpp> #include <limits> +#include <vector> + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif namespace qpid { namespace client { @@ -41,7 +52,10 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -// Get timer singleton +namespace { +// Maybe should amalgamate the singletons into a single client singleton + +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock<Mutex> l(timerInitLock); @@ -50,6 +64,76 @@ Timer& theTimer() { return t; } +struct IOThreadOptions : public qpid::Options { + int maxIOThreads; + + IOThreadOptions(int c) : + Options("IO threading options"), + maxIOThreads(c) + { + addOptions() + ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); + } +}; + +// IO threads +class IOThread { + int maxIOThreads; + int ioThreads; + int connections; + Mutex threadLock; + std::vector<Thread> t; + Poller::shared_ptr poller_; + +public: + void add() { + ScopedLock<Mutex> l(threadLock); + ++connections; + if (ioThreads < maxIOThreads) { + QPID_LOG(debug, "Created IO thread: " << ioThreads); + ++ioThreads; + t.push_back( Thread(poller_.get()) ); + } + } + + void sub() { + ScopedLock<Mutex> l(threadLock); + --connections; + } + + Poller::shared_ptr poller() const { + return poller_; + } + + // Here is where the maximum number of threads is set + IOThread(int c) : + ioThreads(0), + connections(0), + poller_(new Poller) + { + IOThreadOptions options(c); + options.parse(0, 0, QPIDC_CONF_FILE, true); + maxIOThreads = (options.maxIOThreads != -1) ? + options.maxIOThreads : 1; + } + + // We can't destroy threads one-by-one as the only + // control we have is to shutdown the whole lot + // and we can't do that before we're unloaded as we can't + // restart the Poller after shutting it down + ~IOThread() { + poller_->shutdown(); + for (int i=0; i<ioThreads; ++i) { + t[i].join(); + } + } +}; + +IOThread& theIO() { + static IOThread io(SystemInfo::concurrency()); + return io; +} + class HeartbeatTask : public TimerTask { TimeoutHandler& timeout; @@ -66,6 +150,8 @@ public: {} }; +} + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -89,6 +175,7 @@ ConnectionImpl::~ConnectionImpl() { // connector thread does not call on us while the destructor // is running. if (connector) connector->close(); + theIO().sub(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) @@ -131,11 +218,10 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) } bool ConnectionImpl::isOpen() const -{ +{ return handler.isOpen(); } - void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -143,7 +229,8 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - connector.reset(Connector::create(protocol, version, handler, this)); + theIO().add(); + connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 2c4feffdcf..71a49ad54a 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -21,32 +21,17 @@ #include "qpid/client/Connector.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/ConnectionSettings.h" +#include "qpid/Exception.h" #include "qpid/log/Statement.h" -#include "qpid/sys/Codec.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/sys/AsynchIO.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/SecurityLayer.h" -#include "qpid/Msg.h" -#include <iostream> #include <map> -#include <boost/bind.hpp> -#include <boost/format.hpp> namespace qpid { namespace client { using namespace qpid::sys; using namespace qpid::framing; -using boost::format; -using boost::str; -// Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; @@ -57,13 +42,15 @@ namespace { } } -Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +Connector* Connector::create(const std::string& proto, + boost::shared_ptr<Poller> p, + framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); if (i==theProtocolRegistry().end()) { throw Exception(QPID_MSG("Unknown protocol: " << proto)); } - return (i->second)(v, s, c); + return (i->second)(p, v, s, c); } void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) @@ -79,4 +66,5 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) { } + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 3a49ae9012..0203895b00 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -22,27 +22,24 @@ #define _Connector_ -#include "qpid/framing/InputHandler.h" #include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" -#include "qpid/sys/ShutdownHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Socket.h" -#include "qpid/sys/Time.h" - -#include <queue> -#include <boost/weak_ptr.hpp> + #include <boost/shared_ptr.hpp> +#include <string> + namespace qpid { namespace sys { +class ShutdownHandler; class SecurityLayer; +class Poller; +} + +namespace framing { +class InputHandler; +class AMQFrame; } namespace client { @@ -52,11 +49,14 @@ class ConnectionImpl; ///@internal class Connector : public framing::OutputHandler -{ +{ public: // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future) - typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); - static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + typedef Connector* Factory(boost::shared_ptr<qpid::sys::Poller>, + framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + static Connector* create(const std::string& proto, + boost::shared_ptr<qpid::sys::Poller>, + framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); static void registerFactory(const std::string& proto, Factory* connectorFactory); virtual ~Connector() {}; @@ -75,7 +75,6 @@ class Connector : public framing::OutputHandler virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); virtual unsigned int getSSF() = 0; - }; }} diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index ea3566dacb..e51ee95c61 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -48,7 +49,7 @@ using namespace qpid::framing; using boost::format; using boost::str; - class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable + class RdmaConnector : public Connector, public sys::Codec { struct Buff; @@ -60,30 +61,25 @@ using boost::str; Frames frames; size_t lastEof; // Position after last EOF in frames uint64_t currentSize; - Bounds* bounds; - - + Bounds* bounds; + framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; + sys::Mutex pollingLock; bool polling; - bool joined; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Thread receiver; - Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); - void run(); void handleClosed(); bool closeInternal(); @@ -101,7 +97,7 @@ using boost::str; std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); @@ -120,15 +116,16 @@ using boost::str; bool canEncode(); public: - RdmaConnector(framing::ProtocolVersion pVersion, + RdmaConnector(Poller::shared_ptr, + framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new RdmaConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(p, v, s, c); } struct StaticInit { @@ -140,7 +137,8 @@ namespace { } -RdmaConnector::RdmaConnector(ProtocolVersion ver, +RdmaConnector::RdmaConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -150,9 +148,9 @@ RdmaConnector::RdmaConnector(ProtocolVersion ver, version(ver), initiated(false), polling(false), - joined(true), shutdownHandler(0), aio(0), + poller(p), impl(cimpl) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -165,8 +163,6 @@ RdmaConnector::~RdmaConnector() { void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); - assert(joined); - poller = Poller::shared_ptr(new Poller); SocketAddress sa(host, boost::lexical_cast<std::string>(port)); Rdma::Connector* c = new Rdma::Connector( @@ -179,8 +175,6 @@ void RdmaConnector::connect(const std::string& host, int port){ c->start(poller); polling = true; - joined = false; - receiver = Thread(this); } // The following only gets run when connected @@ -215,24 +209,12 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv } bool RdmaConnector::closeInternal() { - bool ret; - { Mutex::ScopedLock l(pollingLock); - ret = polling; - if (polling) { - polling = false; - poller->shutdown(); - } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; - } - - receiver.join(); + bool ret = polling; + polling = false; return ret; } - + void RdmaConnector::close() { closeInternal(); } @@ -356,28 +338,6 @@ void RdmaConnector::eof(Rdma::AsynchIO&) { handleClosed(); } -void RdmaConnector::run(){ - // Keep the connection impl in memory until run() completes. - //GRS: currently the ConnectionImpls destructor is where the Io thread is joined - //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - //assert(protect); - try { - Dispatcher d(poller); - - //aio->start(poller); - d.run(); - //aio->queueForDeletion(); - } catch (const std::exception& e) { - { - // We're no longer polling - Mutex::ScopedLock l(pollingLock); - polling = false; - } - QPID_LOG(error, e.what()); - handleClosed(); - } -} - void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 2b34651fa0..cf6d54d261 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -28,6 +28,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" @@ -50,7 +51,7 @@ using boost::format; using boost::str; -class SslConnector : public Connector, private sys::Runnable +class SslConnector : public Connector { struct Buff; @@ -68,27 +69,26 @@ class SslConnector : public Connector, private sys::Runnable framing::Buffer encode; size_t framesEncoded; std::string identifier; - Bounds* bounds; - + Bounds* bounds; + void writeOne(); void newBuffer(); public: - + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::ssl::SslIO*); void handle(framing::AMQFrame&); void write(sys::ssl::SslIO&); }; - + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; - bool joined; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; @@ -96,20 +96,17 @@ class SslConnector : public Connector, private sys::Runnable framing::OutputHandler* output; Writer writer; - - sys::Thread receiver; sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; - boost::shared_ptr<sys::Poller> poller; + Poller::shared_ptr poller; ~SslConnector(); - void run(); void handleClosed(); bool closeInternal(); - + void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -117,8 +114,6 @@ class SslConnector : public Connector, private sys::Runnable std::string identifier; - ConnectionImpl* impl; - void connect(const std::string& host, int port); void init(); void close(); @@ -133,15 +128,20 @@ class SslConnector : public Connector, private sys::Runnable unsigned int getSSF() { return socket.getKeyLen(); } public: - SslConnector(framing::ProtocolVersion pVersion, + SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; +struct SslConnector::Buff : public SslIO::BufferBase { + Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new SslConnector(p, v, s, c); } struct StaticInit { @@ -150,9 +150,9 @@ namespace { SslOptions options; options.parse (0, 0, QPIDC_CONF_FILE, true); if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); + QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { - initNSS(options); + initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { @@ -164,18 +164,18 @@ namespace { } init; } -SslConnector::SslConnector(ProtocolVersion ver, +SslConnector::SslConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), version(ver), initiated(false), closed(true), - joined(true), shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), - impl(cimpl) + poller(p) { QPID_LOG(debug, "SslConnector created for " << version.toString()); //TODO: how do we want to handle socket configuration with ssl? @@ -198,7 +198,6 @@ void SslConnector::connect(const std::string& host, int port){ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; - poller = Poller::shared_ptr(new Poller); aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), @@ -211,11 +210,12 @@ void SslConnector::connect(const std::string& host, int port){ void SslConnector::init(){ Mutex::ScopedLock l(closedLock); - assert(joined); ProtocolInitiation init(version); writeDataBlock(init); - joined = false; - receiver = Thread(this); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); } bool SslConnector::closeInternal() { @@ -224,16 +224,11 @@ bool SslConnector::closeInternal() { if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (!joined && receiver.id() != Thread::current().id()) { - joined = true; - Mutex::ScopedUnlock u(closedLock); - receiver.join(); + socket.close(); } return ret; } - + void SslConnector::close() { closeInternal(); } @@ -267,11 +262,6 @@ void SslConnector::handleClosed() { shutdownHandler->shutdown(); } -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -376,25 +366,4 @@ void SslConnector::eof(SslIO&) { handleClosed(); } -void SslConnector::run(){ - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - assert(protect); - try { - Dispatcher d(poller); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); - d.run(); - socket.close(); - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - handleClosed(); - } -} - - }} // namespace qpid::client diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 9369dd2ef4..2de139d5df 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -27,6 +27,7 @@ #include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -45,10 +46,15 @@ using namespace qpid::framing; using boost::format; using boost::str; +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(p, v, s, c); } struct StaticInit { @@ -58,25 +64,20 @@ namespace { } init; } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - -TCPConnector::TCPConnector(ProtocolVersion ver, - const ConnectionSettings& settings, - ConnectionImpl* cimpl) +TCPConnector::TCPConnector(Poller::shared_ptr p, + ProtocolVersion ver, + const ConnectionSettings& settings, + ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), - version(ver), + version(ver), initiated(false), closed(true), - joined(true), shutdownHandler(0), aio(0), - impl(cimpl->shared_from_this()) + poller(p) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); settings.configureSocket(socket); @@ -89,16 +90,13 @@ TCPConnector::~TCPConnector() { void TCPConnector::connect(const std::string& host, int port) { Mutex::ScopedLock l(lock); assert(closed); - assert(joined); - poller = Poller::shared_ptr(new Poller); - AsynchConnector* c = - AsynchConnector::create(socket, - host, port, - boost::bind(&TCPConnector::connected, this, _1), - boost::bind(&TCPConnector::connectFailed, this, _3)); + AsynchConnector* c = AsynchConnector::create( + socket, + host, port, + boost::bind(&TCPConnector::connected, this, _1), + boost::bind(&TCPConnector::connectFailed, this, _3)); closed = false; - joined = false; - receiver = Thread(this); + c->start(poller); } @@ -113,38 +111,31 @@ void TCPConnector::connected(const Socket&) { for (int i = 0; i < 32; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - aio->start(poller); identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); + + aio->start(poller); } void TCPConnector::connectFailed(const std::string& msg) { QPID_LOG(warning, "Connecting failed: " << msg); - closed = true; - poller->shutdown(); - closeInternal(); - if (shutdownHandler) + socket.close(); + if (!closed && shutdownHandler) { + closed = true; shutdownHandler->shutdown(); + } } bool TCPConnector::closeInternal() { - bool ret; - { Mutex::ScopedLock l(lock); - ret = !closed; + bool ret = !closed; if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; + socket.close(); } - receiver.join(); return ret; } @@ -301,28 +292,10 @@ void TCPConnector::eof(AsynchIO&) { handleClosed(); } -void TCPConnector::run() { - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl.lock(); - assert(protect); - try { - Dispatcher d(poller); - - d.run(); - } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); - handleClosed(); - } - try { - socket.close(); - } catch (const std::exception&) {} -} - void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; securityLayer->init(this); } - }} // namespace qpid::client diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index 7499b26fd2..0de06de40c 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -40,9 +40,14 @@ #include <string> namespace qpid { + +namespace framing { + class InitiationHandler; +} + namespace client { -class TCPConnector : public Connector, public sys::Codec, private sys::Runnable +class TCPConnector : public Connector, public sys::Codec { typedef std::deque<framing::AMQFrame> Frames; struct Buff; @@ -58,15 +63,12 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable framing::ProtocolVersion version; bool initiated; bool closed; - bool joined; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Thread receiver; - sys::Socket socket; sys::AsynchIO* aio; @@ -76,19 +78,16 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable ~TCPConnector(); - void run(); void handleClosed(); bool closeInternal(); - virtual void connected(const qpid::sys::Socket&); + void connected(const sys::Socket&); void connectFailed(const std::string& msg); bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); - boost::weak_ptr<ConnectionImpl> impl; - void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); @@ -107,9 +106,10 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable bool canEncode(); public: - TCPConnector(framing::ProtocolVersion pVersion, - const ConnectionSettings&, - ConnectionImpl*); + TCPConnector(boost::shared_ptr<sys::Poller>, + framing::ProtocolVersion pVersion, + const ConnectionSettings&, + ConnectionImpl*); }; }} // namespace qpid::client |