summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-12 19:01:51 +0000
committerAlan Conway <aconway@apache.org>2009-02-12 19:01:51 +0000
commit55946250faf17c32ef8f49c724e622e9eedfbdfa (patch)
treeec6f972bbb5985eb1803f50584eecce606781631
parent64b26d7ac98b821e176f97d96a6580dafac353f6 (diff)
downloadqpid-python-55946250faf17c32ef8f49c724e622e9eedfbdfa.tar.gz
Cluster security support:
- Set correct user ID on update connections. - Allow configuration of user, pass and mechanism used for update connections. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743839 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp22
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp21
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp21
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp51
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterSettings.h48
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp15
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h4
-rw-r--r--qpid/cpp/src/tests/BrokerFixture.h8
-rw-r--r--qpid/cpp/src/tests/ClusterFixture.cpp5
-rw-r--r--qpid/cpp/src/tests/ClusterFixture.h4
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp58
-rw-r--r--qpid/cpp/xml/cluster.xml1
17 files changed, 200 insertions, 82 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index d02cad0140..0d34788d2e 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -44,6 +44,7 @@ cluster_la_SOURCES = \
qpid/cluster/ClusterMap.cpp \
qpid/cluster/ClusterMap.h \
qpid/cluster/ClusterPlugin.cpp \
+ qpid/cluster/ClusterSettings.h \
qpid/cluster/Connection.cpp \
qpid/cluster/Connection.h \
qpid/cluster/ConnectionCodec.cpp \
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 0966db8162..ae160fabc7 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -70,7 +70,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? "true" : "false") ));
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? "true" : "false")));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
- throw NotAllowedException("ACL denied exhange declare request");
+ throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId()));
}
//TODO: implement autoDelete
@@ -130,7 +130,7 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
- throw NotAllowedException("ACL denied exhange delete request");
+ throw NotAllowedException(QPID_MSG("ACL denied exhange delete request from " << getConnection().getUserId()));
}
//TODO: implement unused
@@ -150,7 +150,7 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) )
- throw NotAllowedException("ACL denied exhange query request");
+ throw NotAllowedException(QPID_MSG("ACL denied exhange query request from " << getConnection().getUserId()));
}
try {
@@ -168,7 +168,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,routingKey) )
- throw NotAllowedException("ACL denied exhange bind request");
+ throw NotAllowedException(QPID_MSG("ACL denied exhange bind request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -200,7 +200,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
- throw NotAllowedException("ACL denied exchange unbind request");
+ throw NotAllowedException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -231,7 +231,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchangeName,&params) )
- throw NotAllowedException("ACL denied exhange bound request");
+ throw NotAllowedException(QPID_MSG("ACL denied exhange bound request from " << getConnection().getUserId()));
}
Exchange::shared_ptr exchange;
@@ -297,7 +297,7 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) )
- throw NotAllowedException("ACL denied queue query request");
+ throw NotAllowedException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
@@ -331,7 +331,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? "true" : "false")));
params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? "true" : "false")));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
- throw NotAllowedException("ACL denied queue create request");
+ throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
}
Exchange::shared_ptr alternate;
@@ -394,7 +394,7 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
if (acl)
{
if (!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL) )
- throw NotAllowedException("ACL denied queue purge request");
+ throw NotAllowedException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
}
@@ -405,7 +405,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
if (acl)
{
if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
- throw NotAllowedException("ACL denied queue delete request");
+ throw NotAllowedException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
}
Queue::shared_ptr q = getQueue(queue);
@@ -474,7 +474,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
{
// add flags as needed
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
- throw NotAllowedException("ACL denied Queue subscribe request");
+ throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index 3562df4a74..cc62d724cb 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -54,6 +54,15 @@ void Connection::open(
const std::string& vhost,
uint16_t maxFrameSize)
{
+ ConnectionSettings settings;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.virtualhost = vhost;
+ settings.maxFrameSize = maxFrameSize;
+ open(url, settings);
+}
+
+void Connection::open(const Url& url, const ConnectionSettings& settings) {
if (url.empty())
throw Exception(QPID_MSG("Attempt to open URL with no addresses."));
Url::const_iterator i = url.begin();
@@ -62,14 +71,10 @@ void Connection::open(
i++;
if (tcp) {
try {
- ConnectionSettings settings;
- settings.host = tcp->host;
- settings.port = tcp->port;
- settings.username = uid;
- settings.password = pwd;
- settings.virtualhost = vhost;
- settings.maxFrameSize = maxFrameSize;
- open(settings);
+ ConnectionSettings cs(settings);
+ cs.host = tcp->host;
+ cs.port = tcp->port;
+ open(cs);
break;
}
catch (const Exception& /*e*/) {
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index ed984ccb42..03631ef56f 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -107,6 +107,18 @@ class Connection
const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
/**
+ * Opens a connection to a broker using a URL.
+ * If the URL contains multiple addresses, try each in turn
+ * till connection is successful.
+ *
+ * @url address of the broker to connect to.
+ *
+ * @param settings used for any settings not provided by the URL.
+ * Settings provided by the url (e.g. host, port) are ignored.
+ */
+ void open(const Url& url, const ConnectionSettings& settings);
+
+ /**
* Opens a connection to a broker.
*
* @param the settings to use (host, port etc). @see ConnectionSettings.
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index d9a5125760..f845492dbc 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,6 +17,7 @@
*/
#include "Cluster.h"
+#include "ClusterSettings.h"
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
@@ -82,16 +83,17 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
+Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
cpg(*this),
- name(name_),
- myUrl(url_),
+ name(settings.name),
+ myUrl(settings.url.empty() ? Url() : Url(settings.url)),
myId(cpg.self()),
- readMax(readMax_),
- writeEstimate(writeEstimate_),
+ readMax(settings.readMax),
+ writeEstimate(settings.writeEstimate),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -121,7 +123,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
}
failoverExchange.reset(new FailoverExchange(this));
- if (quorum_) quorum.init();
+ if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -425,10 +427,15 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ client::ConnectionSettings cs;
+ cs.username = settings.username;
+ cs.password = settings.password;
+ cs.mechanism = settings.mechanism;
updateThread = Thread(
new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1)));
+ boost::bind(&Cluster::updateOutError, this, _1),
+ cs));
}
// Called in update thread.
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 4d994943f7..8c5eb06ff7 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -19,6 +19,7 @@
*
*/
+#include "ClusterSettings.h"
#include "ClusterMap.h"
#include "ConnectionMap.h"
#include "Cpg.h"
@@ -67,9 +68,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef std::vector<ConnectionPtr> Connections;
/** Construct the cluster in plugin earlyInitialize */
- Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
- size_t readMax, size_t writeEstimate);
-
+ Cluster(const ClusterSettings&, broker::Broker&);
virtual ~Cluster();
/** Join the cluster in plugin initialize. Requires transport
@@ -178,6 +177,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void setClusterId(const framing::Uuid&);
// Immutable members set on construction, never changed.
+ ClusterSettings settings;
broker::Broker& broker;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
boost::shared_ptr<sys::Poller> poller;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index d54d8389e0..266e7f00b0 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -18,6 +18,7 @@
#include "Connection.h"
#include "ConnectionCodec.h"
+#include "ClusterSettings.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
@@ -35,6 +36,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/client/ConnectionSettings.h"
#include <boost/utility/in_place_factory.hpp>
#include <boost/scoped_ptr.hpp>
@@ -48,40 +50,31 @@ using management::IdAllocator;
using management::ManagementAgent;
using management::ManagementBroker;
-struct ClusterValues {
- string name;
- string url;
- bool quorum;
- size_t readMax, writeEstimate;
-
- ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
-};
-/** Note separating options from values to work around boost version differences.
+/** Note separating options from settings to work around boost version differences.
* Old boost takes a reference to options objects, but new boost makes a copy.
* New boost allows a shared_ptr but that's not compatible with old boost.
*/
struct ClusterOptions : public Options {
- ClusterValues& values;
+ ClusterSettings& settings;
- ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) {
+ ClusterOptions(ClusterSettings& v) : Options("Cluster Options"), settings(v) {
addOptions()
- ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(values.url,"URL"),
+ ("cluster-name", optValue(settings.name, "NAME"), "Name of cluster to join")
+ ("cluster-url", optValue(settings.url,"URL"),
"URL of this broker, advertized to the cluster.\n"
"Defaults to a URL listing all the local IP addresses\n")
+ ("cluster-username", optValue(settings.username, ""), "Username for connections between brokers")
+ ("cluster-password", optValue(settings.password, ""), "Password for connections between brokers")
+ ("cluster-mechanism", optValue(settings.mechanism, ""), "Authentication mechanism for connections between brokers")
#if HAVE_LIBCMAN
- ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
+ ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
- ("cluster-read-max", optValue(values.readMax,"N"),
+ ("cluster-read-max", optValue(settings.readMax,"N"),
"Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
- ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
- "Experimental: initial estimate for connection write rate per multicast cycle");
+ ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"),
+ "Experimental: initial estimate for connection write rate per multicast cycle")
+ ;
}
};
@@ -127,26 +120,20 @@ struct UpdateClientIdAllocator : management::IdAllocator
struct ClusterPlugin : public Plugin {
- ClusterValues values;
+ ClusterSettings settings;
ClusterOptions options;
Cluster* cluster;
boost::scoped_ptr<ConnectionCodec::Factory> factory;
- ClusterPlugin() : options(values), cluster(0) {}
+ ClusterPlugin() : options(settings), cluster(0) {}
Options* getOptions() { return &options; }
void earlyInitialize(Plugin::Target& target) {
- if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ if (settings.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(
- values.name,
- values.url.empty() ? Url() : Url(values.url),
- *broker,
- values.quorum,
- values.readMax, values.writeEstimate*1024
- );
+ cluster = new Cluster(settings, *broker);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
new file mode 100644
index 0000000000..a8f33be75e
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
@@ -0,0 +1,48 @@
+#ifndef QPID_CLUSTER_CLUSTERSETTINGS_H
+#define QPID_CLUSTER_CLUSTERSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Url.h>
+#include <string>
+
+namespace qpid {
+namespace cluster {
+
+struct ClusterSettings {
+ std::string name;
+ std::string url;
+ bool quorum;
+ size_t readMax, writeEstimate;
+ std::string username, password, mechanism;
+
+ ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {}
+
+ Url getUrl(uint16_t port) const {
+ if (url.empty()) return Url::getIpAddressesUrl(port);
+ return Url(url);
+ }
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERSETTINGS_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 4a13d24499..1a3f7c4ef7 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -253,10 +253,11 @@ void Connection::sessionState(
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
ConnectionId shadow = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
self = shadow;
+ connection.setUserId(username);
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 1637b8609c..98b47e1bc0 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -117,7 +117,7 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index e50c936b50..18746ccb7e 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -89,21 +89,22 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
const Cluster::Connections& cons,
const boost::function<void()>& ok,
- const boost::function<void(const std::exception&)>& fail)
+ const boost::function<void(const std::exception&)>& fail,
+ const client::ConnectionSettings& cs
+)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
- done(ok), failed(fail)
+ done(ok), failed(fail)
{
- connection.open(url);
+ connection.open(url, cs);
session = connection.newSession("update_shared");
}
UpdateClient::~UpdateClient() {}
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-static const char UPDATE_CHARS[] = "qpid.qpid-update";
-const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS));
+const std::string UpdateClient::UPDATE("qpid.qpid-update");
void UpdateClient::update() {
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
@@ -232,7 +233,9 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()));
+ reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
+ updateConnection->getBrokerConnection().getUserId()
+ );
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 0819eb4cdb..23f647c820 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -66,7 +66,9 @@ class UpdateClient : public sys::Runnable {
broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
const std::vector<boost::intrusive_ptr<Connection> >& ,
const boost::function<void()>& done,
- const boost::function<void(const std::exception&)>& fail);
+ const boost::function<void(const std::exception&)>& fail,
+ const client::ConnectionSettings&
+ );
~UpdateClient();
void update();
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index 205b4d90ef..f55560739d 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -88,6 +88,7 @@ struct BrokerFixture : private boost::noncopyable {
/** Connection that opens in its constructor */
struct LocalConnection : public qpid::client::Connection {
LocalConnection(uint16_t port) { open("localhost", port); }
+ LocalConnection(const qpid::client::ConnectionSettings& s) { open(s); }
};
/** A local client connection via a socket proxy. */
@@ -96,6 +97,11 @@ struct ProxyConnection : public qpid::client::Connection {
ProxyConnection(int brokerPort) : proxy(brokerPort) {
open("localhost", proxy.getPort());
}
+ ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) {
+ qpid::client::ConnectionSettings proxySettings(s);
+ proxySettings.port = proxy.getPort();
+ open(proxySettings);
+ }
~ProxyConnection() { close(); }
};
@@ -110,6 +116,8 @@ struct ClientT {
qpid::client::LocalQueue lq;
ClientT(uint16_t port, const std::string& name=std::string())
: connection(port), session(connection.newSession(name)), subs(session) {}
+ ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string())
+ : connection(settings), session(connection.newSession(name)), subs(session) {}
~ClientT() { connection.close(); }
};
diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp
index 4a373e3811..3a0ea74098 100644
--- a/qpid/cpp/src/tests/ClusterFixture.cpp
+++ b/qpid/cpp/src/tests/ClusterFixture.cpp
@@ -67,13 +67,14 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_)
add(n);
}
+const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
+ list_of<string>("--auth=no")("--no-data-dir");
+
ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) {
Args args = list_of<string>("qpidd " __FILE__)
("--no-module-dir")
("--load-module=../.libs/cluster.so")
("--cluster-name")(name)
- ("--auth=no")
- ("--no-data-dir")
("--log-prefix")(prefix);
args.insert(args.end(), userArgs.begin(), userArgs.end());
return args;
diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h
index d1acfaa9cb..84fb9f2202 100644
--- a/qpid/cpp/src/tests/ClusterFixture.h
+++ b/qpid/cpp/src/tests/ClusterFixture.h
@@ -72,7 +72,7 @@ class ClusterFixture : public vector<uint16_t> {
/** @param localIndex can be -1 meaning don't automatically start a local broker.
* A local broker can be started with addLocal().
*/
- ClusterFixture(size_t n, int localIndex=0, const Args& args=Args());
+ ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
void setup();
@@ -86,6 +86,8 @@ class ClusterFixture : public vector<uint16_t> {
void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
private:
+ static const Args DEFAULT_ARGS;
+
void addLocal(); // Add a local broker.
Args makeArgs(const std::string& prefix);
string name;
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 14b7659b65..c880f30e6b 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -23,6 +23,7 @@
#include "ClusterFixture.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
@@ -41,6 +42,7 @@
#include <string>
#include <iostream>
+#include <fstream>
#include <iterator>
#include <vector>
#include <set>
@@ -86,11 +88,11 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
template <class C> set<uint16_t> makeSet(const C& c) {
set<uint16_t> s;
- std::copy(c.begin(), c.end(), std::inserter(s, s.begin()));
+ copy(c.begin(), c.end(), inserter(s, s.begin()));
return s;
}
-template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
+template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
vector<Url> urls = source.getKnownBrokers();
if (n >= 0 && unsigned(n) != urls.size()) {
BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
@@ -129,13 +131,13 @@ int64_t getMsgSequence(const Message& m) {
return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
}
-Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) {
+Message ttlMessage(const string& data, const string& key, uint64_t ttl) {
Message m(data, key);
m.getDeliveryProperties().setTtl(ttl);
return m;
}
-vector<std::string> browse(Client& c, const std::string& q, int n) {
+vector<string> browse(Client& c, const string& q, int n) {
SubscriptionSettings browseSettings(
FlowControl::unlimited(),
ACCEPT_MODE_NONE,
@@ -144,7 +146,7 @@ vector<std::string> browse(Client& c, const std::string& q, int n) {
);
LocalQueue lq;
c.subs.subscribe(lq, q, browseSettings);
- vector<std::string> result;
+ vector<string> result;
for (int i = 0; i < n; ++i) {
result.push_back(lq.get(TIMEOUT).getData());
}
@@ -152,6 +154,44 @@ vector<std::string> browse(Client& c, const std::string& q, int n) {
return result;
}
+
+// FIXME aconway 2009-02-12: need to figure out how to test this properly.
+// Current problems:
+// - all brokers share the same data-dir (set ACL without data dir?)
+// - updater's user name not making it through to updatee for ACL checks.
+//
+// QPID_AUTO_TEST_CASE(testAcl) {
+// ofstream policyFile("cluster_test.acl");
+// // FIXME aconway 2009-02-12: guest -> qpidd?
+// policyFile << "acl allow guest@QPID all all" << endl
+// << "acl allow foo@QPID create queue name=foo" << endl
+// << "acl allow bar@QPID create queue name=bar" << endl
+// << "acl deny all create queue" << endl
+// << "acl allow all all" << endl;
+// policyFile.close();
+// ClusterFixture cluster(2,-1, list_of<string>
+// ("--data-dir=.") ("--auth=no")
+// ("--acl-file=cluster_test.acl")
+// ("--cluster-mechanism=PLAIN")
+// ("--load-module=../.libs/acl.so"));
+// Client c0(cluster[0], "c0");
+// Client c1(cluster[1], "c1");
+
+// ConnectionSettings settings;
+// settings.port = cluster[0];
+// settings.username = "foo";
+// Client foo(settings, "foo");
+
+// foo.session.queueDeclare("foo");
+// BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
+// BOOST_CHECK_EQUAL(c1.session.queueQuery("foo").getQueue(), "foo");
+
+// BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), int);
+// BOOST_CHECK_EQUAL(c0.session.queueQuery("bar").getQueue(), "");
+// BOOST_CHECK_EQUAL(c1.session.queueQuery("bar").getQueue(), "");
+// }
+
+
QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
// Note: this doesn't actually test for cluster race conditions around TTL,
// it just verifies that basic TTL functionality works.
@@ -162,10 +202,10 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
c0.session.messageTransfer(arg::content=Message("b", "q"));
- BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b"));
sys::usleep(300*1000);
- BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b"));
- BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b"));
+ BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
}
QPID_AUTO_TEST_CASE(testSequenceOptions) {
@@ -349,7 +389,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// Send first 2 frames of message.
MessageTransferBody transfer(
- ProtocolVersion(), std::string(), // default exchange.
+ ProtocolVersion(), string(), // default exchange.
framing::message::ACCEPT_MODE_NONE,
framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
sender.send(transfer, true, false, true, true);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 6dc0c722dd..2cf4e915b6 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -124,6 +124,7 @@
<control name="shadow-ready" code="0x20" label="End of shadow connection update.">
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
+ <field name="user-name" type="str8"/>
</control>
<!-- Complete a cluster state update. -->