summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.cpp10
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.h11
-rw-r--r--qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp193
-rw-r--r--qpid/cpp/src/qpid/acl/AclConnectionCounter.h36
-rw-r--r--qpid/cpp/src/qpid/acl/AclPlugin.cpp8
-rw-r--r--qpid/cpp/src/qpid/acl/management-schema.xml17
-rw-r--r--qpid/cpp/src/qpid/broker/AclModule.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h24
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp65
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp14
-rwxr-xr-xqpid/cpp/src/tests/run_acl_tests4
15 files changed, 254 insertions, 163 deletions
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp
index 2962a9c1ba..b68816288a 100644
--- a/qpid/cpp/src/qpid/acl/Acl.cpp
+++ b/qpid/cpp/src/qpid/acl/Acl.cpp
@@ -51,7 +51,7 @@ using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::acl;
Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0),
- connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp))
+ connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal))
{
agent = broker->getManagementAgent();
@@ -60,6 +60,9 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
_qmf::Package packageInit(agent);
mgmtObject = new _qmf::Acl (agent, this, broker);
agent->addObject (mgmtObject);
+ mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal);
+ mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp);
+ mgmtObject->set_maxConnectionsPerUser(aclValues.aclMaxConnectPerUser);
}
std::string errorString;
if (!readAclFile(errorString)){
@@ -121,6 +124,11 @@ bool Acl::authorise(
}
+bool Acl::approveConnection(const broker::Connection& conn)
+{
+ return connectionCounter->approveConnection(conn);
+}
+
bool Acl::result(
const AclResult& aclreslt,
const std::string& id,
diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h
index c3451018ef..4893f71ef2 100644
--- a/qpid/cpp/src/qpid/acl/Acl.h
+++ b/qpid/cpp/src/qpid/acl/Acl.h
@@ -38,6 +38,7 @@
namespace qpid {
namespace broker {
class Broker;
+class Connection;
}
namespace acl {
@@ -45,8 +46,9 @@ class ConnectionCounter;
struct AclValues {
std::string aclFile;
- uint32_t aclMaxConnectPerUser;
- uint32_t aclMaxConnectPerIp;
+ uint16_t aclMaxConnectPerUser;
+ uint16_t aclMaxConnectPerIp;
+ uint16_t aclMaxConnectTotal;
};
@@ -66,6 +68,9 @@ private:
public:
Acl (AclValues& av, broker::Broker& b);
+ /** reportConnectLimit
+ * issue management counts and alerts for denied connections
+ */
void reportConnectLimit(const std::string user, const std::string addr);
inline virtual bool doTransferAcl() {
@@ -87,6 +92,8 @@ public:
const std::string& ExchangeName,
const std::string& RoutingKey);
+ virtual bool approveConnection(const broker::Connection& connection);
+
virtual ~Acl();
private:
bool result(
diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp
index 5d4e3c1544..70f0ca1da8 100644
--- a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp
+++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp
@@ -41,30 +41,74 @@ namespace acl {
//
//
//
-ConnectionCounter::ConnectionCounter(Acl& a, uint32_t nl, uint32_t hl) :
- acl(a), nameLimit(nl), hostLimit(hl) {}
+ConnectionCounter::ConnectionCounter(Acl& a, uint16_t nl, uint16_t hl, uint16_t tl) :
+ acl(a), nameLimit(nl), hostLimit(hl), totalLimit(tl), totalCurrentConnections(0) {}
ConnectionCounter::~ConnectionCounter() {}
//
-// limitCheckLH
+// limitApproveLH
+//
+// Connection creation approver. Return true only if user is under limit.
+// Called with lock held.
+//
+bool ConnectionCounter::limitApproveLH(
+ connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog) {
+
+ bool result(true);
+ if (theLimit > 0) {
+ uint16_t count;
+ connectCountsMap_t::iterator eRef = theMap.find(theName);
+ if (eRef != theMap.end()) {
+ count = (uint16_t)(*eRef).second;
+ result = count <= theLimit;
+ } else {
+ // Not found
+ count = 0;
+ }
+ if (emitLog) {
+ QPID_LOG(trace, "ACL ConnectionApprover IP=" << theName
+ << " limit=" << theLimit
+ << " curValue=" << count
+ << " result=" << (result ? "allow" : "deny"));
+ }
+ }
+ return result;
+}
+
+
+//
+// countConnectionLH
//
// Increment the name's count in map and return a comparison against the limit.
// called with dataLock already taken
//
-bool ConnectionCounter::limitCheckLH(
- connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) {
+bool ConnectionCounter::countConnectionLH(
+ connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog) {
bool result(true);
+ uint16_t count(0);
if (theLimit > 0) {
connectCountsMap_t::iterator eRef = theMap.find(theName);
if (eRef != theMap.end()) {
- uint32_t count = (uint32_t)(*eRef).second + 1;
+ count = (uint16_t)(*eRef).second + 1;
(*eRef).second = count;
result = count <= theLimit;
} else {
- theMap[theName] = 1;
+ theMap[theName] = count = 1;
+ }
+ if (emitLog) {
+ QPID_LOG(trace, "ACL ConnectionApprover user=" << theName
+ << " limit=" << theLimit
+ << " curValue=" << count
+ << " result=" << (result ? "allow" : "deny"));
}
}
return result;
@@ -78,12 +122,12 @@ bool ConnectionCounter::limitCheckLH(
// called with dataLock already taken
//
void ConnectionCounter::releaseLH(
- connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) {
+ connectCountsMap_t& theMap, const std::string& theName, uint16_t theLimit) {
if (theLimit > 0) {
connectCountsMap_t::iterator eRef = theMap.find(theName);
if (eRef != theMap.end()) {
- uint32_t count = (uint32_t) (*eRef).second;
+ uint16_t count = (uint16_t) (*eRef).second;
assert (count > 0);
if (1 == count) {
theMap.erase (eRef);
@@ -103,52 +147,20 @@ void ConnectionCounter::releaseLH(
// connection - called during Connection's constructor
//
void ConnectionCounter::connection(broker::Connection& connection) {
- QPID_LOG(trace, "ACL ConnectionCounter connection IP:" << connection.getMgmtId()
- << ", userId:" << connection.getUserId());
-
- Mutex::ScopedLock locker(dataLock);
-
- connectProgressMap[connection.getMgmtId()] = C_CREATED;
-}
-
+ QPID_LOG(trace, "ACL ConnectionCounter new connection: " << connection.getMgmtId());
-//
-// opened - called when first AMQP frame is received over Connection
-//
-void ConnectionCounter::opened(broker::Connection& connection) {
- QPID_LOG(trace, "ACL ConnectionCounter Opened IP:" << connection.getMgmtId()
- << ", userId:" << connection.getUserId());
-
- Mutex::ScopedLock locker(dataLock);
-
- const std::string& userName( connection.getUserId());
const std::string& hostName(getClientHost(connection.getMgmtId()));
- // Bump state from CREATED to OPENED
- (void) limitCheckLH(connectProgressMap, connection.getMgmtId(), C_OPENED);
+ Mutex::ScopedLock locker(dataLock);
- bool nameOk = limitCheckLH(connectByNameMap, userName, nameLimit);
- bool hostOk = limitCheckLH(connectByHostMap, hostName, hostLimit);
+ // Total connections goes up
+ totalCurrentConnections += 1;
- if (!nameOk) {
- // User has too many
- acl.reportConnectLimit(userName, hostName);
- QPID_LOG(notice, "ACL ConnectionCounter User '" << userName
- << "' exceeded maximum allowed connections");
- throw Exception(
- QPID_MSG("User '" << userName
- << "' exceeded maximum allowed connections"));
- }
+ // Record the fact that this connection exists
+ connectProgressMap[connection.getMgmtId()] = C_CREATED;
- if (!hostOk) {
- // Host has too many
- acl.reportConnectLimit(userName, hostName);
- QPID_LOG(notice, "ACL ConnectionCounter Client host '" << hostName
- << "' exceeded maximum allowed connections");
- throw Exception(
- QPID_MSG("Client host '" << hostName
- << "' exceeded maximum allowed connections"));
- }
+ // Count the connection from this host.
+ (void) countConnectionLH(connectByHostMap, hostName, hostLimit, false);
}
@@ -156,7 +168,7 @@ void ConnectionCounter::opened(broker::Connection& connection) {
// closed - called during Connection's destructor
//
void ConnectionCounter::closed(broker::Connection& connection) {
- QPID_LOG(trace, "ACL ConnectionCounter Closed IP:" << connection.getMgmtId()
+ QPID_LOG(trace, "ACL ConnectionCounter closed: " << connection.getMgmtId()
<< ", userId:" << connection.getUserId());
Mutex::ScopedLock locker(dataLock);
@@ -165,32 +177,99 @@ void ConnectionCounter::closed(broker::Connection& connection) {
if (eRef != connectProgressMap.end()) {
if ((*eRef).second == C_OPENED){
// Normal case: connection was created and opened.
- // Decrement in-use counts
+ // Decrement user in-use counts
releaseLH(connectByNameMap,
connection.getUserId(),
nameLimit);
-
- releaseLH(connectByHostMap,
- getClientHost(connection.getMgmtId()),
- hostLimit);
} else {
// Connection was created but not opened.
- // Don't decrement any connection counts.
+ // Don't decrement user count.
}
+
+ // Decrement host in-use count.
+ releaseLH(connectByHostMap,
+ getClientHost(connection.getMgmtId()),
+ hostLimit);
+
+ // destroy connection progress indicator
connectProgressMap.erase(eRef);
} else {
// connection not found in progress map
- QPID_LOG(notice, "ACL ConnectionCounter info for '" << connection.getMgmtId()
+ QPID_LOG(notice, "ACL ConnectionCounter closed info for '" << connection.getMgmtId()
<< "' not found in connection state pool");
}
+
+ // total connections
+ totalCurrentConnections -= 1;
}
//
+// approveConnection
+// check total connections, connections from IP, connections by user and
+// disallow if over any limit
+//
+bool ConnectionCounter::approveConnection(const broker::Connection& connection)
+{
+ const std::string& hostName(getClientHost(connection.getMgmtId()));
+ const std::string& userName( connection.getUserId());
+
+ Mutex::ScopedLock locker(dataLock);
+
+ // Bump state from CREATED to OPENED
+ (void) countConnectionLH(connectProgressMap, connection.getMgmtId(),
+ C_OPENED, false);
+
+ // Approve total connections
+ bool okTotal = true;
+ if (totalLimit > 0) {
+ okTotal = totalCurrentConnections <= totalLimit;
+ QPID_LOG(trace, "ACL ConnectionApprover totalLimit=" << totalLimit
+ << " curValue=" << totalCurrentConnections
+ << " result=" << (okTotal ? "allow" : "deny"));
+ }
+
+ // Approve by IP host connections
+ bool okByIP = limitApproveLH(connectByHostMap, hostName, hostLimit, true);
+
+ // Count and Approve the connection by the user
+ bool okByUser = countConnectionLH(connectByNameMap, userName, nameLimit, true);
+
+ // Emit separate log for each disapproval
+ if (!okTotal) {
+ QPID_LOG(error, "Client max total connection count limit of " << totalLimit
+ << " exceeded by "
+ << connection.getMgmtId() << ", user: "
+ << userName << ". Connection refused");
+ }
+ if (!okByIP) {
+ QPID_LOG(error, "Client max per-host connection count limit of "
+ << hostLimit << " exceeded by "
+ << connection.getMgmtId() << ", user: "
+ << userName << ". Connection refused.");
+ }
+ if (!okByUser) {
+ QPID_LOG(error, "Client max per-user connection count limit of "
+ << nameLimit << " exceeded by "
+ << connection.getMgmtId() << ", user: "
+ << userName << ". Connection refused.");
+ }
+
+ // Count/Event once for each disapproval
+ bool result = okTotal && okByIP && okByUser;
+ if (!result) {
+ acl.reportConnectLimit(userName, hostName);
+ }
+
+ return result;
+}
+
+//
// getClientIp - given a connection's mgmtId return the client host part.
//
// TODO: Ideally this would be a method of the connection itself.
+// TODO: Verify it works with rdma connection names.
//
std::string ConnectionCounter::getClientHost(const std::string mgmtId)
{
diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h
index 31d11540fd..eec8e90256 100644
--- a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h
+++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h
@@ -48,32 +48,52 @@ private:
enum CONNECTION_PROGRESS { C_CREATED=1, C_OPENED=2 };
Acl& acl;
- uint32_t nameLimit;
- uint32_t hostLimit;
+ uint16_t nameLimit;
+ uint16_t hostLimit;
+ uint16_t totalLimit;
+ uint16_t totalCurrentConnections;
qpid::sys::Mutex dataLock;
+ /** Records per-connection state */
connectCountsMap_t connectProgressMap;
+
+ /** Records per-username counts */
connectCountsMap_t connectByNameMap;
+
+ /** Records per-host counts */
connectCountsMap_t connectByHostMap;
+ /** Given a connection's management ID, return the client host name */
std::string getClientHost(const std::string mgmtId);
- bool limitCheckLH(connectCountsMap_t& theMap,
- const std::string& theName,
- uint32_t theLimit);
+ /** Return approval for proposed connection */
+ bool limitApproveLH(connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog);
+
+ /** Record a connection.
+ * @return indication if user/host is over its limit */
+ bool countConnectionLH(connectCountsMap_t& theMap,
+ const std::string& theName,
+ uint16_t theLimit,
+ bool emitLog);
+ /** Release a connection */
void releaseLH(connectCountsMap_t& theMap,
const std::string& theName,
- uint32_t theLimit);
+ uint16_t theLimit);
public:
- ConnectionCounter(Acl& acl, uint32_t nl, uint32_t hl);
+ ConnectionCounter(Acl& acl, uint16_t nl, uint16_t hl, uint16_t tl);
~ConnectionCounter();
+ // ConnectionObserver interface
void connection(broker::Connection& connection);
- void opened(broker::Connection& connection);
void closed(broker::Connection& connection);
+ // Connection counting
+ bool approveConnection(const broker::Connection& conn);
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp
index 6c18cd2749..ebf5e90afe 100644
--- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp
+++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp
@@ -39,10 +39,13 @@ struct AclOptions : public Options {
AclValues& values;
AclOptions(AclValues& v) : Options("ACL Options"), values(v) {
+ values.aclMaxConnectTotal = 500;
addOptions()
("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir")
- ("acl-max-connect-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user")
- ("acl-max-connect-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address");
+ ("max-connections" , optValue(values.aclMaxConnectTotal, "N"), "The maximum combined number of connections allowed. 0 implies no limit.")
+ ("max-connections-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user. 0 implies no limit.")
+ ("max-connections-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address. 0 implies no limit.")
+ ;
}
};
@@ -69,7 +72,6 @@ struct AclPlugin : public Plugin {
oss << b.getDataDir().getPath() << "/" << values.aclFile;
values.aclFile = oss.str();
}
-
acl = new Acl(values, b);
b.setAcl(acl.get());
b.addFinalizer(boost::bind(&AclPlugin::shutdown, this));
diff --git a/qpid/cpp/src/qpid/acl/management-schema.xml b/qpid/cpp/src/qpid/acl/management-schema.xml
index 19fe37333c..f52c251bed 100644
--- a/qpid/cpp/src/qpid/acl/management-schema.xml
+++ b/qpid/cpp/src/qpid/acl/management-schema.xml
@@ -17,13 +17,16 @@
-->
<class name="Acl">
- <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
- <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/>
- <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/>
- <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/>
- <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/>
- <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/>
- <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/>
+ <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
+ <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/>
+ <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/>
+ <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/>
+ <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/>
+ <property name="maxConnections" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <property name="maxConnectionsPerIp" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <property name="maxConnectionsPerUser" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/>
+ <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/>
<method name="reloadACLFile" desc="Reload the ACL file"/>
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h
index ff9281b6fc..7c180439cf 100644
--- a/qpid/cpp/src/qpid/broker/AclModule.h
+++ b/qpid/cpp/src/qpid/broker/AclModule.h
@@ -113,6 +113,7 @@ namespace acl {
namespace broker {
+ class Connection;
class AclModule
{
@@ -139,6 +140,11 @@ namespace broker {
// Add specialized authorise() methods as required.
+ /** Approve connection by counting connections total, per-IP, and
+ * per-user.
+ */
+ virtual bool approveConnection (const Connection& connection)=0;
+
virtual ~AclModule() {};
};
} // namespace broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index ed24db14ad..4b7b4cb2a6 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -108,7 +108,6 @@ Broker::Options::Options(const std::string& name) :
noDataDir(0),
port(DEFAULT_PORT),
workerThreads(5),
- maxConnections(500),
connectionBacklog(10),
enableMgmt(1),
mgmtPublish(1),
@@ -148,7 +147,6 @@ Broker::Options::Options(const std::string& name) :
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
- ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
@@ -212,7 +210,6 @@ Broker::Broker(const Broker::Options& conf) :
inCluster(false),
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
- connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
{
@@ -231,7 +228,6 @@ Broker::Broker(const Broker::Options& conf) :
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
- mgmtObject->set_maxConns(conf.maxConnections);
mgmtObject->set_connBacklog(conf.connectionBacklog);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
mgmtObject->set_mgmtPublish(conf.mgmtPublish);
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 22a35c0929..83f34a839a 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -103,7 +103,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
std::string dataDir;
uint16_t port;
int workerThreads;
- int maxConnections;
int connectionBacklog;
bool enableMgmt;
bool mgmtPublish;
@@ -134,26 +133,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
std::string getHome();
};
- class ConnectionCounter {
- int maxConnections;
- int connectionCount;
- sys::Mutex connectionCountLock;
- public:
- ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
- void inc_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount++;
- }
- void dec_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount--;
- }
- bool allowConnection() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- return (maxConnections <= connectionCount);
- }
- };
-
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
@@ -206,7 +185,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool recovery;
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
mutable sys::Mutex linkClientPropertiesLock;
@@ -322,8 +300,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
- ConnectionCounter& getConnectionCounter() {return connectionCounter;}
-
/**
* Never true in a stand-alone broker. In a cluster, return true
* to defer delivery of messages deliveredg in a cluster-unsafe
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 03ff3d5793..15fffdfcb1 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -107,7 +107,6 @@ Connection::Connection(ConnectionOutputHandler* out_,
broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
- if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::addManagementObject() {
@@ -151,8 +150,6 @@ Connection::~Connection()
if (linkHeartbeatTimer) {
linkHeartbeatTimer->cancel();
}
-
- if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
}
void Connection::received(framing::AMQFrame& frame) {
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
index 9e0020812b..d5d24ca629 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory() {}
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10)) {
ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
@@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
return c.release();
}
-
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 57926fa938..09df180fcc 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
# include "config.h"
#endif
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
@@ -170,7 +171,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti
if (c.getBroker().getOptions().auth) {
if ( isShadow )
return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
- else
+ else
return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
} else {
QPID_LOG(debug, "SASL: No Authentication Performed");
@@ -179,7 +180,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti
}
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
+NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm), encrypt(e) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -215,7 +216,7 @@ void NullAuthenticator::start(const string& mechanism, const string* response)
} else if (i != string::npos) {
//authorization id is first null delimited field
uid = response->substr(0, i);
- }//else not a valid SASL PLAIN response, throw error?
+ }//else not a valid SASL PLAIN response, throw error?
if (!uid.empty()) {
//append realm if it has not already been added
i = uid.find(realm);
@@ -227,7 +228,12 @@ void NullAuthenticator::start(const string& mechanism, const string* response)
}
} else {
connection.setUserId("anonymous");
- }
+ }
+ AclModule* acl = connection.getBroker().getAcl();
+ if (acl && !acl->approveConnection(connection))
+ {
+ throw ConnectionForcedException("User connection denied by configured limit");
+ }
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
}
@@ -241,7 +247,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t)
#if HAVE_SASL
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
init();
@@ -272,17 +278,17 @@ void CyrusAuthenticator::init()
NULL, /* Callbacks */
0, /* Connection flags */
&sasl_conn);
-
+
if (SASL_OK != code) {
QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn));
-
+
// TODO: Change this to an exception signaling
// server error, when one is available
throw ConnectionForcedException("Unable to perform authentication");
}
sasl_security_properties_t secprops;
-
+
//TODO: should the actual SSF values be configurable here?
secprops.min_ssf = encrypt ? 10: 0;
secprops.max_ssf = 256;
@@ -320,14 +326,14 @@ void CyrusAuthenticator::init()
secprops.property_values = 0;
secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
/*
- * The nodict flag restricts SASL authentication mechanisms
- * to those that are not susceptible to dictionary attacks.
- * They are:
+ * The nodict flag restricts SASL authentication mechanisms
+ * to those that are not susceptible to dictionary attacks.
+ * They are:
* SRP
* PASSDSS-3DES-1
* EXTERNAL
*/
- if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
+ if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
if (result != SASL_OK) {
throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
@@ -372,10 +378,10 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
"", separator, "",
&list, &list_len,
&count);
-
+
if (SASL_OK != code) {
QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn));
-
+
// TODO: Change this to an exception signaling
// server error, when one is available
throw ConnectionForcedException("Mechanism listing failed");
@@ -383,17 +389,17 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
string mechanism;
unsigned int start;
unsigned int end;
-
+
QPID_LOG(info, "SASL: Mechanism list: " << list);
-
+
end = 0;
do {
start = end;
-
+
// Seek to end of next mechanism
while (end < list_len && separator[0] != list[end])
end++;
-
+
// Record the mechanism
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start))));
end++;
@@ -405,20 +411,20 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response)
{
const char *challenge;
unsigned int challenge_len;
-
+
// This should be at same debug level as mech list in getMechanisms().
QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
int code = sasl_server_start(sasl_conn,
mechanism.c_str(),
(response ? response->c_str() : 0), (response ? response->size() : 0),
&challenge, &challenge_len);
-
+
processAuthenticationStep(code, challenge, challenge_len);
qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
+ if ( cnxMgmt )
cnxMgmt->set_saslMechanism(mechanism);
}
-
+
void CyrusAuthenticator::step(const string& response)
{
const char *challenge;
@@ -440,10 +446,17 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen
// authentication failure, when one is available
throw ConnectionForcedException("Authenticated username unavailable");
}
- QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
connection.setUserId(uid);
+ AclModule* acl = connection.getBroker().getAcl();
+ if (acl && !acl->approveConnection(connection))
+ {
+ throw ConnectionForcedException("User connection denied by configured limit");
+ }
+
+ QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
+
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
} else if (SASL_CONTINUE == code) {
string challenge_str(challenge, challenge_len);
@@ -491,7 +504,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr
securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
}
qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
+ if ( cnxMgmt )
cnxMgmt->set_saslSsf(ssf);
return securityLayer;
}
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 754b443c22..757f6efc59 100644
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, false));
@@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
return sc.release();
}
-
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
index d0ba8abfb3..54327fbfe2 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ProxyInputHandler.h"
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
@@ -40,17 +41,10 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out,
const std::string& id,
const qpid::sys::SecuritySettings& external)
{
- broker::Broker& broker = cluster.getBroker();
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: "
- << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10))
return new ConnectionCodec(v, out, id, cluster, false, false, external);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
- return new ConnectionCodec(v, out, id, cluster, true, false, external);
+ return new ConnectionCodec(v, out, id, cluster, true, false, external);
return 0;
}
diff --git a/qpid/cpp/src/tests/run_acl_tests b/qpid/cpp/src/tests/run_acl_tests
index 3a8c03eda6..25241ad75e 100755
--- a/qpid/cpp/src/tests/run_acl_tests
+++ b/qpid/cpp/src/tests/run_acl_tests
@@ -30,9 +30,9 @@ trap stop_brokers INT TERM QUIT
start_brokers() {
../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port
LOCAL_PORT=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-ip 2 --log-to-file locali.log > qpiddi.port
+ ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port
LOCAL_PORTI=`cat qpiddi.port`
- ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-user 2 --log-to-file localu.log > qpiddu.port
+ ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port
LOCAL_PORTU=`cat qpiddu.port`
}