diff options
author | Charles E. Rolke <chug@apache.org> | 2012-06-11 02:09:38 +0000 |
---|---|---|
committer | Charles E. Rolke <chug@apache.org> | 2012-06-11 02:09:38 +0000 |
commit | 700cca77312bc5cb0e1caf6475326ae18c6b1bb9 (patch) | |
tree | ce1dd39ea2ac706097c0899cf2461e142d543686 /qpid/cpp/src | |
parent | 67aaa894835321c731bb9ee08652f93a8cd90832 (diff) | |
download | qpid-python-700cca77312bc5cb0e1caf6475326ae18c6b1bb9.tar.gz |
QPID-4022 C++ Broker connection limits by host ip and by user name.
Rework the strategy to deny connections based on configured limits.
All limits checked in one function from points in broker when the
user's authenticated name is known. Denied connections receive the
AMQP exception instead of getting the socket closed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348707 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp | 193 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclConnectionCounter.h | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclPlugin.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/management-schema.xml | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/AclModule.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionFactory.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp | 65 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp | 14 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_acl_tests | 4 |
15 files changed, 254 insertions, 163 deletions
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 2962a9c1ba..b68816288a 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -51,7 +51,7 @@ using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::acl; Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0), - connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp)) + connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal)) { agent = broker->getManagementAgent(); @@ -60,6 +60,9 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals _qmf::Package packageInit(agent); mgmtObject = new _qmf::Acl (agent, this, broker); agent->addObject (mgmtObject); + mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal); + mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp); + mgmtObject->set_maxConnectionsPerUser(aclValues.aclMaxConnectPerUser); } std::string errorString; if (!readAclFile(errorString)){ @@ -121,6 +124,11 @@ bool Acl::authorise( } +bool Acl::approveConnection(const broker::Connection& conn) +{ + return connectionCounter->approveConnection(conn); +} + bool Acl::result( const AclResult& aclreslt, const std::string& id, diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index c3451018ef..4893f71ef2 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -38,6 +38,7 @@ namespace qpid { namespace broker { class Broker; +class Connection; } namespace acl { @@ -45,8 +46,9 @@ class ConnectionCounter; struct AclValues { std::string aclFile; - uint32_t aclMaxConnectPerUser; - uint32_t aclMaxConnectPerIp; + uint16_t aclMaxConnectPerUser; + uint16_t aclMaxConnectPerIp; + uint16_t aclMaxConnectTotal; }; @@ -66,6 +68,9 @@ private: public: Acl (AclValues& av, broker::Broker& b); + /** reportConnectLimit + * issue management counts and alerts for denied connections + */ void reportConnectLimit(const std::string user, const std::string addr); inline virtual bool doTransferAcl() { @@ -87,6 +92,8 @@ public: const std::string& ExchangeName, const std::string& RoutingKey); + virtual bool approveConnection(const broker::Connection& connection); + virtual ~Acl(); private: bool result( diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp index 5d4e3c1544..70f0ca1da8 100644 --- a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp +++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp @@ -41,30 +41,74 @@ namespace acl { // // // -ConnectionCounter::ConnectionCounter(Acl& a, uint32_t nl, uint32_t hl) : - acl(a), nameLimit(nl), hostLimit(hl) {} +ConnectionCounter::ConnectionCounter(Acl& a, uint16_t nl, uint16_t hl, uint16_t tl) : + acl(a), nameLimit(nl), hostLimit(hl), totalLimit(tl), totalCurrentConnections(0) {} ConnectionCounter::~ConnectionCounter() {} // -// limitCheckLH +// limitApproveLH +// +// Connection creation approver. Return true only if user is under limit. +// Called with lock held. +// +bool ConnectionCounter::limitApproveLH( + connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog) { + + bool result(true); + if (theLimit > 0) { + uint16_t count; + connectCountsMap_t::iterator eRef = theMap.find(theName); + if (eRef != theMap.end()) { + count = (uint16_t)(*eRef).second; + result = count <= theLimit; + } else { + // Not found + count = 0; + } + if (emitLog) { + QPID_LOG(trace, "ACL ConnectionApprover IP=" << theName + << " limit=" << theLimit + << " curValue=" << count + << " result=" << (result ? "allow" : "deny")); + } + } + return result; +} + + +// +// countConnectionLH // // Increment the name's count in map and return a comparison against the limit. // called with dataLock already taken // -bool ConnectionCounter::limitCheckLH( - connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) { +bool ConnectionCounter::countConnectionLH( + connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog) { bool result(true); + uint16_t count(0); if (theLimit > 0) { connectCountsMap_t::iterator eRef = theMap.find(theName); if (eRef != theMap.end()) { - uint32_t count = (uint32_t)(*eRef).second + 1; + count = (uint16_t)(*eRef).second + 1; (*eRef).second = count; result = count <= theLimit; } else { - theMap[theName] = 1; + theMap[theName] = count = 1; + } + if (emitLog) { + QPID_LOG(trace, "ACL ConnectionApprover user=" << theName + << " limit=" << theLimit + << " curValue=" << count + << " result=" << (result ? "allow" : "deny")); } } return result; @@ -78,12 +122,12 @@ bool ConnectionCounter::limitCheckLH( // called with dataLock already taken // void ConnectionCounter::releaseLH( - connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) { + connectCountsMap_t& theMap, const std::string& theName, uint16_t theLimit) { if (theLimit > 0) { connectCountsMap_t::iterator eRef = theMap.find(theName); if (eRef != theMap.end()) { - uint32_t count = (uint32_t) (*eRef).second; + uint16_t count = (uint16_t) (*eRef).second; assert (count > 0); if (1 == count) { theMap.erase (eRef); @@ -103,52 +147,20 @@ void ConnectionCounter::releaseLH( // connection - called during Connection's constructor // void ConnectionCounter::connection(broker::Connection& connection) { - QPID_LOG(trace, "ACL ConnectionCounter connection IP:" << connection.getMgmtId() - << ", userId:" << connection.getUserId()); - - Mutex::ScopedLock locker(dataLock); - - connectProgressMap[connection.getMgmtId()] = C_CREATED; -} - + QPID_LOG(trace, "ACL ConnectionCounter new connection: " << connection.getMgmtId()); -// -// opened - called when first AMQP frame is received over Connection -// -void ConnectionCounter::opened(broker::Connection& connection) { - QPID_LOG(trace, "ACL ConnectionCounter Opened IP:" << connection.getMgmtId() - << ", userId:" << connection.getUserId()); - - Mutex::ScopedLock locker(dataLock); - - const std::string& userName( connection.getUserId()); const std::string& hostName(getClientHost(connection.getMgmtId())); - // Bump state from CREATED to OPENED - (void) limitCheckLH(connectProgressMap, connection.getMgmtId(), C_OPENED); + Mutex::ScopedLock locker(dataLock); - bool nameOk = limitCheckLH(connectByNameMap, userName, nameLimit); - bool hostOk = limitCheckLH(connectByHostMap, hostName, hostLimit); + // Total connections goes up + totalCurrentConnections += 1; - if (!nameOk) { - // User has too many - acl.reportConnectLimit(userName, hostName); - QPID_LOG(notice, "ACL ConnectionCounter User '" << userName - << "' exceeded maximum allowed connections"); - throw Exception( - QPID_MSG("User '" << userName - << "' exceeded maximum allowed connections")); - } + // Record the fact that this connection exists + connectProgressMap[connection.getMgmtId()] = C_CREATED; - if (!hostOk) { - // Host has too many - acl.reportConnectLimit(userName, hostName); - QPID_LOG(notice, "ACL ConnectionCounter Client host '" << hostName - << "' exceeded maximum allowed connections"); - throw Exception( - QPID_MSG("Client host '" << hostName - << "' exceeded maximum allowed connections")); - } + // Count the connection from this host. + (void) countConnectionLH(connectByHostMap, hostName, hostLimit, false); } @@ -156,7 +168,7 @@ void ConnectionCounter::opened(broker::Connection& connection) { // closed - called during Connection's destructor // void ConnectionCounter::closed(broker::Connection& connection) { - QPID_LOG(trace, "ACL ConnectionCounter Closed IP:" << connection.getMgmtId() + QPID_LOG(trace, "ACL ConnectionCounter closed: " << connection.getMgmtId() << ", userId:" << connection.getUserId()); Mutex::ScopedLock locker(dataLock); @@ -165,32 +177,99 @@ void ConnectionCounter::closed(broker::Connection& connection) { if (eRef != connectProgressMap.end()) { if ((*eRef).second == C_OPENED){ // Normal case: connection was created and opened. - // Decrement in-use counts + // Decrement user in-use counts releaseLH(connectByNameMap, connection.getUserId(), nameLimit); - - releaseLH(connectByHostMap, - getClientHost(connection.getMgmtId()), - hostLimit); } else { // Connection was created but not opened. - // Don't decrement any connection counts. + // Don't decrement user count. } + + // Decrement host in-use count. + releaseLH(connectByHostMap, + getClientHost(connection.getMgmtId()), + hostLimit); + + // destroy connection progress indicator connectProgressMap.erase(eRef); } else { // connection not found in progress map - QPID_LOG(notice, "ACL ConnectionCounter info for '" << connection.getMgmtId() + QPID_LOG(notice, "ACL ConnectionCounter closed info for '" << connection.getMgmtId() << "' not found in connection state pool"); } + + // total connections + totalCurrentConnections -= 1; } // +// approveConnection +// check total connections, connections from IP, connections by user and +// disallow if over any limit +// +bool ConnectionCounter::approveConnection(const broker::Connection& connection) +{ + const std::string& hostName(getClientHost(connection.getMgmtId())); + const std::string& userName( connection.getUserId()); + + Mutex::ScopedLock locker(dataLock); + + // Bump state from CREATED to OPENED + (void) countConnectionLH(connectProgressMap, connection.getMgmtId(), + C_OPENED, false); + + // Approve total connections + bool okTotal = true; + if (totalLimit > 0) { + okTotal = totalCurrentConnections <= totalLimit; + QPID_LOG(trace, "ACL ConnectionApprover totalLimit=" << totalLimit + << " curValue=" << totalCurrentConnections + << " result=" << (okTotal ? "allow" : "deny")); + } + + // Approve by IP host connections + bool okByIP = limitApproveLH(connectByHostMap, hostName, hostLimit, true); + + // Count and Approve the connection by the user + bool okByUser = countConnectionLH(connectByNameMap, userName, nameLimit, true); + + // Emit separate log for each disapproval + if (!okTotal) { + QPID_LOG(error, "Client max total connection count limit of " << totalLimit + << " exceeded by " + << connection.getMgmtId() << ", user: " + << userName << ". Connection refused"); + } + if (!okByIP) { + QPID_LOG(error, "Client max per-host connection count limit of " + << hostLimit << " exceeded by " + << connection.getMgmtId() << ", user: " + << userName << ". Connection refused."); + } + if (!okByUser) { + QPID_LOG(error, "Client max per-user connection count limit of " + << nameLimit << " exceeded by " + << connection.getMgmtId() << ", user: " + << userName << ". Connection refused."); + } + + // Count/Event once for each disapproval + bool result = okTotal && okByIP && okByUser; + if (!result) { + acl.reportConnectLimit(userName, hostName); + } + + return result; +} + +// // getClientIp - given a connection's mgmtId return the client host part. // // TODO: Ideally this would be a method of the connection itself. +// TODO: Verify it works with rdma connection names. // std::string ConnectionCounter::getClientHost(const std::string mgmtId) { diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h index 31d11540fd..eec8e90256 100644 --- a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h +++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h @@ -48,32 +48,52 @@ private: enum CONNECTION_PROGRESS { C_CREATED=1, C_OPENED=2 }; Acl& acl; - uint32_t nameLimit; - uint32_t hostLimit; + uint16_t nameLimit; + uint16_t hostLimit; + uint16_t totalLimit; + uint16_t totalCurrentConnections; qpid::sys::Mutex dataLock; + /** Records per-connection state */ connectCountsMap_t connectProgressMap; + + /** Records per-username counts */ connectCountsMap_t connectByNameMap; + + /** Records per-host counts */ connectCountsMap_t connectByHostMap; + /** Given a connection's management ID, return the client host name */ std::string getClientHost(const std::string mgmtId); - bool limitCheckLH(connectCountsMap_t& theMap, - const std::string& theName, - uint32_t theLimit); + /** Return approval for proposed connection */ + bool limitApproveLH(connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog); + + /** Record a connection. + * @return indication if user/host is over its limit */ + bool countConnectionLH(connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog); + /** Release a connection */ void releaseLH(connectCountsMap_t& theMap, const std::string& theName, - uint32_t theLimit); + uint16_t theLimit); public: - ConnectionCounter(Acl& acl, uint32_t nl, uint32_t hl); + ConnectionCounter(Acl& acl, uint16_t nl, uint16_t hl, uint16_t tl); ~ConnectionCounter(); + // ConnectionObserver interface void connection(broker::Connection& connection); - void opened(broker::Connection& connection); void closed(broker::Connection& connection); + // Connection counting + bool approveConnection(const broker::Connection& conn); }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp index 6c18cd2749..ebf5e90afe 100644 --- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp +++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp @@ -39,10 +39,13 @@ struct AclOptions : public Options { AclValues& values; AclOptions(AclValues& v) : Options("ACL Options"), values(v) { + values.aclMaxConnectTotal = 500; addOptions() ("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir") - ("acl-max-connect-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user") - ("acl-max-connect-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address"); + ("max-connections" , optValue(values.aclMaxConnectTotal, "N"), "The maximum combined number of connections allowed. 0 implies no limit.") + ("max-connections-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user. 0 implies no limit.") + ("max-connections-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address. 0 implies no limit.") + ; } }; @@ -69,7 +72,6 @@ struct AclPlugin : public Plugin { oss << b.getDataDir().getPath() << "/" << values.aclFile; values.aclFile = oss.str(); } - acl = new Acl(values, b); b.setAcl(acl.get()); b.addFinalizer(boost::bind(&AclPlugin::shutdown, this)); diff --git a/qpid/cpp/src/qpid/acl/management-schema.xml b/qpid/cpp/src/qpid/acl/management-schema.xml index 19fe37333c..f52c251bed 100644 --- a/qpid/cpp/src/qpid/acl/management-schema.xml +++ b/qpid/cpp/src/qpid/acl/management-schema.xml @@ -17,13 +17,16 @@ --> <class name="Acl"> - <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/> - <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/> - <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/> - <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/> - <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/> - <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/> - <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/> + <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/> + <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/> + <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/> + <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/> + <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/> + <property name="maxConnections" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="maxConnectionsPerIp" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="maxConnectionsPerUser" type="uint16" access="RO" desc="Maximum allowed connections"/> + <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/> + <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/> <method name="reloadACLFile" desc="Reload the ACL file"/> diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index ff9281b6fc..7c180439cf 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -113,6 +113,7 @@ namespace acl { namespace broker { + class Connection; class AclModule { @@ -139,6 +140,11 @@ namespace broker { // Add specialized authorise() methods as required. + /** Approve connection by counting connections total, per-IP, and + * per-user. + */ + virtual bool approveConnection (const Connection& connection)=0; + virtual ~AclModule() {}; }; } // namespace broker diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index ed24db14ad..4b7b4cb2a6 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -108,7 +108,6 @@ Broker::Options::Options(const std::string& name) : noDataDir(0), port(DEFAULT_PORT), workerThreads(5), - maxConnections(500), connectionBacklog(10), enableMgmt(1), mgmtPublish(1), @@ -148,7 +147,6 @@ Broker::Options::Options(const std::string& name) : ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") - ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") @@ -212,7 +210,6 @@ Broker::Broker(const Broker::Options& conf) : inCluster(false), clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), - connectionCounter(conf.maxConnections), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) { @@ -231,7 +228,6 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); - mgmtObject->set_maxConns(conf.maxConnections); mgmtObject->set_connBacklog(conf.connectionBacklog); mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); mgmtObject->set_mgmtPublish(conf.mgmtPublish); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 22a35c0929..83f34a839a 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -103,7 +103,6 @@ class Broker : public sys::Runnable, public Plugin::Target, std::string dataDir; uint16_t port; int workerThreads; - int maxConnections; int connectionBacklog; bool enableMgmt; bool mgmtPublish; @@ -134,26 +133,6 @@ class Broker : public sys::Runnable, public Plugin::Target, std::string getHome(); }; - class ConnectionCounter { - int maxConnections; - int connectionCount; - sys::Mutex connectionCountLock; - public: - ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; - void inc_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount++; - } - void dec_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount--; - } - bool allowConnection() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - return (maxConnections <= connectionCount); - } - }; - private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; @@ -206,7 +185,6 @@ class Broker : public sys::Runnable, public Plugin::Target, bool recovery; bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - ConnectionCounter connectionCounter; ConsumerFactories consumerFactories; mutable sys::Mutex linkClientPropertiesLock; @@ -322,8 +300,6 @@ class Broker : public sys::Runnable, public Plugin::Target, management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } - ConnectionCounter& getConnectionCounter() {return connectionCounter;} - /** * Never true in a stand-alone broker. In a cluster, return true * to defer delivery of messages deliveredg in a cluster-unsafe diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 03ff3d5793..15fffdfcb1 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -107,7 +107,6 @@ Connection::Connection(ConnectionOutputHandler* out_, broker.getConnectionObservers().connection(*this); // In a cluster, allow adding the management object to be delayed. if (!delayManagement) addManagementObject(); - if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::addManagementObject() { @@ -151,8 +150,6 @@ Connection::~Connection() if (linkHeartbeatTimer) { linkHeartbeatTimer->cancel(); } - - if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); } void Connection::received(framing::AMQFrame& frame) { diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 9e0020812b..d5d24ca629 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory() {} sys::ConnectionCodec* ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) { ConnectionPtr c(new amqp_0_10::Connection(out, id, false)); c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false))); @@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputControl& out, const std::string& id, return c.release(); } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 57926fa938..09df180fcc 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ # include "config.h" #endif +#include "qpid/broker/AclModule.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" @@ -170,7 +171,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti if (c.getBroker().getOptions().auth) { if ( isShadow ) return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); - else + else return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); } else { QPID_LOG(debug, "SASL: No Authentication Performed"); @@ -179,7 +180,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti } -NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), +NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), realm(c.getBroker().getOptions().realm), encrypt(e) {} NullAuthenticator::~NullAuthenticator() {} @@ -215,7 +216,7 @@ void NullAuthenticator::start(const string& mechanism, const string* response) } else if (i != string::npos) { //authorization id is first null delimited field uid = response->substr(0, i); - }//else not a valid SASL PLAIN response, throw error? + }//else not a valid SASL PLAIN response, throw error? if (!uid.empty()) { //append realm if it has not already been added i = uid.find(realm); @@ -227,7 +228,12 @@ void NullAuthenticator::start(const string& mechanism, const string* response) } } else { connection.setUserId("anonymous"); - } + } + AclModule* acl = connection.getBroker().getAcl(); + if (acl && !acl->approveConnection(connection)) + { + throw ConnectionForcedException("User connection denied by configured limit"); + } client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } @@ -241,7 +247,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t) #if HAVE_SASL -CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : +CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt) { init(); @@ -272,17 +278,17 @@ void CyrusAuthenticator::init() NULL, /* Callbacks */ 0, /* Connection flags */ &sasl_conn); - + if (SASL_OK != code) { QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn)); - + // TODO: Change this to an exception signaling // server error, when one is available throw ConnectionForcedException("Unable to perform authentication"); } sasl_security_properties_t secprops; - + //TODO: should the actual SSF values be configurable here? secprops.min_ssf = encrypt ? 10: 0; secprops.max_ssf = 256; @@ -320,14 +326,14 @@ void CyrusAuthenticator::init() secprops.property_values = 0; secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */ /* - * The nodict flag restricts SASL authentication mechanisms - * to those that are not susceptible to dictionary attacks. - * They are: + * The nodict flag restricts SASL authentication mechanisms + * to those that are not susceptible to dictionary attacks. + * They are: * SRP * PASSDSS-3DES-1 * EXTERNAL */ - if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY; + if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY; int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops); if (result != SASL_OK) { throw framing::InternalErrorException(QPID_MSG("SASL error: " << result)); @@ -372,10 +378,10 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) "", separator, "", &list, &list_len, &count); - + if (SASL_OK != code) { QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn)); - + // TODO: Change this to an exception signaling // server error, when one is available throw ConnectionForcedException("Mechanism listing failed"); @@ -383,17 +389,17 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) string mechanism; unsigned int start; unsigned int end; - + QPID_LOG(info, "SASL: Mechanism list: " << list); - + end = 0; do { start = end; - + // Seek to end of next mechanism while (end < list_len && separator[0] != list[end]) end++; - + // Record the mechanism mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start)))); end++; @@ -405,20 +411,20 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response) { const char *challenge; unsigned int challenge_len; - + // This should be at same debug level as mech list in getMechanisms(). QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); int code = sasl_server_start(sasl_conn, mechanism.c_str(), (response ? response->c_str() : 0), (response ? response->size() : 0), &challenge, &challenge_len); - + processAuthenticationStep(code, challenge, challenge_len); qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); - if ( cnxMgmt ) + if ( cnxMgmt ) cnxMgmt->set_saslMechanism(mechanism); } - + void CyrusAuthenticator::step(const string& response) { const char *challenge; @@ -440,10 +446,17 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen // authentication failure, when one is available throw ConnectionForcedException("Authenticated username unavailable"); } - QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); connection.setUserId(uid); + AclModule* acl = connection.getBroker().getAcl(); + if (acl && !acl->approveConnection(connection)) + { + throw ConnectionForcedException("User connection denied by configured limit"); + } + + QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } else if (SASL_CONTINUE == code) { string challenge_str(challenge, challenge_len); @@ -491,7 +504,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); - if ( cnxMgmt ) + if ( cnxMgmt ) cnxMgmt->set_saslSsf(ssf); return securityLayer; } diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 754b443c22..757f6efc59 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} sys::ConnectionCodec* SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) { SecureConnectionPtr sc(new SecureConnection()); CodecPtr c(new amqp_0_10::Connection(out, id, false)); @@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, return sc.release(); } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index d0ba8abfb3..54327fbfe2 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/cluster/Connection.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ProxyInputHandler.h" +#include "qpid/broker/AclModule.h" #include "qpid/broker/Connection.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" @@ -40,17 +41,10 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& external) { - broker::Broker& broker = cluster.getBroker(); - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " - << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) return new ConnectionCodec(v, out, id, cluster, false, false, external); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection - return new ConnectionCodec(v, out, id, cluster, true, false, external); + return new ConnectionCodec(v, out, id, cluster, true, false, external); return 0; } diff --git a/qpid/cpp/src/tests/run_acl_tests b/qpid/cpp/src/tests/run_acl_tests index 3a8c03eda6..25241ad75e 100755 --- a/qpid/cpp/src/tests/run_acl_tests +++ b/qpid/cpp/src/tests/run_acl_tests @@ -30,9 +30,9 @@ trap stop_brokers INT TERM QUIT start_brokers() { ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port LOCAL_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-ip 2 --log-to-file locali.log > qpiddi.port + ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port LOCAL_PORTI=`cat qpiddi.port` - ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-user 2 --log-to-file localu.log > qpiddu.port + ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port LOCAL_PORTU=`cat qpiddu.port` } |