summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp95
1 files changed, 91 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index cede7f7310..f348493fd0 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,7 +18,9 @@
* under the License.
*
*/
+
#include "qpid/client/ConnectionImpl.h"
+
#include "qpid/client/Connector.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/SessionImpl.h"
@@ -27,11 +29,20 @@
#include "qpid/Url.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/Options.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
#include <limits>
+#include <vector>
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
namespace qpid {
namespace client {
@@ -41,7 +52,10 @@ using namespace qpid::framing::connection;
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
-// Get timer singleton
+namespace {
+// Maybe should amalgamate the singletons into a single client singleton
+
+// Get timer singleton
Timer& theTimer() {
static Mutex timerInitLock;
ScopedLock<Mutex> l(timerInitLock);
@@ -50,6 +64,76 @@ Timer& theTimer() {
return t;
}
+struct IOThreadOptions : public qpid::Options {
+ int maxIOThreads;
+
+ IOThreadOptions(int c) :
+ Options("IO threading options"),
+ maxIOThreads(c)
+ {
+ addOptions()
+ ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use");
+ }
+};
+
+// IO threads
+class IOThread {
+ int maxIOThreads;
+ int ioThreads;
+ int connections;
+ Mutex threadLock;
+ std::vector<Thread> t;
+ Poller::shared_ptr poller_;
+
+public:
+ void add() {
+ ScopedLock<Mutex> l(threadLock);
+ ++connections;
+ if (ioThreads < maxIOThreads) {
+ QPID_LOG(debug, "Created IO thread: " << ioThreads);
+ ++ioThreads;
+ t.push_back( Thread(poller_.get()) );
+ }
+ }
+
+ void sub() {
+ ScopedLock<Mutex> l(threadLock);
+ --connections;
+ }
+
+ Poller::shared_ptr poller() const {
+ return poller_;
+ }
+
+ // Here is where the maximum number of threads is set
+ IOThread(int c) :
+ ioThreads(0),
+ connections(0),
+ poller_(new Poller)
+ {
+ IOThreadOptions options(c);
+ options.parse(0, 0, QPIDC_CONF_FILE, true);
+ maxIOThreads = (options.maxIOThreads != -1) ?
+ options.maxIOThreads : 1;
+ }
+
+ // We can't destroy threads one-by-one as the only
+ // control we have is to shutdown the whole lot
+ // and we can't do that before we're unloaded as we can't
+ // restart the Poller after shutting it down
+ ~IOThread() {
+ poller_->shutdown();
+ for (int i=0; i<ioThreads; ++i) {
+ t[i].join();
+ }
+ }
+};
+
+IOThread& theIO() {
+ static IOThread io(SystemInfo::concurrency());
+ return io;
+}
+
class HeartbeatTask : public TimerTask {
TimeoutHandler& timeout;
@@ -66,6 +150,8 @@ public:
{}
};
+}
+
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -89,6 +175,7 @@ ConnectionImpl::~ConnectionImpl() {
// connector thread does not call on us while the destructor
// is running.
if (connector) connector->close();
+ theIO().sub();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
@@ -131,11 +218,10 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
}
bool ConnectionImpl::isOpen() const
-{
+{
return handler.isOpen();
}
-
void ConnectionImpl::open()
{
const std::string& protocol = handler.protocol;
@@ -143,7 +229,8 @@ void ConnectionImpl::open()
int port = handler.port;
QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
- connector.reset(Connector::create(protocol, version, handler, this));
+ theIO().add();
+ connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this));
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
connector->connect(host, port);