summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-11-17 21:08:10 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-11-17 21:08:10 +0000
commit520f9d2bd95379fbbcc83e8812bfaa531387f937 (patch)
tree3592f0ac60ac67dcedfea3f54881096cb92490ba /cpp/src
parent86562de96c2d63ca37e8c5435d29ae877fe34929 (diff)
downloadqpid-python-520f9d2bd95379fbbcc83e8812bfaa531387f937.tar.gz
QPID-2188 , support for maxConnections, limit is set to broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@881517 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.h25
-rw-r--r--cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp6
-rw-r--r--cpp/src/qpid/broker/SecureConnectionFactory.cpp6
5 files changed, 40 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index fa041db5cd..58569f5503 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -155,6 +155,7 @@ Broker::Broker(const Broker::Options& conf) :
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
expiryPolicy(new ExpiryPolicy),
+ connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index d150410de7..5e14aa487d 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -51,6 +51,7 @@
#include "qpid/sys/Timer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
#include <string>
@@ -116,6 +117,26 @@ public:
private:
std::string getHome();
};
+
+ class ConnectionCounter {
+ int maxConnections;
+ int connectionCount;
+ sys::Mutex connectionCountLock;
+ public:
+ ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
+ void inc_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ connectionCount++;
+ }
+ void dec_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ connectionCount--;
+ }
+ bool allowConnection() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ return (maxConnections <= connectionCount);
+ }
+ };
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
@@ -147,7 +168,7 @@ public:
std::string federationTag;
bool recovery;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-
+ ConnectionCounter connectionCounter;
public:
virtual ~Broker();
@@ -238,6 +259,8 @@ public:
bool getRecovery() const { return recovery; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
+
+ ConnectionCounter& getConnectionCounter() {return connectionCounter;}
};
}}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 7aa632c5a5..17de83e033 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -102,6 +102,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
}
ConnectionState::setUrl(mgmtId);
}
+ if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -125,6 +126,8 @@ Connection::~Connection()
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+
+ if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
}
void Connection::received(framing::AMQFrame& frame) {
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index dd794d7d95..24d812aefb 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -22,6 +22,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -37,6 +38,11 @@ ConnectionFactory::~ConnectionFactory() {}
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
unsigned int ) {
+ if (broker.getConnectionCounter().allowConnection())
+ {
+ QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed");
+ return 0;
+ }
if (v == ProtocolVersion(0, 10)) {
ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
diff --git a/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 4926851f95..9d92d26ac8 100644
--- a/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -23,6 +23,7 @@
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/SecureConnection.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -38,6 +39,11 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
unsigned int conn_ssf ) {
+ if (broker.getConnectionCounter().allowConnection())
+ {
+ QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed");
+ return 0;
+ }
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, false));