diff options
author | Alan Conway <aconway@apache.org> | 2009-02-12 19:01:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-12 19:01:51 +0000 |
commit | 55946250faf17c32ef8f49c724e622e9eedfbdfa (patch) | |
tree | ec6f972bbb5985eb1803f50584eecce606781631 | |
parent | 64b26d7ac98b821e176f97d96a6580dafac353f6 (diff) | |
download | qpid-python-55946250faf17c32ef8f49c724e622e9eedfbdfa.tar.gz |
Cluster security support:
- Set correct user ID on update connections.
- Allow configuration of user, pass and mechanism used for update connections.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743839 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/cluster.mk | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterSettings.h | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerFixture.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClusterFixture.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClusterFixture.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 58 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 1 |
17 files changed, 200 insertions, 82 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index d02cad0140..0d34788d2e 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -44,6 +44,7 @@ cluster_la_SOURCES = \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ qpid/cluster/ClusterPlugin.cpp \ + qpid/cluster/ClusterSettings.h \ qpid/cluster/Connection.cpp \ qpid/cluster/Connection.h \ qpid/cluster/ConnectionCodec.cpp \ diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 0966db8162..ae160fabc7 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -70,7 +70,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false") )); params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false"))); if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) - throw NotAllowedException("ACL denied exhange declare request"); + throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId())); } //TODO: implement autoDelete @@ -130,7 +130,7 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU AclModule* acl = getBroker().getAcl(); if (acl) { if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) - throw NotAllowedException("ACL denied exhange delete request"); + throw NotAllowedException(QPID_MSG("ACL denied exhange delete request from " << getConnection().getUserId())); } //TODO: implement unused @@ -150,7 +150,7 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam AclModule* acl = getBroker().getAcl(); if (acl) { if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) ) - throw NotAllowedException("ACL denied exhange query request"); + throw NotAllowedException(QPID_MSG("ACL denied exhange query request from " << getConnection().getUserId())); } try { @@ -168,7 +168,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, AclModule* acl = getBroker().getAcl(); if (acl) { if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,routingKey) ) - throw NotAllowedException("ACL denied exhange bind request"); + throw NotAllowedException(QPID_MSG("ACL denied exhange bind request from " << getConnection().getUserId())); } Queue::shared_ptr queue = getQueue(queueName); @@ -200,7 +200,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) - throw NotAllowedException("ACL denied exchange unbind request"); + throw NotAllowedException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId())); } Queue::shared_ptr queue = getQueue(queueName); @@ -231,7 +231,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) - throw NotAllowedException("ACL denied exhange bound request"); + throw NotAllowedException(QPID_MSG("ACL denied exhange bound request from " << getConnection().getUserId())); } Exchange::shared_ptr exchange; @@ -297,7 +297,7 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) AclModule* acl = getBroker().getAcl(); if (acl) { if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) ) - throw NotAllowedException("ACL denied queue query request"); + throw NotAllowedException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId())); } Queue::shared_ptr queue = session.getBroker().getQueues().find(name); @@ -331,7 +331,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? "true" : "false"))); params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? "true" : "false"))); if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) - throw NotAllowedException("ACL denied queue create request"); + throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); } Exchange::shared_ptr alternate; @@ -394,7 +394,7 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ if (acl) { if (!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL) ) - throw NotAllowedException("ACL denied queue purge request"); + throw NotAllowedException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId())); } getQueue(queue)->purge(); } @@ -405,7 +405,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse if (acl) { if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) ) - throw NotAllowedException("ACL denied queue delete request"); + throw NotAllowedException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId())); } Queue::shared_ptr q = getQueue(queue); @@ -474,7 +474,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, { // add flags as needed if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) - throw NotAllowedException("ACL denied Queue subscribe request"); + throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); } Queue::shared_ptr queue = getQueue(queueName); diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 3562df4a74..cc62d724cb 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -54,6 +54,15 @@ void Connection::open( const std::string& vhost, uint16_t maxFrameSize) { + ConnectionSettings settings; + settings.username = uid; + settings.password = pwd; + settings.virtualhost = vhost; + settings.maxFrameSize = maxFrameSize; + open(url, settings); +} + +void Connection::open(const Url& url, const ConnectionSettings& settings) { if (url.empty()) throw Exception(QPID_MSG("Attempt to open URL with no addresses.")); Url::const_iterator i = url.begin(); @@ -62,14 +71,10 @@ void Connection::open( i++; if (tcp) { try { - ConnectionSettings settings; - settings.host = tcp->host; - settings.port = tcp->port; - settings.username = uid; - settings.password = pwd; - settings.virtualhost = vhost; - settings.maxFrameSize = maxFrameSize; - open(settings); + ConnectionSettings cs(settings); + cs.host = tcp->host; + cs.port = tcp->port; + open(cs); break; } catch (const Exception& /*e*/) { diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index ed984ccb42..03631ef56f 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -107,6 +107,18 @@ class Connection const std::string& virtualhost = "/", uint16_t maxFrameSize=65535); /** + * Opens a connection to a broker using a URL. + * If the URL contains multiple addresses, try each in turn + * till connection is successful. + * + * @url address of the broker to connect to. + * + * @param settings used for any settings not provided by the URL. + * Settings provided by the url (e.g. host, port) are ignored. + */ + void open(const Url& url, const ConnectionSettings& settings); + + /** * Opens a connection to a broker. * * @param the settings to use (host, port etc). @see ConnectionSettings. diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index d9a5125760..f845492dbc 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -17,6 +17,7 @@ */ #include "Cluster.h" +#include "ClusterSettings.h" #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" @@ -82,16 +83,17 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) : +Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), cpg(*this), - name(name_), - myUrl(url_), + name(settings.name), + myUrl(settings.url.empty() ? Url() : Url(settings.url)), myId(cpg.self()), - readMax(readMax_), - writeEstimate(writeEstimate_), + readMax(settings.readMax), + writeEstimate(settings.writeEstimate), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -121,7 +123,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b } failoverExchange.reset(new FailoverExchange(this)); - if (quorum_) quorum.init(); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. while (!initialized) @@ -425,10 +427,15 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + client::ConnectionSettings cs; + cs.username = settings.username; + cs.password = settings.password; + cs.mechanism = settings.mechanism; updateThread = Thread( new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + boost::bind(&Cluster::updateOutError, this, _1), + cs)); } // Called in update thread. diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 4d994943f7..8c5eb06ff7 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,6 +19,7 @@ * */ +#include "ClusterSettings.h" #include "ClusterMap.h" #include "ConnectionMap.h" #include "Cpg.h" @@ -67,9 +68,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef std::vector<ConnectionPtr> Connections; /** Construct the cluster in plugin earlyInitialize */ - Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, - size_t readMax, size_t writeEstimate); - + Cluster(const ClusterSettings&, broker::Broker&); virtual ~Cluster(); /** Join the cluster in plugin initialize. Requires transport @@ -178,6 +177,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setClusterId(const framing::Uuid&); // Immutable members set on construction, never changed. + ClusterSettings settings; broker::Broker& broker; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle boost::shared_ptr<sys::Poller> poller; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index d54d8389e0..266e7f00b0 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -18,6 +18,7 @@ #include "Connection.h" #include "ConnectionCodec.h" +#include "ClusterSettings.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" @@ -35,6 +36,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionState.h" +#include "qpid/client/ConnectionSettings.h" #include <boost/utility/in_place_factory.hpp> #include <boost/scoped_ptr.hpp> @@ -48,40 +50,31 @@ using management::IdAllocator; using management::ManagementAgent; using management::ManagementBroker; -struct ClusterValues { - string name; - string url; - bool quorum; - size_t readMax, writeEstimate; - - ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } -}; -/** Note separating options from values to work around boost version differences. +/** Note separating options from settings to work around boost version differences. * Old boost takes a reference to options objects, but new boost makes a copy. * New boost allows a shared_ptr but that's not compatible with old boost. */ struct ClusterOptions : public Options { - ClusterValues& values; + ClusterSettings& settings; - ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) { + ClusterOptions(ClusterSettings& v) : Options("Cluster Options"), settings(v) { addOptions() - ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(values.url,"URL"), + ("cluster-name", optValue(settings.name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(settings.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") + ("cluster-username", optValue(settings.username, ""), "Username for connections between brokers") + ("cluster-password", optValue(settings.password, ""), "Password for connections between brokers") + ("cluster-mechanism", optValue(settings.mechanism, ""), "Authentication mechanism for connections between brokers") #if HAVE_LIBCMAN - ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.") + ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif - ("cluster-read-max", optValue(values.readMax,"N"), + ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.") - ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"), - "Experimental: initial estimate for connection write rate per multicast cycle"); + ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"), + "Experimental: initial estimate for connection write rate per multicast cycle") + ; } }; @@ -127,26 +120,20 @@ struct UpdateClientIdAllocator : management::IdAllocator struct ClusterPlugin : public Plugin { - ClusterValues values; + ClusterSettings settings; ClusterOptions options; Cluster* cluster; boost::scoped_ptr<ConnectionCodec::Factory> factory; - ClusterPlugin() : options(values), cluster(0) {} + ClusterPlugin() : options(settings), cluster(0) {} Options* getOptions() { return &options; } void earlyInitialize(Plugin::Target& target) { - if (values.name.empty()) return; // Only if --cluster-name option was specified. + if (settings.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - cluster = new Cluster( - values.name, - values.url.empty() ? Url() : Url(values.url), - *broker, - values.quorum, - values.readMax, values.writeEstimate*1024 - ); + cluster = new Cluster(settings, *broker); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h new file mode 100644 index 0000000000..a8f33be75e --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h @@ -0,0 +1,48 @@ +#ifndef QPID_CLUSTER_CLUSTERSETTINGS_H +#define QPID_CLUSTER_CLUSTERSETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Url.h> +#include <string> + +namespace qpid { +namespace cluster { + +struct ClusterSettings { + std::string name; + std::string url; + bool quorum; + size_t readMax, writeEstimate; + std::string username, password, mechanism; + + ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {} + + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERSETTINGS_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 4a13d24499..1a3f7c4ef7 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -253,10 +253,11 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { ConnectionId shadow = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); self = shadow; + connection.setUserId(username); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 1637b8609c..98b47e1bc0 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -117,7 +117,7 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index e50c936b50..18746ccb7e 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -89,21 +89,22 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, const Cluster::Connections& cons, const boost::function<void()>& ok, - const boost::function<void(const std::exception&)>& fail) + const boost::function<void(const std::exception&)>& fail, + const client::ConnectionSettings& cs +) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail) + done(ok), failed(fail) { - connection.open(url); + connection.open(url, cs); session = connection.newSession("update_shared"); } UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -static const char UPDATE_CHARS[] = "qpid.qpid-update"; -const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); +const std::string UpdateClient::UPDATE("qpid.qpid-update"); void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); @@ -232,7 +233,9 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), - reinterpret_cast<uint64_t>(updateConnection->getId().getPointer())); + reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()), + updateConnection->getBrokerConnection().getUserId() + ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 0819eb4cdb..23f647c820 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -66,7 +66,9 @@ class UpdateClient : public sys::Runnable { broker::Broker& donor, const ClusterMap& map, uint64_t sequence, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, - const boost::function<void(const std::exception&)>& fail); + const boost::function<void(const std::exception&)>& fail, + const client::ConnectionSettings& + ); ~UpdateClient(); void update(); diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index 205b4d90ef..f55560739d 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -88,6 +88,7 @@ struct BrokerFixture : private boost::noncopyable { /** Connection that opens in its constructor */ struct LocalConnection : public qpid::client::Connection { LocalConnection(uint16_t port) { open("localhost", port); } + LocalConnection(const qpid::client::ConnectionSettings& s) { open(s); } }; /** A local client connection via a socket proxy. */ @@ -96,6 +97,11 @@ struct ProxyConnection : public qpid::client::Connection { ProxyConnection(int brokerPort) : proxy(brokerPort) { open("localhost", proxy.getPort()); } + ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) { + qpid::client::ConnectionSettings proxySettings(s); + proxySettings.port = proxy.getPort(); + open(proxySettings); + } ~ProxyConnection() { close(); } }; @@ -110,6 +116,8 @@ struct ClientT { qpid::client::LocalQueue lq; ClientT(uint16_t port, const std::string& name=std::string()) : connection(port), session(connection.newSession(name)), subs(session) {} + ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string()) + : connection(settings), session(connection.newSession(name)), subs(session) {} ~ClientT() { connection.close(); } }; diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp index 4a373e3811..3a0ea74098 100644 --- a/qpid/cpp/src/tests/ClusterFixture.cpp +++ b/qpid/cpp/src/tests/ClusterFixture.cpp @@ -67,13 +67,14 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_) add(n); } +const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS = + list_of<string>("--auth=no")("--no-data-dir"); + ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) { Args args = list_of<string>("qpidd " __FILE__) ("--no-module-dir") ("--load-module=../.libs/cluster.so") ("--cluster-name")(name) - ("--auth=no") - ("--no-data-dir") ("--log-prefix")(prefix); args.insert(args.end(), userArgs.begin(), userArgs.end()); return args; diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h index d1acfaa9cb..84fb9f2202 100644 --- a/qpid/cpp/src/tests/ClusterFixture.h +++ b/qpid/cpp/src/tests/ClusterFixture.h @@ -72,7 +72,7 @@ class ClusterFixture : public vector<uint16_t> { /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ - ClusterFixture(size_t n, int localIndex=0, const Args& args=Args()); + ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. void setup(); @@ -86,6 +86,8 @@ class ClusterFixture : public vector<uint16_t> { void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT); private: + static const Args DEFAULT_ARGS; + void addLocal(); // Add a local broker. Args makeArgs(const std::string& prefix); string name; diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 14b7659b65..c880f30e6b 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -23,6 +23,7 @@ #include "ClusterFixture.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/client/Session.h" #include "qpid/client/FailoverListener.h" @@ -41,6 +42,7 @@ #include <string> #include <iostream> +#include <fstream> #include <iterator> #include <vector> #include <set> @@ -86,11 +88,11 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { template <class C> set<uint16_t> makeSet(const C& c) { set<uint16_t> s; - std::copy(c.begin(), c.end(), std::inserter(s, s.begin())); + copy(c.begin(), c.end(), inserter(s, s.begin())); return s; } -template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) { +template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) { vector<Url> urls = source.getKnownBrokers(); if (n >= 0 && unsigned(n) != urls.size()) { BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); @@ -129,13 +131,13 @@ int64_t getMsgSequence(const Message& m) { return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); } -Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) { +Message ttlMessage(const string& data, const string& key, uint64_t ttl) { Message m(data, key); m.getDeliveryProperties().setTtl(ttl); return m; } -vector<std::string> browse(Client& c, const std::string& q, int n) { +vector<string> browse(Client& c, const string& q, int n) { SubscriptionSettings browseSettings( FlowControl::unlimited(), ACCEPT_MODE_NONE, @@ -144,7 +146,7 @@ vector<std::string> browse(Client& c, const std::string& q, int n) { ); LocalQueue lq; c.subs.subscribe(lq, q, browseSettings); - vector<std::string> result; + vector<string> result; for (int i = 0; i < n; ++i) { result.push_back(lq.get(TIMEOUT).getData()); } @@ -152,6 +154,44 @@ vector<std::string> browse(Client& c, const std::string& q, int n) { return result; } + +// FIXME aconway 2009-02-12: need to figure out how to test this properly. +// Current problems: +// - all brokers share the same data-dir (set ACL without data dir?) +// - updater's user name not making it through to updatee for ACL checks. +// +// QPID_AUTO_TEST_CASE(testAcl) { +// ofstream policyFile("cluster_test.acl"); +// // FIXME aconway 2009-02-12: guest -> qpidd? +// policyFile << "acl allow guest@QPID all all" << endl +// << "acl allow foo@QPID create queue name=foo" << endl +// << "acl allow bar@QPID create queue name=bar" << endl +// << "acl deny all create queue" << endl +// << "acl allow all all" << endl; +// policyFile.close(); +// ClusterFixture cluster(2,-1, list_of<string> +// ("--data-dir=.") ("--auth=no") +// ("--acl-file=cluster_test.acl") +// ("--cluster-mechanism=PLAIN") +// ("--load-module=../.libs/acl.so")); +// Client c0(cluster[0], "c0"); +// Client c1(cluster[1], "c1"); + +// ConnectionSettings settings; +// settings.port = cluster[0]; +// settings.username = "foo"; +// Client foo(settings, "foo"); + +// foo.session.queueDeclare("foo"); +// BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); +// BOOST_CHECK_EQUAL(c1.session.queueQuery("foo").getQueue(), "foo"); + +// BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), int); +// BOOST_CHECK_EQUAL(c0.session.queueQuery("bar").getQueue(), ""); +// BOOST_CHECK_EQUAL(c1.session.queueQuery("bar").getQueue(), ""); +// } + + QPID_AUTO_TEST_CASE(testMessageTimeToLive) { // Note: this doesn't actually test for cluster race conditions around TTL, // it just verifies that basic TTL functionality works. @@ -162,10 +202,10 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); c0.session.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b")); sys::usleep(300*1000); - BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b")); - BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b")); + BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b")); } QPID_AUTO_TEST_CASE(testSequenceOptions) { @@ -349,7 +389,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Send first 2 frames of message. MessageTransferBody transfer( - ProtocolVersion(), std::string(), // default exchange. + ProtocolVersion(), string(), // default exchange. framing::message::ACCEPT_MODE_NONE, framing::message::ACQUIRE_MODE_PRE_ACQUIRED); sender.send(transfer, true, false, true, true); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 6dc0c722dd..2cf4e915b6 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -124,6 +124,7 @@ <control name="shadow-ready" code="0x20" label="End of shadow connection update."> <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> + <field name="user-name" type="str8"/> </control> <!-- Complete a cluster state update. --> |