diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 95 |
1 files changed, 91 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index cede7f7310..f348493fd0 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,7 +18,9 @@ * under the License. * */ + #include "qpid/client/ConnectionImpl.h" + #include "qpid/client/Connector.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/client/SessionImpl.h" @@ -27,11 +29,20 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Options.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/shared_ptr.hpp> #include <limits> +#include <vector> + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif namespace qpid { namespace client { @@ -41,7 +52,10 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -// Get timer singleton +namespace { +// Maybe should amalgamate the singletons into a single client singleton + +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock<Mutex> l(timerInitLock); @@ -50,6 +64,76 @@ Timer& theTimer() { return t; } +struct IOThreadOptions : public qpid::Options { + int maxIOThreads; + + IOThreadOptions(int c) : + Options("IO threading options"), + maxIOThreads(c) + { + addOptions() + ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); + } +}; + +// IO threads +class IOThread { + int maxIOThreads; + int ioThreads; + int connections; + Mutex threadLock; + std::vector<Thread> t; + Poller::shared_ptr poller_; + +public: + void add() { + ScopedLock<Mutex> l(threadLock); + ++connections; + if (ioThreads < maxIOThreads) { + QPID_LOG(debug, "Created IO thread: " << ioThreads); + ++ioThreads; + t.push_back( Thread(poller_.get()) ); + } + } + + void sub() { + ScopedLock<Mutex> l(threadLock); + --connections; + } + + Poller::shared_ptr poller() const { + return poller_; + } + + // Here is where the maximum number of threads is set + IOThread(int c) : + ioThreads(0), + connections(0), + poller_(new Poller) + { + IOThreadOptions options(c); + options.parse(0, 0, QPIDC_CONF_FILE, true); + maxIOThreads = (options.maxIOThreads != -1) ? + options.maxIOThreads : 1; + } + + // We can't destroy threads one-by-one as the only + // control we have is to shutdown the whole lot + // and we can't do that before we're unloaded as we can't + // restart the Poller after shutting it down + ~IOThread() { + poller_->shutdown(); + for (int i=0; i<ioThreads; ++i) { + t[i].join(); + } + } +}; + +IOThread& theIO() { + static IOThread io(SystemInfo::concurrency()); + return io; +} + class HeartbeatTask : public TimerTask { TimeoutHandler& timeout; @@ -66,6 +150,8 @@ public: {} }; +} + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -89,6 +175,7 @@ ConnectionImpl::~ConnectionImpl() { // connector thread does not call on us while the destructor // is running. if (connector) connector->close(); + theIO().sub(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) @@ -131,11 +218,10 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) } bool ConnectionImpl::isOpen() const -{ +{ return handler.isOpen(); } - void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -143,7 +229,8 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - connector.reset(Connector::create(protocol, version, handler, this)); + theIO().add(); + connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); |