summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-12-06 15:56:40 +0000
committerAlan Conway <aconway@apache.org>2011-12-06 15:56:40 +0000
commitae0f67263950f41ce6078a9fde79be78d47f4a11 (patch)
treec2f1105dc677a6739d3faca8e2bb860e12209329 /qpid/cpp/src
parent03d03c025427c234fedcfae3126f0092afa0e1e7 (diff)
downloadqpid-python-ae0f67263950f41ce6078a9fde79be78d47f4a11.tar.gz
QPID-3652: Fix cluster authentication.
Only allow brokers that authenticate as the cluster-username to join a cluster. New broker first connects to a cluster broker authenticates as the cluster-username and sends its CPG member ID to the qpid.cluster-credentials exchange. The cluster broker that subsequently acts as updater verifies that the credentials are valid before connecting to give the update. NOTE 1: If you are using an ACL, the cluster-username must be allowed to publish to the qpid.cluster-credentials exchange. E.g. in your ACL file: acl allow foo@QPID publish exchange name=qpid.cluster-credentials NOTE 2: This changes the cluster initialization protocol, you will need to restart the cluster with all new version brokers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1210989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/cluster.mk2
-rw-r--r--qpid/cpp/src/qpid/UrlArray.cpp41
-rw-r--r--qpid/cpp/src/qpid/UrlArray.h37
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h28
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp54
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp94
-rw-r--r--qpid/cpp/src/qpid/cluster/CredentialsExchange.h72
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/InitialStatusMap.h4
-rw-r--r--qpid/cpp/src/tests/InitialStatusMap.cpp18
-rw-r--r--qpid/cpp/src/tests/brokertest.py8
-rw-r--r--qpid/cpp/src/tests/cluster_authentication_soak.cpp2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py50
19 files changed, 397 insertions, 50 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index cd038a96d5..fb26251da0 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -360,6 +360,8 @@ libqpidcommon_la_SOURCES += \
qpid/StringUtils.cpp \
qpid/StringUtils.h \
qpid/Url.cpp \
+ qpid/UrlArray.cpp \
+ qpid/UrlArray.h \
qpid/Version.h \
qpid/amqp_0_10/Exception.h \
qpid/amqp_0_10/SessionHandler.cpp \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 3ce4ce25b3..632522e84f 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -55,6 +55,8 @@ cluster_la_SOURCES = \
qpid/cluster/ConnectionCodec.h \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
+ qpid/cluster/CredentialsExchange.cpp \
+ qpid/cluster/CredentialsExchange.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
diff --git a/qpid/cpp/src/qpid/UrlArray.cpp b/qpid/cpp/src/qpid/UrlArray.cpp
new file mode 100644
index 0000000000..489309c8ad
--- /dev/null
+++ b/qpid/cpp/src/qpid/UrlArray.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UrlArray.h"
+
+namespace qpid {
+
+std::vector<Url> urlArrayToVector(const framing::Array& array) {
+ std::vector<Url> urls;
+ for (framing::Array::ValueVector::const_iterator i = array.begin();
+ i != array.end();
+ ++i )
+ urls.push_back(Url((*i)->get<std::string>()));
+ return urls;
+}
+
+framing::Array vectorToUrlArray(const std::vector<Url>& urls) {
+ framing::Array array(0x95);
+ for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ array.add(boost::shared_ptr<framing::Str16Value>(new framing::Str16Value(i->str())));
+ return array;
+}
+
+} // namespace qpid
diff --git a/qpid/cpp/src/qpid/UrlArray.h b/qpid/cpp/src/qpid/UrlArray.h
new file mode 100644
index 0000000000..c82d5639f1
--- /dev/null
+++ b/qpid/cpp/src/qpid/UrlArray.h
@@ -0,0 +1,37 @@
+#ifndef QPID_CLUSTER_URLARRAY_H
+#define QPID_CLUSTER_URLARRAY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Array.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/Url.h"
+#include <vector>
+
+namespace qpid {
+
+/** @file Functions to encode/decode an array of URLs. */
+std::vector<Url> urlArrayToVector(const framing::Array& array);
+framing::Array vectorToUrlArray(const std::vector<Url>& urls);
+} // namespace qpid
+
+#endif /*!QPID_CLUSTER_URLARRAY_H*/
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index fdd3c4ddc0..13205e3a3d 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,7 +49,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
federationLink(true),
clientSupportsThrottling(false),
- clusterOrderOut(0)
+ clusterOrderOut(0),
+ isDefaultRealm(false)
{}
virtual ~ConnectionState () {}
@@ -62,7 +63,15 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
- virtual void setUserId(const std::string& uid) { userId = uid; }
+ virtual void setUserId(const std::string& uid) {
+ userId = uid;
+ size_t at = userId.find('@');
+ userName = userId.substr(0, at);
+ isDefaultRealm = (
+ at!= std::string::npos &&
+ getBroker().getOptions().realm == userId.substr(at+1,userId.size()));
+ }
+
const std::string& getUserId() const { return userId; }
void setUrl(const std::string& _url) { url = _url; }
@@ -75,7 +84,14 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
const std::string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
-
+
+ /**@return true if user is the authenticated user on this connection.
+ * If id has the default realm will also compare plain username.
+ */
+ bool isAuthenticatedUser(const std::string& id) const {
+ return (id == userId || (isDefaultRealm && id == userName));
+ }
+
void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
bool getClientThrottling() const { return clientSupportsThrottling; }
@@ -114,6 +130,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
std::vector<Url> knownHosts;
bool clientSupportsThrottling;
framing::FrameHandler* clusterOrderOut;
+ std::string userName;
+ bool isDefaultRealm;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index aa1face18d..0956501e76 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -72,8 +72,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
- userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
- isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
closeComplete(false)
{}
@@ -467,7 +465,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
/* verify the userid if specified: */
std::string id =
msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
- if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
+ if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 7e97a5c4cf..c08aaba07e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -164,8 +164,6 @@ class SemanticState : private boost::noncopyable {
boost::shared_ptr<Exchange> cacheExchange;
const bool authMsg;
const std::string userID;
- const std::string userName;
- const bool isDefaultRealm;
bool closeComplete;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp
index bf4fa91d49..1a69182c90 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Helpers.h"
+#include "qpid/UrlArray.h"
namespace qpid {
namespace client {
@@ -83,14 +84,9 @@ std::vector<Url> FailoverListener::getKnownBrokers() const {
}
std::vector<Url> FailoverListener::getKnownBrokers(const Message& msg) {
- std::vector<Url> knownBrokers;
framing::Array urlArray;
msg.getHeaders().getArray("amq.failover", urlArray);
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin();
- i != urlArray.end();
- ++i )
- knownBrokers.push_back(Url((*i)->get<std::string>()));
- return knownBrokers;
+ return urlArrayToVector(urlArray);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 0241b0946b..40bfcd9285 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -130,6 +130,7 @@
#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
+#include "qpid/cluster/CredentialsExchange.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -162,6 +163,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
+#include "qpid/UrlArray.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
@@ -189,6 +191,7 @@ using management::ManagementObject;
using management::Manageable;
using management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
+namespace arg=client::arg;
/**
* NOTE: must increment this number whenever any incompatible changes in
@@ -199,7 +202,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1159330;
+const uint32_t Cluster::CLUSTER_VERSION = 1207877;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -211,12 +214,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
uint8_t storeState, const Uuid& shutdownId,
- const std::string& firstConfig)
+ const std::string& firstConfig, const framing::Array& urls)
{
cluster.initialStatus(
member, version, active, clusterId,
framing::cluster::StoreState(storeState), shutdownId,
- firstConfig, l);
+ firstConfig, urls, l);
}
void ready(const std::string& url) {
cluster.ready(member, url, l);
@@ -267,6 +270,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
updateDataExchange(new UpdateDataExchange(*this)),
+ credentialsExchange(new CredentialsExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -300,6 +304,9 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// for single control frame.
broker.getExchanges().registerExchange(updateDataExchange);
+ // CredentialsExchange is used to authenticate new cluster members
+ broker.getExchanges().registerExchange(credentialsExchange);
+
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
@@ -661,6 +668,7 @@ void Cluster::initMapCompleted(Lock& l) {
setClusterId(initMap.getClusterId(), l);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ authenticate();
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
@@ -711,7 +719,8 @@ void Cluster::configChange(const MemberId&,
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
store.getState(), store.getShutdownId(),
- initMap.getFirstConfigStr()
+ initMap.getFirstConfigStr(),
+ vectorToUrlArray(getUrls(l))
),
self);
}
@@ -803,6 +812,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
+ const framing::Array& urls,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -816,7 +826,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
initMap.received(
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
- store, shutdownId, firstConfig)
+ store, shutdownId, firstConfig, urls)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}
@@ -903,6 +913,11 @@ void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+ // Check for credentials if authentication is enabled.
+ if (broker.getOptions().auth && !credentialsExchange->check(updatee)) {
+ QPID_LOG(error, "Un-authenticated attempt to join the cluster");
+ return;
+ }
// NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
if (state == LEFT) return;
assert(state == OFFER);
@@ -1115,6 +1130,35 @@ void Cluster::updateMgmtMembership(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
+namespace {
+template <class T> struct AutoClose {
+ T closeme;
+ AutoClose(T t) : closeme(t) {}
+ ~AutoClose() { closeme.close(); }
+};
+}
+
+// Updatee connects to established member and stores credentials
+// in the qpid.cluster-credentials exchange to prove it
+// is safe for updater to connect and give an update.
+void Cluster::authenticate() {
+ if (!broker.getOptions().auth) return;
+ std::vector<Url> urls = initMap.getUrls();
+ for (std::vector<Url>::iterator i = urls.begin(); i != urls.end(); ++i) {
+ if (!i->empty()) {
+ client::Connection c;
+ c.open(*i, connectionSettings(settings));
+ AutoClose<client::Connection> closeConnection(c);
+ client::Session s = c.newSession(CredentialsExchange::NAME);
+ AutoClose<client::Session> closeSession(s);
+ client::Message credentials;
+ credentials.getHeaders().setUInt64(CredentialsExchange::NAME, getId());
+ s.messageTransfer(arg::content=credentials, arg::destination=CredentialsExchange::NAME);
+ s.sync();
+ }
+ }
+}
+
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
"PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index ccec4948e6..f517c1b8d0 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -77,6 +77,7 @@ class Connection;
struct EventFrame;
class ClusterTimer;
class UpdateDataExchange;
+class CredentialsExchange;
/**
* Connection to the cluster
@@ -187,6 +188,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
framing::cluster::StoreState,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
+ const framing::Array& urls,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&,
@@ -215,6 +217,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void becomeElder(Lock&);
void setMgmtStatus(Lock&);
void updateMgmtMembership(Lock&);
+ void authenticate();
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
@@ -271,6 +274,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
PollableFrameQueue deliverFrameQueue;
boost::shared_ptr<FailoverExchange> failoverExchange;
boost::shared_ptr<UpdateDataExchange> updateDataExchange;
+ boost::shared_ptr<CredentialsExchange> credentialsExchange;
Quorum quorum;
LockedConnectionMap localConnections;
diff --git a/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp b/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
new file mode 100644
index 0000000000..0fafc521cd
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "CredentialsExchange.h"
+#include "Cluster.h"
+#include "qpid/broker/ConnectionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+
+const string CredentialsExchange::NAME=("qpid.cluster-credentials");
+
+namespace {
+const string ANONYMOUS_MECH("ANONYMOUS");
+const string ANONYMOUS_USER("anonymous");
+
+string effectiveUserId(const string& username, const string& mechanism) {
+ if (mechanism == ANONYMOUS_MECH && username.empty())
+ return ANONYMOUS_USER;
+ else
+ return username;
+}
+}
+
+CredentialsExchange::CredentialsExchange(Cluster& cluster)
+ : broker::Exchange(NAME, &cluster),
+ username(effectiveUserId(cluster.getSettings().username,
+ cluster.getSettings().mechanism)),
+ timeout(120*sys::TIME_SEC),
+ authenticate(cluster.getBroker().getOptions().auth)
+{}
+
+static const string anonymous("anonymous");
+
+bool CredentialsExchange::check(MemberId member) {
+ sys::Mutex::ScopedLock l(lock);
+ Map::iterator i = map.find(member);
+ if (i == map.end()) return false;
+ bool valid = (sys::Duration(i->second, sys::AbsTime::now()) < timeout);
+ map.erase(i);
+ return valid;
+}
+
+void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) {
+ sys::Mutex::ScopedLock l(lock);
+ const broker::ConnectionState* connection =
+ static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher());
+ if (authenticate && !connection->isAuthenticatedUser(username))
+ throw framing::UnauthorizedAccessException(
+ QPID_MSG("Unauthorized user " << connection->getUserId() << " for " << NAME
+ << ", should be " << username));
+ if (!args || !args->isSet(NAME))
+ throw framing::InvalidArgumentException(
+ QPID_MSG("Invalid message received by " << NAME));
+ MemberId member(args->getAsUInt64(NAME));
+ map[member] = sys::AbsTime::now();
+}
+
+string CredentialsExchange::getType() const { return NAME; }
+
+namespace {
+void throwIllegal() {
+ throw framing::NotAllowedException(
+ QPID_MSG("Illegal use of " << CredentialsExchange::NAME+" exchange"));
+}
+}
+
+bool CredentialsExchange::bind(boost::shared_ptr<broker::Queue> , const string& /*routingKey*/, const framing::FieldTable* ) { throwIllegal(); return false; }
+bool CredentialsExchange::unbind(boost::shared_ptr<broker::Queue> , const string& /*routingKey*/, const framing::FieldTable* ) { throwIllegal(); return false; }
+bool CredentialsExchange::isBound(boost::shared_ptr<broker::Queue>, const string* const /*routingKey*/, const framing::FieldTable* const ) { throwIllegal(); return false; }
+
+
+}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/CredentialsExchange.h b/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
new file mode 100644
index 0000000000..90fd188271
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
@@ -0,0 +1,72 @@
+#ifndef QPID_CLUSTER_CREDENTIALSEXCHANGE_H
+#define QPID_CLUSTER_CREDENTIALSEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include <qpid/broker/Exchange.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class Cluster;
+
+/**
+ * New members joining the cluster send their identity information to this
+ * exchange to prove they are authenticated as the cluster user.
+ * The exchange rejects messages that are not properly authenticated
+ */
+class CredentialsExchange : public broker::Exchange
+{
+ public:
+ static const std::string NAME;
+
+ CredentialsExchange(Cluster&);
+
+ /** Check if this member has credentials. The credentials are deleted. */
+ bool check(MemberId member);
+
+ /** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. */
+ void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+
+ // Exchange overrides
+ std::string getType() const;
+ bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
+
+ private:
+ typedef std::map<MemberId, sys::AbsTime> Map;
+ sys::Mutex lock;
+ Map map;
+ std::string username;
+ sys::Duration timeout;
+ bool authenticate;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CREDENTIALSEXCHANGE_H*/
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
index cfbe34a460..43ec27cf2c 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -28,6 +28,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/Array.h"
+#include "qpid/UrlArray.h"
#include <boost/bind.hpp>
#include <algorithm>
@@ -86,9 +87,7 @@ void FailoverExchange::route(Deliverable&, const string& , const framing::FieldT
void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
// Called with lock held.
if (urls.empty()) return;
- framing::Array array(0x95);
- for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
- array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+ framing::Array array = vectorToUrlArray(urls);
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
AMQFrame command(MessageTransferBody(v, typeName, 1, 0));
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
index c8ecc13f2c..eb65005a9e 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -21,6 +21,7 @@
#include "InitialStatusMap.h"
#include "StoreStatus.h"
#include "qpid/log/Statement.h"
+#include "qpid/UrlArray.h"
#include <algorithm>
#include <vector>
#include <boost/bind.hpp>
@@ -218,6 +219,17 @@ void InitialStatusMap::checkConsistent() {
}
}
+std::vector<Url> InitialStatusMap::getUrls() const {
+ std::vector<Url> urls;
+ for (Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ if (i->second) {
+ std::vector<Url> urls = urlArrayToVector(i->second->getUrls());
+ if (!urls.empty()) return urls;
+ }
+ }
+ return std::vector<Url>();
+}
+
std::string InitialStatusMap::getFirstConfigStr() const {
assert(!firstConfig.empty());
return encodeMemberSet(firstConfig);
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
index a5a600365e..afa0110836 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -23,8 +23,10 @@
*/
#include "MemberSet.h"
+#include "qpid/Url.h"
#include <qpid/framing/ClusterInitialStatusBody.h>
#include <boost/optional.hpp>
+#include <vector>
namespace qpid {
namespace cluster {
@@ -69,6 +71,8 @@ class InitialStatusMap
framing::Uuid getClusterId();
/**@pre isComplete(). @throw Exception if there are any inconsistencies. */
void checkConsistent();
+ /*@return cluster URLs */
+ std::vector<Url> getUrls() const;
/** Get first config-change for this member, encoded as a string.
*@pre configChange has been called at least once.
diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp
index ecbe2d4161..95806737e3 100644
--- a/qpid/cpp/src/tests/InitialStatusMap.cpp
+++ b/qpid/cpp/src/tests/InitialStatusMap.cpp
@@ -36,21 +36,25 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite)
typedef InitialStatusMap::Status Status;
-Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
+Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(),
+ const framing::Array& urls=framing::Array())
+{
return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(),
- encodeMemberSet(ms));
+ encodeMemberSet(ms), urls);
}
-Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
+Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(),
+ const framing::Array& urls=framing::Array())
+{
return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(),
- encodeMemberSet(ms));
+ encodeMemberSet(ms), urls);
}
Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(),
- const MemberSet& ms=MemberSet())
+ const MemberSet& ms=MemberSet(), const framing::Array& urls=framing::Array())
{
- return Status(ProtocolVersion(), 0, active, start, state, stop,
- encodeMemberSet(ms));
+ return Status(ProtocolVersion(), 0, active, start, state, stop,
+ encodeMemberSet(ms), urls);
}
QPID_AUTO_TEST_CASE(testFirstInCluster) {
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 16d7fb0b78..12aed1f671 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -363,16 +363,20 @@ class Broker(Popen):
def host_port(self): return "%s:%s" % (self.host(), self.port())
+ def log_contains(self, str, timeout=1):
+ """Wait for str to appear in the log file up to timeout. Return true if found"""
+ return retry(lambda: find_in_file(str, self.log), timeout)
+
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
if not self._log_ready:
self._log_ready = find_in_file("notice Broker running", self.log)
return self._log_ready
- def ready(self, **kwargs):
+ def ready(self, timeout=5, **kwargs):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=60):
+ if not retry(self.log_ready, timeout=timeout):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
# Create a connection and a session. For a cluster broker this will
diff --git a/qpid/cpp/src/tests/cluster_authentication_soak.cpp b/qpid/cpp/src/tests/cluster_authentication_soak.cpp
index b8e8a22693..a3271701c3 100644
--- a/qpid/cpp/src/tests/cluster_authentication_soak.cpp
+++ b/qpid/cpp/src/tests/cluster_authentication_soak.cpp
@@ -96,7 +96,7 @@ startBroker ( brokerVector & brokers , int brokerNumber, string const & clusterN
argv.push_back (clusterArg.str());
argv.push_back ("--cluster-username=zig");
argv.push_back ("--cluster-password=zig");
- argv.push_back ("--cluster-mechanism=ANONYMOUS");
+ argv.push_back ("--cluster-mechanism=PLAIN");
argv.push_back ("--sasl-config=./sasl_config");
argv.push_back ("--auth=yes");
argv.push_back ("--mgmt-enable=yes");
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 0e80e06d34..2db2cdd433 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -114,7 +114,9 @@ class ShortTests(BrokerTest):
sasl_config=os.path.join(self.rootdir, "sasl_config")
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
+ # Must allow cluster-user (zag) access to credentials exchange.
aclf.write("""
+acl allow zag@QPID publish exchange name=qpid.cluster-credentials
acl allow zig@QPID all all
acl deny all all
""")
@@ -122,7 +124,11 @@ acl deny all all
cluster = self.cluster(1, args=["--auth", "yes",
"--sasl-config", sasl_config,
"--load-module", os.getenv("ACL_LIB"),
- "--acl-file", acl])
+ "--acl-file", acl,
+ "--cluster-username=zag",
+ "--cluster-password=zag",
+ "--cluster-mechanism=PLAIN"
+ ])
# Valid user/password, ensure queue is created.
c = cluster[0].connect(username="zig", password="zig")
@@ -167,39 +173,51 @@ acl deny all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
- def test_sasl_join(self):
+ def test_sasl_join_good(self):
"""Verify SASL authentication between brokers when joining a cluster."""
sasl_config=os.path.join(self.rootdir, "sasl_config")
# Test with a valid username/password
cluster = self.cluster(1, args=["--auth", "yes",
"--sasl-config", sasl_config,
- "--load-module", os.getenv("ACL_LIB"),
"--cluster-username=zig",
"--cluster-password=zig",
"--cluster-mechanism=PLAIN"
])
cluster.start()
- cluster.ready()
- c = cluster[1].connect(username="zag", password="zag")
+ c = cluster[1].connect(username="zag", password="zag", mechanism="PLAIN")
- # Test with an invalid username/password
+ def test_sasl_join_bad_password(self):
+ # Test with an invalid password
cluster = self.cluster(1, args=["--auth", "yes",
- "--sasl-config", sasl_config,
- "--load-module", os.getenv("ACL_LIB"),
- "--cluster-username=x",
- "--cluster-password=y",
+ "--sasl-config", os.path.join(self.rootdir, "sasl_config"),
+ "--cluster-username=zig",
+ "--cluster-password=bad",
"--cluster-mechanism=PLAIN"
])
- try:
- cluster.start(expect=EXPECT_EXIT_OK)
- cluster[1].ready()
- self.fail("Expected exception")
- except: pass
+ cluster.start(wait=False, expect=EXPECT_EXIT_FAIL)
+ assert cluster[1].log_contains("critical Unexpected error: connection-forced: Authentication failed")
+
+ def test_sasl_join_wrong_user(self):
+ # Test with a valid user that is not the cluster user.
+ cluster = self.cluster(0, args=["--auth", "yes",
+ "--sasl-config", os.path.join(self.rootdir, "sasl_config")])
+ cluster.start(args=["--cluster-username=zig",
+ "--cluster-password=zig",
+ "--cluster-mechanism=PLAIN"
+ ])
+
+ cluster.start(wait=False, expect=EXPECT_EXIT_FAIL,
+ args=["--cluster-username=zag",
+ "--cluster-password=zag",
+ "--cluster-mechanism=PLAIN"
+ ])
+ assert cluster[1].log_contains("critical Unexpected error: unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for qpid.cluster-credentials, should be zig")
def test_user_id_update(self):
"""Ensure that user-id of an open session is updated to new cluster members"""
sasl_config=os.path.join(self.rootdir, "sasl_config")
- cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,])
+ cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,
+ "--cluster-mechanism=ANONYMOUS"])
c = cluster[0].connect(username="zig", password="zig")
s = c.session().sender("q;{create:always}")
s.send(Message("x", user_id="zig")) # Message sent before start new broker