diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-11-17 21:08:10 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-11-17 21:08:10 +0000 |
commit | 520f9d2bd95379fbbcc83e8812bfaa531387f937 (patch) | |
tree | 3592f0ac60ac67dcedfea3f54881096cb92490ba /cpp/src | |
parent | 86562de96c2d63ca37e8c5435d29ae877fe34929 (diff) | |
download | qpid-python-520f9d2bd95379fbbcc83e8812bfaa531387f937.tar.gz |
QPID-2188 , support for maxConnections, limit is set to broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@881517 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SecureConnectionFactory.cpp | 6 |
5 files changed, 40 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fa041db5cd..58569f5503 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -155,6 +155,7 @@ Broker::Broker(const Broker::Options& conf) : queueEvents(poller,!conf.asyncQueueEvents), recovery(true), expiryPolicy(new ExpiryPolicy), + connectionCounter(conf.maxConnections), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index d150410de7..5e14aa487d 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -51,6 +51,7 @@ #include "qpid/sys/Timer.h" #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" +#include "qpid/sys/Mutex.h" #include <boost/intrusive_ptr.hpp> #include <string> @@ -116,6 +117,26 @@ public: private: std::string getHome(); }; + + class ConnectionCounter { + int maxConnections; + int connectionCount; + sys::Mutex connectionCountLock; + public: + ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; + void inc_connectionCount() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); + connectionCount++; + } + void dec_connectionCount() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); + connectionCount--; + } + bool allowConnection() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); + return (maxConnections <= connectionCount); + } + }; private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; @@ -147,7 +168,7 @@ public: std::string federationTag; bool recovery; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - + ConnectionCounter connectionCounter; public: virtual ~Broker(); @@ -238,6 +259,8 @@ public: bool getRecovery() const { return recovery; } management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } + + ConnectionCounter& getConnectionCounter() {return connectionCounter;} }; }} diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 7aa632c5a5..17de83e033 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -102,6 +102,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std } ConnectionState::setUrl(mgmtId); } + if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::requestIOProcessing(boost::function0<void> callback) @@ -125,6 +126,8 @@ Connection::~Connection() heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + + if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); } void Connection::received(framing::AMQFrame& frame) { diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index dd794d7d95..24d812aefb 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -22,6 +22,7 @@ #include "qpid/framing/ProtocolVersion.h" #include "qpid/amqp_0_10/Connection.h" #include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -37,6 +38,11 @@ ConnectionFactory::~ConnectionFactory() {} sys::ConnectionCodec* ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, unsigned int ) { + if (broker.getConnectionCounter().allowConnection()) + { + QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed"); + return 0; + } if (v == ProtocolVersion(0, 10)) { ConnectionPtr c(new amqp_0_10::Connection(out, id, false)); c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false))); diff --git a/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 4926851f95..9d92d26ac8 100644 --- a/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -23,6 +23,7 @@ #include "qpid/amqp_0_10/Connection.h" #include "qpid/broker/Connection.h" #include "qpid/broker/SecureConnection.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -38,6 +39,11 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} sys::ConnectionCodec* SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, unsigned int conn_ssf ) { + if (broker.getConnectionCounter().allowConnection()) + { + QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed"); + return 0; + } if (v == ProtocolVersion(0, 10)) { SecureConnectionPtr sc(new SecureConnection()); CodecPtr c(new amqp_0_10::Connection(out, id, false)); |