summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-11 15:26:28 +0000
committerAlan Conway <aconway@apache.org>2012-07-11 15:26:28 +0000
commiteb16524f0a84674378e66ceadbd30f5994d8da65 (patch)
tree81997deb6242f263ec31988e1d75f6c9b1c75cfd
parentef316307b867114becafeb2390b02f14e765b33f (diff)
downloadqpid-python-eb16524f0a84674378e66ceadbd30f5994d8da65.tar.gz
QPID-4118: HA does not work with authentication and authorization.
- Updated test framework to use credentials - Updated BrokerReplicator to use HA identity to create configuration - Updated documentation with a HA security section. - Updated qpid-ha to take --sasl-mechanism git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1360227 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h5
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp40
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h3
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py97
-rw-r--r--qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml53
-rwxr-xr-xqpid/tools/src/py/qpid-ha9
8 files changed, 179 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index a22972ddd2..4af4692f78 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -172,7 +172,9 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
AclModule* acl = connection.getBroker().getAcl();
FieldTable properties;
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
- proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
+ proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+ QPID_MSG("ACL denied " << connection.getUserId()
+ << " creating a federation link"));
return;
}
QPID_LOG(info, "Connection is a federation link");
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index c92b368b0e..f0cb90e73b 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -196,6 +196,11 @@ class Link : public PersistableConfig, public management::Manageable {
static std::string createName(const std::string& transport,
const std::string& host,
uint16_t port);
+
+ /** The current connction for this link. Note returns 0 if the link is not
+ * presently connected.
+ */
+ Connection* getConnection() { return connection; }
};
}
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index bbdacb8aa9..8ffe411c91 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -72,7 +72,7 @@ Url Backup::removeSelf(const Url& brokers) const {
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
Url url = removeSelf(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
types::Uuid uuid(true);
@@ -82,7 +82,7 @@ void Backup::initialize(const Url& brokers) {
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
- false); // amq.failover
+ true); // amq.failover
{
sys::Mutex::ScopedLock l(lock);
link = result.first;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1fabff6a09..3eb30a9ec9 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -22,6 +22,7 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/framing/FieldTable.h"
@@ -90,9 +91,7 @@ const string KEY("key");
const string NAME("name");
const string QNAME("qName");
const string QUEUE("queue");
-const string RHOST("rhost");
const string TYPE("type");
-const string USER("user");
const string HA_BROKER("habroker");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
@@ -202,6 +201,14 @@ BrokerReplicator::~BrokerReplicator() { }
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ // Use the credentials of the outgoing Link connection for creating queues,
+ // exchanges etc. We know link->getConnection() is non-zero because we are
+ // being called in the connections thread context.
+ //
+ assert(link->getConnection());
+ userId = link->getConnection()->getUserId();
+ remoteHost = link->getConnection()->getUrl();
+
qpid::Address primary;
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
@@ -320,10 +327,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[DURABLE].asBool(),
autoDel,
0, // no owner regardless of exclusivity on primary
+ // FIXME aconway 2012-07-06: handle alternate exchange
values[ALTEX].asString(),
args,
- values[USER].asString(),
- values[RHOST].asString());
+ userId,
+ remoteHost);
assert(result.second); // Should be true since we destroyed existing queue above
startQueueReplicator(result.first);
}
@@ -345,7 +353,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
if (queue && replicationTest.replicateLevel(queue->getSettings())) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
stopQueueReplicator(name);
- broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ broker.deleteQueue(name, userId, remoteHost);
}
}
@@ -368,10 +376,11 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
name,
values[EXTYPE].asString(),
values[DURABLE].asBool(),
+ // FIXME aconway 2012-07-06: handle alternate exchanges
values[ALTEX].asString(),
args,
- values[USER].asString(),
- values[RHOST].asString());
+ userId,
+ remoteHost);
assert(result.second);
}
}
@@ -385,10 +394,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- broker.deleteExchange(
- name,
- values[USER].asString(),
- values[RHOST].asString());
+ broker.deleteExchange(name, userId, remoteHost);
}
}
@@ -458,8 +464,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
0 /*i.e. no owner regardless of exclusivity on master*/,
""/*TODO: need to include alternate-exchange*/,
args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/);
+ userId,
+ remoteHost);
+
// It is normal for the queue to already exist if we are failing over.
if (result.second)
startQueueReplicator(result.first);
@@ -478,10 +485,11 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
name,
values[TYPE].asString(),
values[DURABLE].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
+ "", // FIXME aconway 2012-07-09: need to include alternate-exchange
args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second;
+ userId,
+ remoteHost
+ ).second;
QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 35ffdd0cd8..e2ca8f9e14 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -54,7 +54,7 @@ class QueueReplicator;
* exchanges and bindings to replicate the primary.
* It also creates QueueReplicators for newly replicated queues.
*
- * THREAD SAFE: Has no mutable state.
+ * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
*
*/
class BrokerReplicator : public broker::Exchange,
@@ -96,6 +96,7 @@ class BrokerReplicator : public broker::Exchange,
void stopQueueReplicator(const std::string& name);
std::string logPrefix;
+ std::string userId, remoteHost;
ReplicationTest replicationTest;
HaBroker& haBroker;
broker::Broker& broker;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f7f48fc027..3612837214 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
from qpid.datatypes import uuid4
@@ -32,9 +32,9 @@ log = getLogger(__name__)
class QmfAgent(object):
"""Access to a QMF broker agent."""
- def __init__(self, address):
+ def __init__(self, address, **kwargs):
self._connection = Connection.establish(
- address, client_properties={"qpid.ha-admin":1})
+ address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
@@ -42,10 +42,23 @@ class QmfAgent(object):
a = getattr(self._agent, name)
return a
+class Credentials(object):
+ """SASL credentials: username, password, and mechanism"""
+ def __init__(self, username, password, mechanism):
+ (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+ def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+ def tuple(self): return (self.username, self.password, self.mechanism)
+
+ def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
class HaBroker(Broker):
- """Start a broker with HA enabled"""
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True,
- ha_replicate="all", **kwargs):
+ """Start a broker with HA enabled
+ @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
+ """
+ def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+ client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
@@ -64,19 +77,30 @@ class HaBroker(Broker):
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
self.qpid_ha_script=import_script(self.qpid_ha_path)
self._agent = None
+ self.client_credentials = client_credentials
def __str__(self): return Broker.__str__(self)
- # FIXME aconway 2012-06-26: check exit status from script.
def qpid_ha(self, args):
- self.qpid_ha_script.main_except(["", "-b", self.host_port()]+args)
+ cred = self.client_credentials
+ url = self.host_port()
+ if cred:
+ url =cred.add_user(url)
+ args = args + ["--sasl-mechanism", cred.mechanism]
+ self.qpid_ha_script.main_except(["", "-b", url]+args)
def promote(self): self.qpid_ha(["promote"])
def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
def agent(self):
- if not self._agent: self._agent = QmfAgent(self.host_port())
+ if not self._agent:
+ cred = self.client_credentials
+ if cred:
+ self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+ else:
+ self._agent = QmfAgent(self.host_port())
return self._agent
def ha_status(self):
@@ -103,7 +127,14 @@ class HaBroker(Broker):
self.qpid_config(["add", "queue", queue, "--replicate", replication])
def connect_admin(self, **kwargs):
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+ cred = self.client_credentials
+ if cred:
+ return Broker.connect(
+ self, client_properties={"qpid.ha-admin":1},
+ username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+ **kwargs)
+ else:
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
def wait_backup(self, address):
"""Wait for address to become valid on a backup broker."""
@@ -140,7 +171,7 @@ class HaBroker(Broker):
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, **kwargs):
+ def __init__(self, test, n, promote=True, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
self.kwargs = kwargs
@@ -685,6 +716,48 @@ class ReplicationTests(BrokerTest):
qmf = broker.agent().getHaBroker()
assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ def test_auth(self):
+ """Verify that authentication does not interfere with replication."""
+ # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ if not os.path.exists(sasl_config):
+ print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
+ return
+ acl=os.path.join(os.getcwd(), "policy.acl")
+ aclf=file(acl,"w")
+ # Verify that replication works with auth=yes and HA user has at least the following
+ # privileges:
+ aclf.write("""
+acl allow zag@QPID access queue
+acl allow zag@QPID create queue
+acl allow zag@QPID consume queue
+acl allow zag@QPID delete queue
+acl allow zag@QPID access exchange
+acl allow zag@QPID create exchange
+acl allow zag@QPID bind exchange
+acl allow zag@QPID publish exchange
+acl allow zag@QPID delete exchange
+acl allow zag@QPID access method
+acl allow zag@QPID create link
+acl deny all all
+ """)
+ aclf.close()
+ cluster = HaCluster(
+ self, 2,
+ args=["--auth", "yes", "--sasl-config", sasl_config,
+ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"),
+ "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
+ ],
+ client_credentials=Credentials("zag", "zag", "PLAIN"))
+ s0 = cluster[0].connect(username="zag", password="zag").session();
+ s0.receiver("q;{create:always}")
+ s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ cluster[1].wait_backup("q")
+ cluster[1].wait_backup("ex")
+ s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
+ s1.sender("ex").send("foo");
+ self.assertEqual(s1.receiver("q").fetch().content, "foo")
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -831,6 +904,8 @@ class RecoveryTests(BrokerTest):
s1.session.connection.close()
s2.session.connection.close()
+
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")
diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
index d00464c92c..9fcadbcbe9 100644
--- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
+++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
@@ -221,7 +221,10 @@ under the License.
<para><literal>--ha-mechanism <replaceable>MECH</replaceable></literal></para>
</entry>
<entry>
- Authentication settings used by brokers to connect to each other.
+ Authentication settings used by HA brokers to connect to each other.
+ If you are using authorization
+ (<xref linkend="sect-Messaging_User_Guide-Security-Authorization"/>)
+ then this user must have all permissions.
</entry>
</row>
</tbody>
@@ -630,7 +633,53 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl
</section>
</section>
- <section>
+ <section>
+ <title>Security.</title>
+ <para>
+ You can secure your cluster using the authenticiation and authorization features
+ described in <xref linkend="chap-Messaging_User_Guide-Security"/>.
+ </para>
+ <para>
+ Backup brokers connect to the primary broker and subscribe for management
+ events and queue contents. You can specifiy the identity used to connect
+ to the primary with the following options:
+ </para>
+ <table frame="all" id="ha-broker-security-options">
+ <title>Security options for High Availability Messaging Cluster</title>
+ <tgroup align="left" cols="2" colsep="1" rowsep="1">
+ <colspec colname="c1" colwidth="1*"/>
+ <colspec colname="c2" colwidth="3*"/>
+ <thead>
+ <row>
+ <entry align="center" nameend="c2" namest="c1">
+ Security options for High Availability Messaging Cluster
+ </entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry>
+ <para><literal>--ha-username <replaceable>USER</replaceable></literal></para>
+ <para><literal>--ha-password <replaceable>PASS</replaceable></literal></para>
+ <para><literal>--ha-mechanism <replaceable>MECH</replaceable></literal></para>
+ </entry>
+ <entry>
+ Authentication settings used by HA brokers to connect to each other.
+ If you are using authorization
+ (<xref linkend="sect-Messaging_User_Guide-Security-Authorization"/>)
+ then this user must have all permissions.
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ <para>
+ This identity is also used to authorize actions taken on the backup broker to replicate
+ from the primary, for example to create queues or exchanges.
+ </para>
+ </section>
+
+ <section>
<title>Integrating with other Cluster Resource Managers</title>
<para>
To integrate with a different resource manager you must configure it to:
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index 38609eef43..6ddde93967 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -46,15 +46,18 @@ class Command:
usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
self.help = help
self.op=optparse.OptionParser(usage)
- self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")
+ self.op.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
def execute(self, args):
opts, args = self.op.parse_args(args)
if len(args) != len(self.arg_names)+1:
self.op.print_help()
raise Exception("Wrong number of arguments")
- broker = opts.broker or "localhost:5672"
- connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
+ connection = Connection.establish(
+ opts.broker,
+ sasl_mechanisms=opts.sasl_mechanism,
+ client_properties={"qpid.ha-admin":1})
qmf_broker = BrokerAgent(connection)
ha_broker = qmf_broker.getHaBroker()
if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker)