diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 92 |
1 files changed, 4 insertions, 88 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index ccaa8c0b87..6639f92324 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,13 +18,7 @@ * under the License. * */ - #include "ConnectionImpl.h" - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -34,16 +28,11 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/Options.h" #include <boost/bind.hpp> #include <boost/format.hpp> -#include <boost/shared_ptr.hpp> #include <limits> -#include <vector> namespace qpid { namespace client { @@ -53,10 +42,7 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -namespace { -// Maybe should amalgamate the singletons into a single client singleton - -// Get timer singleton +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock<Mutex> l(timerInitLock); @@ -65,73 +51,6 @@ Timer& theTimer() { return t; } -struct IOThreadOptions : public qpid::Options { - int maxIOThreads; - - IOThreadOptions(int c) : - Options("IO threading options"), - maxIOThreads(c) - { - addOptions() - ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); - } -}; - -// IO threads -class IOThread { - int maxIOThreads; - int ioThreads; - int connections; - Mutex threadLock; - std::vector<Thread> t; - Poller::shared_ptr poller_; - -public: - void add() { - ScopedLock<Mutex> l(threadLock); - ++connections; - if (ioThreads < maxIOThreads) { - QPID_LOG(debug, "Created IO thread: " << ioThreads); - ++ioThreads; - t.push_back( Thread(poller_.get()) ); - } - } - - void sub() { - ScopedLock<Mutex> l(threadLock); - --connections; - } - - Poller::shared_ptr poller() const { - return poller_; - } - - // Here is where the maximum number of threads is set - IOThread(int c) : - ioThreads(0), - connections(0), - poller_(new Poller) - { - IOThreadOptions options(c); - options.parse(0, 0, QPIDC_CONF_FILE, true); - maxIOThreads = (options.maxIOThreads != -1) ? - options.maxIOThreads : 1; - } - - // We can't destroy threads one-by-one as the only - // control we have is to shutdown the whole lot - // and we can't do that before we're unloaded as we can't - // restart the Poller after shutting it down - ~IOThread() { - poller_->shutdown(); - for (int i=0; i<ioThreads; ++i) { - t[i].join(); - } - } -}; - -static IOThread io(SystemInfo::concurrency()); - class HeartbeatTask : public TimerTask { TimeoutHandler& timeout; @@ -148,8 +67,6 @@ public: {} }; -} - ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -173,7 +90,6 @@ ConnectionImpl::~ConnectionImpl() { // is running. failover.reset(); if (connector) connector->close(); - io.sub(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) @@ -210,6 +126,7 @@ bool ConnectionImpl::isOpen() const return handler.isOpen(); } + void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -217,8 +134,7 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - io.add(); - connector.reset(Connector::create(protocol, io.poller(), version, handler, this)); + connector.reset(Connector::create(protocol, version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); @@ -322,7 +238,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() { return handler; } - + std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } |