diff options
author | Charles E. Rolke <chug@apache.org> | 2012-04-27 19:26:18 +0000 |
---|---|---|
committer | Charles E. Rolke <chug@apache.org> | 2012-04-27 19:26:18 +0000 |
commit | 9eb4edcd1a30aa8c7d6005a5e3d13189f3d4265d (patch) | |
tree | b8a01304014f4d0287ad7259618133d4f5d7c879 | |
parent | 2b9f9ce817339547ad64ca7b9442ca8105e6e8b3 (diff) | |
download | qpid-python-9eb4edcd1a30aa8c7d6005a5e3d13189f3d4265d.tar.gz |
QPID-2616 Count and limit client connections
Limit client connections by name and by host address.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1331549 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | qpid/cpp/src/acl.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.h | 93 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp | 211 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclConnectionCounter.h | 79 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclPlugin.cpp | 4 |
7 files changed, 353 insertions, 47 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 9045d2a208..cc882f80ca 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -594,6 +594,8 @@ if (BUILD_ACL) set (acl_SOURCES qpid/acl/Acl.cpp qpid/acl/Acl.h + qpid/acl/AclConnectionCounter.cpp + qpid/acl/AclConnectionCounter.h qpid/acl/AclData.cpp qpid/acl/AclData.h qpid/acl/AclPlugin.cpp diff --git a/qpid/cpp/src/acl.mk b/qpid/cpp/src/acl.mk index b8e2ff0e13..0301f8c754 100644 --- a/qpid/cpp/src/acl.mk +++ b/qpid/cpp/src/acl.mk @@ -24,6 +24,8 @@ dmoduleexec_LTLIBRARIES += acl.la acl_la_SOURCES = \ qpid/acl/Acl.cpp \ qpid/acl/Acl.h \ + qpid/acl/AclConnectionCounter.cpp \ + qpid/acl/AclConnectionCounter.h \ qpid/acl/AclData.cpp \ qpid/acl/AclData.h \ qpid/acl/AclPlugin.cpp \ diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index db77abf8c1..71e8c8b564 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -17,6 +17,7 @@ */ #include "qpid/acl/Acl.h" +#include "qpid/acl/AclConnectionCounter.h" #include "qpid/acl/AclData.h" #include "qpid/acl/AclValidator.h" #include "qpid/sys/Mutex.h" @@ -48,7 +49,8 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::acl; -Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0) +Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0), + connectionCounter(new ConnectionCounter(aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp)) { agent = broker->getManagementAgent(); @@ -63,6 +65,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals throw Exception("Could not read ACL file " + errorString); if (mgmtObject!=0) mgmtObject->set_enforcingAcl(0); } + broker->getConnectionObservers().add(connectionCounter); QPID_LOG(info, "ACL Plugin loaded"); if (mgmtObject!=0) mgmtObject->set_enforcingAcl(1); } @@ -267,7 +270,9 @@ Manageable::status_t Acl::lookupPublish(qpid::management::Args& args, std::strin } -Acl::~Acl(){} +Acl::~Acl(){ + broker->getConnectionObservers().remove(connectionCounter); +} ManagementObject* Acl::GetManagementObject(void) const { diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index a1d3422a0a..c65a06d317 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -30,6 +30,7 @@ #include "qmf/org/apache/qpid/acl/Acl.h" #include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> #include <map> #include <string> @@ -40,9 +41,12 @@ class Broker; } namespace acl { +class ConnectionCounter; struct AclValues { - std::string aclFile; + std::string aclFile; + uint32_t aclMaxConnectPerUser; + uint32_t aclMaxConnectPerIp; }; @@ -50,55 +54,56 @@ class Acl : public broker::AclModule, public RefCounted, public management::Mana { private: - acl::AclValues aclValues; - broker::Broker* broker; - bool transferAcl; - boost::shared_ptr<AclData> data; - qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle - qpid::management::ManagementAgent* agent; - mutable qpid::sys::Mutex dataLock; + acl::AclValues aclValues; + broker::Broker* broker; + bool transferAcl; + boost::shared_ptr<AclData> data; + qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle + qpid::management::ManagementAgent* agent; + mutable qpid::sys::Mutex dataLock; + boost::shared_ptr<ConnectionCounter> connectionCounter; public: - Acl (AclValues& av, broker::Broker& b); - - void initialize(); - - inline virtual bool doTransferAcl() {return transferAcl;}; - - // create specilied authorise methods for cases that need faster matching as needed. - virtual bool authorise( - const std::string& id, - const Action& action, - const ObjectType& objType, - const std::string& name, - std::map<Property, std::string>* params=0); - - virtual bool authorise( - const std::string& id, - const Action& action, - const ObjectType& objType, - const std::string& ExchangeName, - const std::string& RoutingKey); - - virtual ~Acl(); + Acl (AclValues& av, broker::Broker& b); + + void initialize(); + + inline virtual bool doTransferAcl() { + return transferAcl; + }; + +// create specilied authorise methods for cases that need faster matching as needed. + virtual bool authorise( + const std::string& id, + const Action& action, + const ObjectType& objType, + const std::string& name, + std::map<Property, std::string>* params=0); + + virtual bool authorise( + const std::string& id, + const Action& action, + const ObjectType& objType, + const std::string& ExchangeName, + const std::string& RoutingKey); + + virtual ~Acl(); private: - bool result( - const AclResult& aclreslt, - const std::string& id, - const Action& action, - const ObjectType& objType, - const std::string& name); - bool readAclFile(std::string& errorText); - bool readAclFile(std::string& aclFile, std::string& errorText); - Manageable::status_t lookup (management::Args& args, std::string& text); - Manageable::status_t lookupPublish(management::Args& args, std::string& text); - virtual qpid::management::ManagementObject* GetManagementObject(void) const; - virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + bool result( + const AclResult& aclreslt, + const std::string& id, + const Action& action, + const ObjectType& objType, + const std::string& name); + bool readAclFile(std::string& errorText); + bool readAclFile(std::string& aclFile, std::string& errorText); + Manageable::status_t lookup (management::Args& args, std::string& text); + Manageable::status_t lookupPublish(management::Args& args, std::string& text); + virtual qpid::management::ManagementObject* GetManagementObject(void) const; + virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; - - }} // namespace qpid::acl #endif // QPID_ACL_ACL_H diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp new file mode 100644 index 0000000000..9715c472c1 --- /dev/null +++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.cpp @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AclConnectionCounter.h" +#include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Mutex.h" +#include <assert.h> +#include <sstream> + +using namespace qpid::sys; + +namespace qpid { +namespace acl { + +// +// This module instantiates a broker::ConnectionObserver and limits client +// connections by counting connections per user name and per client IP address. +// + + +// +// +// +ConnectionCounter::ConnectionCounter(uint32_t nl, uint32_t hl) : + nameLimit(nl), hostLimit(hl) {} + +ConnectionCounter::~ConnectionCounter() {} + + +// +// limitCheckLH +// +// Increment the name's count in map and return a comparison against the limit. +// called with dataLock already taken +// +bool ConnectionCounter::limitCheckLH( + connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) { + + bool result(true); + if (theLimit > 0) { + connectCountsMap_t::iterator eRef = theMap.find(theName); + if (eRef != theMap.end()) { + uint32_t count = (uint32_t)(*eRef).second + 1; + (*eRef).second = count; + result = count <= theLimit; + } else { + theMap[theName] = 1; + } + } + return result; +} + + +// +// releaseLH +// +// Decrement the name's count in map. +// called with dataLock already taken +// +void ConnectionCounter::releaseLH( + connectCountsMap_t& theMap, const std::string& theName, uint32_t theLimit) { + + if (theLimit > 0) { + connectCountsMap_t::iterator eRef = theMap.find(theName); + if (eRef != theMap.end()) { + uint32_t count = (uint32_t) (*eRef).second; + assert (count > 0); + if (1 == count) { + theMap.erase (eRef); + } else { + (*eRef).second = count - 1; + } + } else { + // User had no connections. + QPID_LOG(notice, "ACL ConnectionCounter Connection for '" << theName + << "' not found in connection count pool"); + } + } +} + + +// +// connection - called during Connection's constructor +// +void ConnectionCounter::connection(broker::Connection& connection) { + QPID_LOG(trace, "ACL ConnectionCounter connection IP:" << connection.getMgmtId() + << ", user:" << connection.getUsername()); + + Mutex::ScopedLock locker(dataLock); + + connectProgressMap[connection.getMgmtId()] = C_CREATED; +} + + +// +// opened - called when first AMQP frame is received over Connection +// +void ConnectionCounter::opened(broker::Connection& connection) { + QPID_LOG(trace, "ACL ConnectionCounter Opened IP:" << connection.getMgmtId() + << ", user:" << connection.getUsername()); + + Mutex::ScopedLock locker(dataLock); + + const std::string& userName( connection.getUsername()); + const std::string& hostName(getClientHost(connection.getMgmtId())); + + // Bump state from CREATED to OPENED + (void) limitCheckLH(connectProgressMap, connection.getMgmtId(), C_OPENED); + + bool nameOk = limitCheckLH(connectByNameMap, userName, nameLimit); + bool hostOk = limitCheckLH(connectByHostMap, hostName, hostLimit); + + if (!nameOk) { + // User has too many + QPID_LOG(info, "ACL ConnectionCounter User '" << userName + << "' exceeded maximum allowed connections"); + throw Exception( + QPID_MSG("User '" << userName + << "' exceeded maximum allowed connections")); + } + + if (!hostOk) { + // Host has too many + QPID_LOG(info, "ACL ConnectionCounter Client host '" << hostName + << "' exceeded maximum allowed connections"); + throw Exception( + QPID_MSG("Client host '" << hostName + << "' exceeded maximum allowed connections")); + } +} + + +// +// closed - called during Connection's destructor +// +void ConnectionCounter::closed(broker::Connection& connection) { + QPID_LOG(trace, "ACL ConnectionCounter Closed IP:" << connection.getMgmtId() + << ", user:" << connection.getUsername()); + + Mutex::ScopedLock locker(dataLock); + + connectCountsMap_t::iterator eRef = connectProgressMap.find(connection.getMgmtId()); + if (eRef != connectProgressMap.end()) { + if ((*eRef).second == C_OPENED){ + // Normal case: connection was created and opened. + // Decrement in-use counts + releaseLH(connectByNameMap, + connection.getUsername(), + nameLimit); + + releaseLH(connectByHostMap, + getClientHost(connection.getMgmtId()), + hostLimit); + } else { + // Connection was created but not opened. + // Don't decrement any connection counts. + } + connectProgressMap.erase(eRef); + + } else { + // connection not found in progress map + QPID_LOG(notice, "ACL ConnectionCounter info for '" << connection.getMgmtId() + << "' not found in connection state pool"); + } +} + + +// +// getClientIp - given a connection's mgmtId return the client host part. +// +// TODO: Ideally this would be a method of the connection itself. +// +std::string ConnectionCounter::getClientHost(const std::string mgmtId) +{ + size_t hyphen = mgmtId.find('-'); + if (std::string::npos != hyphen) { + size_t colon = mgmtId.find_last_of(':'); + if (std::string::npos != colon) { + // trailing colon found + return mgmtId.substr(hyphen+1, colon - hyphen - 1); + } else { + // colon not found - use everything after hyphen + return mgmtId.substr(hyphen+1); + } + } + + // no hyphen found - use whole string + assert(false); + return mgmtId; +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/acl/AclConnectionCounter.h b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h new file mode 100644 index 0000000000..b78eea022e --- /dev/null +++ b/qpid/cpp/src/qpid/acl/AclConnectionCounter.h @@ -0,0 +1,79 @@ +#ifndef QPID_ACL_CONNECTIONCOUNTER_H +#define QPID_ACL_CONNECTIONCOUNTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/ConnectionObserver.h" +#include "qpid/sys/Mutex.h" +#include <boost/iterator/iterator_concepts.hpp> + +#include <map> + +namespace qpid { + +namespace broker { +class Connection; +} + +namespace acl { + + /** + * Terminate client connections when a user tries to create 'too many'. + * Terminate hostIp connections when an IP host tries to create 'too many'. + */ +class ConnectionCounter : public broker::ConnectionObserver +{ +private: + typedef std::map<std::string, uint32_t> connectCountsMap_t; + enum CONNECTION_PROGRESS { C_CREATED=1, C_OPENED=2 }; + + uint32_t nameLimit; + uint32_t hostLimit; + qpid::sys::Mutex dataLock; + + connectCountsMap_t connectProgressMap; + connectCountsMap_t connectByNameMap; + connectCountsMap_t connectByHostMap; + + std::string getClientHost(const std::string mgmtId); + + bool limitCheckLH(connectCountsMap_t& theMap, + const std::string& theName, + uint32_t theLimit); + + void releaseLH(connectCountsMap_t& theMap, + const std::string& theName, + uint32_t theLimit); + +public: + ConnectionCounter(uint32_t nl, uint32_t hl); + ~ConnectionCounter(); + + void connection(broker::Connection& connection); + void opened(broker::Connection& connection); + void closed(broker::Connection& connection); + +}; + +}} // namespace qpid::ha + +#endif /*!QPID_ACL_CONNECTIONCOUNTER_H*/ diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp index d611797c49..6c18cd2749 100644 --- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp +++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp @@ -40,7 +40,9 @@ struct AclOptions : public Options { AclOptions(AclValues& v) : Options("ACL Options"), values(v) { addOptions() - ("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir"); + ("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir") + ("acl-max-connect-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user") + ("acl-max-connect-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address"); } }; |