summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp92
1 files changed, 4 insertions, 88 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index ccaa8c0b87..6639f92324 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,13 +18,7 @@
* under the License.
*
*/
-
#include "ConnectionImpl.h"
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
#include "Connector.h"
#include "ConnectionSettings.h"
#include "SessionImpl.h"
@@ -34,16 +28,11 @@
#include "qpid/Url.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/Options.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/shared_ptr.hpp>
#include <limits>
-#include <vector>
namespace qpid {
namespace client {
@@ -53,10 +42,7 @@ using namespace qpid::framing::connection;
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
-namespace {
-// Maybe should amalgamate the singletons into a single client singleton
-
-// Get timer singleton
+// Get timer singleton
Timer& theTimer() {
static Mutex timerInitLock;
ScopedLock<Mutex> l(timerInitLock);
@@ -65,73 +51,6 @@ Timer& theTimer() {
return t;
}
-struct IOThreadOptions : public qpid::Options {
- int maxIOThreads;
-
- IOThreadOptions(int c) :
- Options("IO threading options"),
- maxIOThreads(c)
- {
- addOptions()
- ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use");
- }
-};
-
-// IO threads
-class IOThread {
- int maxIOThreads;
- int ioThreads;
- int connections;
- Mutex threadLock;
- std::vector<Thread> t;
- Poller::shared_ptr poller_;
-
-public:
- void add() {
- ScopedLock<Mutex> l(threadLock);
- ++connections;
- if (ioThreads < maxIOThreads) {
- QPID_LOG(debug, "Created IO thread: " << ioThreads);
- ++ioThreads;
- t.push_back( Thread(poller_.get()) );
- }
- }
-
- void sub() {
- ScopedLock<Mutex> l(threadLock);
- --connections;
- }
-
- Poller::shared_ptr poller() const {
- return poller_;
- }
-
- // Here is where the maximum number of threads is set
- IOThread(int c) :
- ioThreads(0),
- connections(0),
- poller_(new Poller)
- {
- IOThreadOptions options(c);
- options.parse(0, 0, QPIDC_CONF_FILE, true);
- maxIOThreads = (options.maxIOThreads != -1) ?
- options.maxIOThreads : 1;
- }
-
- // We can't destroy threads one-by-one as the only
- // control we have is to shutdown the whole lot
- // and we can't do that before we're unloaded as we can't
- // restart the Poller after shutting it down
- ~IOThread() {
- poller_->shutdown();
- for (int i=0; i<ioThreads; ++i) {
- t[i].join();
- }
- }
-};
-
-static IOThread io(SystemInfo::concurrency());
-
class HeartbeatTask : public TimerTask {
TimeoutHandler& timeout;
@@ -148,8 +67,6 @@ public:
{}
};
-}
-
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -173,7 +90,6 @@ ConnectionImpl::~ConnectionImpl() {
// is running.
failover.reset();
if (connector) connector->close();
- io.sub();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
@@ -210,6 +126,7 @@ bool ConnectionImpl::isOpen() const
return handler.isOpen();
}
+
void ConnectionImpl::open()
{
const std::string& protocol = handler.protocol;
@@ -217,8 +134,7 @@ void ConnectionImpl::open()
int port = handler.port;
QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
- io.add();
- connector.reset(Connector::create(protocol, io.poller(), version, handler, this));
+ connector.reset(Connector::create(protocol, version, handler, this));
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
connector->connect(host, port);
@@ -322,7 +238,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
{
return handler;
}
-
+
std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
}