diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp | 193 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclConnectionCounter.h | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclPlugin.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/management-schema.xml | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/AclModule.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionFactory.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp | 65 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp | 14 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_acl_tests | 4 |
15 files changed, 254 insertions, 163 deletions
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 2962a9c1ba..b68816288a 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -51,7 +51,7 @@ using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::acl; Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0), - connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp)) + connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal)) { agent = broker->getManagementAgent(); @@ -60,6 +60,9 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals _qmf::Package packageInit(agent); mgmtObject = new _qmf::Acl (agent, this, broker); agent->addObject (mgmtObject); + mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal); + mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp); + mgmtObject->set_maxConnectionsPerUser(aclValues.aclMaxConnectPerUser); } std::string errorString; if (!readAclFile(errorString)){ @@ -121,6 +124,11 @@ bool Acl::authorise( } +bool Acl::approveConnection(const broker::Connection& conn) +{ + return connectionCounter->approveConnection(conn); +} + bool Acl::result( const AclResult& aclreslt, const std::string& id, diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index c3451018ef..4893f71ef2 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -38,6 +38,7 @@ namespace qpid { namespace broker { class Broker; +class Connection; } namespace acl { @@ -45,8 +46,9 @@ class ConnectionCounter; struct AclValues { std::string aclFile; - uint32_t aclMaxConnectPerUser; - uint32_t aclMaxConnectPerIp; + uint16_t aclMaxConnectPerUser; + uint16_t aclMaxConnectPerIp; + uint16_t aclMaxConnectTotal; }; @@ -66,6 +68,9 @@ private: public: Acl (AclValues& av, broker::Broker& b); + /** reportConnectLimit + * issue management counts and alerts for denied connections + */ void reportConnectLimit(const std::string user, const std::string addr); inline virtual bool doTransferAcl() { @@ -87,6 +92,8 @@ public: const std::string& ExchangeName, const std::string& RoutingKey); + virtual bool approveConnection(const broker::Connection& connection); + virtual ~Acl(); private: bool result( diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp index 5d4e3c1544..70f0ca1da8 100644 --- a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp +++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp @@ -41,30 +41,74 @@ namespace acl { // // // -ConnectionCounter::ConnectionCounter(Acl& a, uint32_t nl, uint32_t hl) : - acl(a), nameLimit(nl), hostLimit(hl) {} +ConnectionCounter::ConnectionCounter(Acl& a, uint16_t nl, uint16_t hl, uint16_t tl) : + acl(a), nameLimit(nl), hostLimit(hl), totalLimit(tl), totalCurrentConnections(0) {} ConnectionCounter::~ConnectionCounter() {} // -// limitCheckLH +// limitApproveLH +// +// Connection creation approver. Return true only if user is under limit. +// Called with lock held. +// +bool ConnectionCounter::limitApproveLH( + connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog) { + + bool result(true); + if (theLimit > 0) { + uint16_t count; + connectCountsMap_t::iterator eRef = theMap.find(theName); + if (eRef != theMap.end()) { + count = (uint16_t)(*eRef).second; + result = count <= theLimit; + } else { + // Not found + count = 0; + } + if (emitLog) { + QPID_LOG(trace, "ACL ConnectionApprover IP=" << theName + << " limit=" << theLimit + << " curValue=" << count + << " result=" << (result ? "allow" : "deny")); + } + } + return result; +} + + +// +// countConnectionLH // // Increment the name's count in map and return a comparison against the limit. // called with dataLock already taken // -bool ConnectionCounter::limitCheckLH( - connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) { +bool ConnectionCounter::countConnectionLH( + connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog) { bool result(true); + uint16_t count(0); if (theLimit > 0) { connectCountsMap_t::iterator eRef = theMap.find(theName); if (eRef != theMap.end()) { - uint32_t count = (uint32_t)(*eRef).second + 1; + count = (uint16_t)(*eRef).second + 1; (*eRef).second = count; result = count <= theLimit; } else { - theMap[theName] = 1; + theMap[theName] = count = 1; + } + if (emitLog) { + QPID_LOG(trace, "ACL ConnectionApprover user=" << theName + << " limit=" << theLimit + << " curValue=" << count + << " result=" << (result ? "allow" : "deny")); } } return result; @@ -78,12 +122,12 @@ bool ConnectionCounter::limitCheckLH( // called with dataLock already taken // void ConnectionCounter::releaseLH( - connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) { + connectCountsMap_t& theMap, const std::string& theName, uint16_t theLimit) { if (theLimit > 0) { connectCountsMap_t::iterator eRef = theMap.find(theName); if (eRef != theMap.end()) { - uint32_t count = (uint32_t) (*eRef).second; + uint16_t count = (uint16_t) (*eRef).second; assert (count > 0); if (1 == count) { theMap.erase (eRef); @@ -103,52 +147,20 @@ void ConnectionCounter::releaseLH( // connection - called during Connection's constructor // void ConnectionCounter::connection(broker::Connection& connection) { - QPID_LOG(trace, "ACL ConnectionCounter connection IP:" << connection.getMgmtId() - << ", userId:" << connection.getUserId()); - - Mutex::ScopedLock locker(dataLock); - - connectProgressMap[connection.getMgmtId()] = C_CREATED; -} - + QPID_LOG(trace, "ACL ConnectionCounter new connection: " << connection.getMgmtId()); -// -// opened - called when first AMQP frame is received over Connection -// -void ConnectionCounter::opened(broker::Connection& connection) { - QPID_LOG(trace, "ACL ConnectionCounter Opened IP:" << connection.getMgmtId() - << ", userId:" << connection.getUserId()); - - Mutex::ScopedLock locker(dataLock); - - const std::string& userName( connection.getUserId()); const std::string& hostName(getClientHost(connection.getMgmtId())); - // Bump state from CREATED to OPENED - (void) limitCheckLH(connectProgressMap, connection.getMgmtId(), C_OPENED); + Mutex::ScopedLock locker(dataLock); - bool nameOk = limitCheckLH(connectByNameMap, userName, nameLimit); - bool hostOk = limitCheckLH(connectByHostMap, hostName, hostLimit); + // Total connections goes up + totalCurrentConnections += 1; - if (!nameOk) { - // User has too many - acl.reportConnectLimit(userName, hostName); - QPID_LOG(notice, "ACL ConnectionCounter User '" << userName - << "' exceeded maximum allowed connections"); - throw Exception( - QPID_MSG("User '" << userName - << "' exceeded maximum allowed connections")); - } + // Record the fact that this connection exists + connectProgressMap[connection.getMgmtId()] = C_CREATED; - if (!hostOk) { - // Host has too many - acl.reportConnectLimit(userName, hostName); - QPID_LOG(notice, "ACL ConnectionCounter Client host '" << hostName - << "' exceeded maximum allowed connections"); - throw Exception( - QPID_MSG("Client host '" << hostName - << "' exceeded maximum allowed connections")); - } + // Count the connection from this host. + (void) countConnectionLH(connectByHostMap, hostName, hostLimit, false); } @@ -156,7 +168,7 @@ void ConnectionCounter::opened(broker::Connection& connection) { // closed - called during Connection's destructor // void ConnectionCounter::closed(broker::Connection& connection) { - QPID_LOG(trace, "ACL ConnectionCounter Closed IP:" << connection.getMgmtId() + QPID_LOG(trace, "ACL ConnectionCounter closed: " << connection.getMgmtId() << ", userId:" << connection.getUserId()); Mutex::ScopedLock locker(dataLock); @@ -165,32 +177,99 @@ void ConnectionCounter::closed(broker::Connection& connection) { if (eRef != connectProgressMap.end()) { if ((*eRef).second == C_OPENED){ // Normal case: connection was created and opened. - // Decrement in-use counts + // Decrement user in-use counts releaseLH(connectByNameMap, connection.getUserId(), nameLimit); - - releaseLH(connectByHostMap, - getClientHost(connection.getMgmtId()), - hostLimit); } else { // Connection was created but not opened. - // Don't decrement any connection counts. + // Don't decrement user count. } + + // Decrement host in-use count. + releaseLH(connectByHostMap, + getClientHost(connection.getMgmtId()), + hostLimit); + + // destroy connection progress indicator connectProgressMap.erase(eRef); } else { // connection not found in progress map - QPID_LOG(notice, "ACL ConnectionCounter info for '" << connection.getMgmtId() + QPID_LOG(notice, "ACL ConnectionCounter closed info for '" << connection.getMgmtId() << "' not found in connection state pool"); } + + // total connections + totalCurrentConnections -= 1; } // +// approveConnection +// check total connections, connections from IP, connections by user and +// disallow if over any limit +// +bool ConnectionCounter::approveConnection(const broker::Connection& connection) +{ + const std::string& hostName(getClientHost(connection.getMgmtId())); + const std::string& userName( connection.getUserId()); + + Mutex::ScopedLock locker(dataLock); + + // Bump state from CREATED to OPENED + (void) countConnectionLH(connectProgressMap, connection.getMgmtId(), + C_OPENED, false); + + // Approve total connections + bool okTotal = true; + if (totalLimit > 0) { + okTotal = totalCurrentConnections <= totalLimit; + QPID_LOG(trace, "ACL ConnectionApprover totalLimit=" << totalLimit + << " curValue=" << totalCurrentConnections + << " result=" << (okTotal ? "allow" : "deny")); + } + + // Approve by IP host connections + bool okByIP = limitApproveLH(connectByHostMap, hostName, hostLimit, true); + + // Count and Approve the connection by the user + bool okByUser = countConnectionLH(connectByNameMap, userName, nameLimit, true); + + // Emit separate log for each disapproval + if (!okTotal) { + QPID_LOG(error, "Client max total connection count limit of " << totalLimit + << " exceeded by " + << connection.getMgmtId() << ", user: " + << userName << ". Connection refused"); + } + if (!okByIP) { + QPID_LOG(error, "Client max per-host connection count limit of " + << hostLimit << " exceeded by " + << connection.getMgmtId() << ", user: " + << userName << ". Connection refused."); + } + if (!okByUser) { + QPID_LOG(error, "Client max per-user connection count limit of " + << nameLimit << " exceeded by " + << connection.getMgmtId() << ", user: " + << userName << ". Connection refused."); + } + + // Count/Event once for each disapproval + bool result = okTotal && okByIP && okByUser; + if (!result) { + acl.reportConnectLimit(userName, hostName); + } + + return result; +} + +// // getClientIp - given a connection's mgmtId return the client host part. // // TODO: Ideally this would be a method of the connection itself. +// TODO: Verify it works with rdma connection names. // std::string ConnectionCounter::getClientHost(const std::string mgmtId) { diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h index 31d11540fd..eec8e90256 100644 --- a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h +++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h @@ -48,32 +48,52 @@ private: enum CONNECTION_PROGRESS { C_CREATED=1, C_OPENED=2 }; Acl& acl; - uint32_t nameLimit; - uint32_t hostLimit; + uint16_t nameLimit; + uint16_t hostLimit; + uint16_t totalLimit; + uint16_t totalCurrentConnections; qpid::sys::Mutex dataLock; + /** Records per-connection state */ connectCountsMap_t connectProgressMap; + + /** Records per-username counts */ connectCountsMap_t connectByNameMap; + + /** Records per-host counts */ connectCountsMap_t connectByHostMap; + /** Given a connection's management ID, return the client host name */ std::string getClientHost(const std::string mgmtId); - bool limitCheckLH(connectCountsMap_t& theMap, - const std::string& theName, - uint32_t theLimit); + /** Return approval for proposed connection */ + bool limitApproveLH(connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog); + + /** Record a connection. + * @return indication if user/host is over its limit */ + bool countConnectionLH(connectCountsMap_t& theMap, + const std::string& theName, + uint16_t theLimit, + bool emitLog); + /** Release a connection */ void releaseLH(connectCountsMap_t& theMap, const std::string& theName, - uint32_t theLimit); + uint16_t theLimit); public: - ConnectionCounter(Acl& acl, uint32_t nl, uint32_t hl); + ConnectionCounter(Acl& acl, uint16_t nl, uint16_t hl, uint16_t tl); ~ConnectionCounter(); + // ConnectionObserver interface void connection(broker::Connection& connection); - void opened(broker::Connection& connection); void closed(broker::Connection& connection); + // Connection counting + bool approveConnection(const broker::Connection& conn); }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp index 6c18cd2749..ebf5e90afe 100644 --- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp +++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp @@ -39,10 +39,13 @@ struct AclOptions : public Options { AclValues& values; AclOptions(AclValues& v) : Options("ACL Options"), values(v) { + values.aclMaxConnectTotal = 500; addOptions() ("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir") - ("acl-max-connect-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user") - ("acl-max-connect-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address"); + ("max-connections" , optValue(values.aclMaxConnectTotal, "N"), "The maximum combined number of connections allowed. 0 implies no limit.") + ("max-connections-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user. 0 implies no limit.") + ("max-connections-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address. 0 implies no limit.") + ; } }; @@ -69,7 +72,6 @@ struct AclPlugin : public Plugin { oss << b.getDataDir().getPath() << "/" << values.aclFile; values.aclFile = oss.str(); } - acl = new Acl(values, b); b.setAcl(acl.get()); b.addFinalizer(boost::bind(&AclPlugin::shutdown, this)); diff --git a/qpid/cpp/src/qpid/acl/management-schema.xml b/qpid/cpp/src/qpid/acl/management-schema.xml index 19fe37333c..f52c251bed 100644 --- a/qpid/cpp/src/qpid/acl/management-schema.xml +++ b/qpid/cpp/src/qpid/acl/management-schema.xml @@ -17,13 +17,16 @@ --> <class name="Acl"> - <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/> - <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/> - <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/> - <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/> - <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/> - <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/> - <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/> + <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/> + <property name="policyFile" type="lstr" access="RO" desc="Name of the policy file"/> + <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/> + <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/> + <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/> + <property name="maxConnections" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="maxConnectionsPerIp" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="maxConnectionsPerUser" type="uint16" access="RO" desc="Maximum allowed connections"/> + <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/> + <statistic name="connectionDenyCount" type="count64" unit="connection" desc="Number of connections denied"/> <method name="reloadACLFile" desc="Reload the ACL file"/> diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index ff9281b6fc..7c180439cf 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -113,6 +113,7 @@ namespace acl { namespace broker { + class Connection; class AclModule { @@ -139,6 +140,11 @@ namespace broker { // Add specialized authorise() methods as required. + /** Approve connection by counting connections total, per-IP, and + * per-user. + */ + virtual bool approveConnection (const Connection& connection)=0; + virtual ~AclModule() {}; }; } // namespace broker diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index ed24db14ad..4b7b4cb2a6 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -108,7 +108,6 @@ Broker::Options::Options(const std::string& name) : noDataDir(0), port(DEFAULT_PORT), workerThreads(5), - maxConnections(500), connectionBacklog(10), enableMgmt(1), mgmtPublish(1), @@ -148,7 +147,6 @@ Broker::Options::Options(const std::string& name) : ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") - ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") @@ -212,7 +210,6 @@ Broker::Broker(const Broker::Options& conf) : inCluster(false), clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), - connectionCounter(conf.maxConnections), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) { @@ -231,7 +228,6 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); - mgmtObject->set_maxConns(conf.maxConnections); mgmtObject->set_connBacklog(conf.connectionBacklog); mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); mgmtObject->set_mgmtPublish(conf.mgmtPublish); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 22a35c0929..83f34a839a 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -103,7 +103,6 @@ class Broker : public sys::Runnable, public Plugin::Target, std::string dataDir; uint16_t port; int workerThreads; - int maxConnections; int connectionBacklog; bool enableMgmt; bool mgmtPublish; @@ -134,26 +133,6 @@ class Broker : public sys::Runnable, public Plugin::Target, std::string getHome(); }; - class ConnectionCounter { - int maxConnections; - int connectionCount; - sys::Mutex connectionCountLock; - public: - ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; - void inc_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount++; - } - void dec_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount--; - } - bool allowConnection() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - return (maxConnections <= connectionCount); - } - }; - private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; @@ -206,7 +185,6 @@ class Broker : public sys::Runnable, public Plugin::Target, bool recovery; bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - ConnectionCounter connectionCounter; ConsumerFactories consumerFactories; mutable sys::Mutex linkClientPropertiesLock; @@ -322,8 +300,6 @@ class Broker : public sys::Runnable, public Plugin::Target, management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } - ConnectionCounter& getConnectionCounter() {return connectionCounter;} - /** * Never true in a stand-alone broker. In a cluster, return true * to defer delivery of messages deliveredg in a cluster-unsafe diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 03ff3d5793..15fffdfcb1 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -107,7 +107,6 @@ Connection::Connection(ConnectionOutputHandler* out_, broker.getConnectionObservers().connection(*this); // In a cluster, allow adding the management object to be delayed. if (!delayManagement) addManagementObject(); - if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::addManagementObject() { @@ -151,8 +150,6 @@ Connection::~Connection() if (linkHeartbeatTimer) { linkHeartbeatTimer->cancel(); } - - if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); } void Connection::received(framing::AMQFrame& frame) { diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 9e0020812b..d5d24ca629 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,11 +40,6 @@ ConnectionFactory::~ConnectionFactory() {} sys::ConnectionCodec* ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) { ConnectionPtr c(new amqp_0_10::Connection(out, id, false)); c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false))); @@ -62,5 +57,5 @@ ConnectionFactory::create(sys::OutputControl& out, const std::string& id, return c.release(); } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 57926fa938..09df180fcc 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ # include "config.h" #endif +#include "qpid/broker/AclModule.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" @@ -170,7 +171,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti if (c.getBroker().getOptions().auth) { if ( isShadow ) return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); - else + else return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); } else { QPID_LOG(debug, "SASL: No Authentication Performed"); @@ -179,7 +180,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti } -NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), +NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), realm(c.getBroker().getOptions().realm), encrypt(e) {} NullAuthenticator::~NullAuthenticator() {} @@ -215,7 +216,7 @@ void NullAuthenticator::start(const string& mechanism, const string* response) } else if (i != string::npos) { //authorization id is first null delimited field uid = response->substr(0, i); - }//else not a valid SASL PLAIN response, throw error? + }//else not a valid SASL PLAIN response, throw error? if (!uid.empty()) { //append realm if it has not already been added i = uid.find(realm); @@ -227,7 +228,12 @@ void NullAuthenticator::start(const string& mechanism, const string* response) } } else { connection.setUserId("anonymous"); - } + } + AclModule* acl = connection.getBroker().getAcl(); + if (acl && !acl->approveConnection(connection)) + { + throw ConnectionForcedException("User connection denied by configured limit"); + } client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } @@ -241,7 +247,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t) #if HAVE_SASL -CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : +CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt) { init(); @@ -272,17 +278,17 @@ void CyrusAuthenticator::init() NULL, /* Callbacks */ 0, /* Connection flags */ &sasl_conn); - + if (SASL_OK != code) { QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn)); - + // TODO: Change this to an exception signaling // server error, when one is available throw ConnectionForcedException("Unable to perform authentication"); } sasl_security_properties_t secprops; - + //TODO: should the actual SSF values be configurable here? secprops.min_ssf = encrypt ? 10: 0; secprops.max_ssf = 256; @@ -320,14 +326,14 @@ void CyrusAuthenticator::init() secprops.property_values = 0; secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */ /* - * The nodict flag restricts SASL authentication mechanisms - * to those that are not susceptible to dictionary attacks. - * They are: + * The nodict flag restricts SASL authentication mechanisms + * to those that are not susceptible to dictionary attacks. + * They are: * SRP * PASSDSS-3DES-1 * EXTERNAL */ - if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY; + if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY; int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops); if (result != SASL_OK) { throw framing::InternalErrorException(QPID_MSG("SASL error: " << result)); @@ -372,10 +378,10 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) "", separator, "", &list, &list_len, &count); - + if (SASL_OK != code) { QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn)); - + // TODO: Change this to an exception signaling // server error, when one is available throw ConnectionForcedException("Mechanism listing failed"); @@ -383,17 +389,17 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) string mechanism; unsigned int start; unsigned int end; - + QPID_LOG(info, "SASL: Mechanism list: " << list); - + end = 0; do { start = end; - + // Seek to end of next mechanism while (end < list_len && separator[0] != list[end]) end++; - + // Record the mechanism mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start)))); end++; @@ -405,20 +411,20 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response) { const char *challenge; unsigned int challenge_len; - + // This should be at same debug level as mech list in getMechanisms(). QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); int code = sasl_server_start(sasl_conn, mechanism.c_str(), (response ? response->c_str() : 0), (response ? response->size() : 0), &challenge, &challenge_len); - + processAuthenticationStep(code, challenge, challenge_len); qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); - if ( cnxMgmt ) + if ( cnxMgmt ) cnxMgmt->set_saslMechanism(mechanism); } - + void CyrusAuthenticator::step(const string& response) { const char *challenge; @@ -440,10 +446,17 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen // authentication failure, when one is available throw ConnectionForcedException("Authenticated username unavailable"); } - QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); connection.setUserId(uid); + AclModule* acl = connection.getBroker().getAcl(); + if (acl && !acl->approveConnection(connection)) + { + throw ConnectionForcedException("User connection denied by configured limit"); + } + + QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } else if (SASL_CONTINUE == code) { string challenge_str(challenge, challenge_len); @@ -491,7 +504,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); - if ( cnxMgmt ) + if ( cnxMgmt ) cnxMgmt->set_saslSsf(ssf); return securityLayer; } diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 754b443c22..757f6efc59 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} sys::ConnectionCodec* SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) { SecureConnectionPtr sc(new SecureConnection()); CodecPtr c(new amqp_0_10::Connection(out, id, false)); @@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, return sc.release(); } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index d0ba8abfb3..54327fbfe2 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/cluster/Connection.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ProxyInputHandler.h" +#include "qpid/broker/AclModule.h" #include "qpid/broker/Connection.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" @@ -40,17 +41,10 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& external) { - broker::Broker& broker = cluster.getBroker(); - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " - << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) return new ConnectionCodec(v, out, id, cluster, false, false, external); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection - return new ConnectionCodec(v, out, id, cluster, true, false, external); + return new ConnectionCodec(v, out, id, cluster, true, false, external); return 0; } diff --git a/qpid/cpp/src/tests/run_acl_tests b/qpid/cpp/src/tests/run_acl_tests index 3a8c03eda6..25241ad75e 100755 --- a/qpid/cpp/src/tests/run_acl_tests +++ b/qpid/cpp/src/tests/run_acl_tests @@ -30,9 +30,9 @@ trap stop_brokers INT TERM QUIT start_brokers() { ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port LOCAL_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-ip 2 --log-to-file locali.log > qpiddi.port + ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port LOCAL_PORTI=`cat qpiddi.port` - ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-user 2 --log-to-file localu.log > qpiddu.port + ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port LOCAL_PORTU=`cat qpiddu.port` } |