diff options
author | Alan Conway <aconway@apache.org> | 2012-07-11 15:26:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-07-11 15:26:28 +0000 |
commit | eb16524f0a84674378e66ceadbd30f5994d8da65 (patch) | |
tree | 81997deb6242f263ec31988e1d75f6c9b1c75cfd | |
parent | ef316307b867114becafeb2390b02f14e765b33f (diff) | |
download | qpid-python-eb16524f0a84674378e66ceadbd30f5994d8da65.tar.gz |
QPID-4118: HA does not work with authentication and authorization.
- Updated test framework to use credentials
- Updated BrokerReplicator to use HA identity to create configuration
- Updated documentation with a HA security section.
- Updated qpid-ha to take --sasl-mechanism
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1360227 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 97 | ||||
-rw-r--r-- | qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml | 53 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-ha | 9 |
8 files changed, 179 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index a22972ddd2..4af4692f78 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -172,7 +172,9 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) AclModule* acl = connection.getBroker().getAcl(); FieldTable properties; if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); + proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("ACL denied " << connection.getUserId() + << " creating a federation link")); return; } QPID_LOG(info, "Connection is a federation link"); diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index c92b368b0e..f0cb90e73b 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -196,6 +196,11 @@ class Link : public PersistableConfig, public management::Manageable { static std::string createName(const std::string& transport, const std::string& host, uint16_t port); + + /** The current connction for this link. Note returns 0 if the link is not + * presently connected. + */ + Connection* getConnection() { return connection; } }; } } diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index bbdacb8aa9..8ffe411c91 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -72,7 +72,7 @@ Url Backup::removeSelf(const Url& brokers) const { void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers); + QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); Url url = removeSelf(brokers); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; types::Uuid uuid(true); @@ -82,7 +82,7 @@ void Backup::initialize(const Url& brokers) { url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, - false); // amq.failover + true); // amq.failover { sys::Mutex::ScopedLock l(lock); link = result.first; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 1fabff6a09..3eb30a9ec9 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -22,6 +22,7 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/framing/FieldTable.h" @@ -90,9 +91,7 @@ const string KEY("key"); const string NAME("name"); const string QNAME("qName"); const string QUEUE("queue"); -const string RHOST("rhost"); const string TYPE("type"); -const string USER("user"); const string HA_BROKER("habroker"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); @@ -202,6 +201,14 @@ BrokerReplicator::~BrokerReplicator() { } // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + // Use the credentials of the outgoing Link connection for creating queues, + // exchanges etc. We know link->getConnection() is non-zero because we are + // being called in the connections thread context. + // + assert(link->getConnection()); + userId = link->getConnection()->getUserId(); + remoteHost = link->getConnection()->getUrl(); + qpid::Address primary; link->getRemoteAddress(primary); string queueName = bridge.getQueueName(); @@ -320,10 +327,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[DURABLE].asBool(), autoDel, 0, // no owner regardless of exclusivity on primary + // FIXME aconway 2012-07-06: handle alternate exchange values[ALTEX].asString(), args, - values[USER].asString(), - values[RHOST].asString()); + userId, + remoteHost); assert(result.second); // Should be true since we destroyed existing queue above startQueueReplicator(result.first); } @@ -345,7 +353,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { if (queue && replicationTest.replicateLevel(queue->getSettings())) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); stopQueueReplicator(name); - broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + broker.deleteQueue(name, userId, remoteHost); } } @@ -368,10 +376,11 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { name, values[EXTYPE].asString(), values[DURABLE].asBool(), + // FIXME aconway 2012-07-06: handle alternate exchanges values[ALTEX].asString(), args, - values[USER].asString(), - values[RHOST].asString()); + userId, + remoteHost); assert(result.second); } } @@ -385,10 +394,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); + broker.deleteExchange(name, userId, remoteHost); } } @@ -458,8 +464,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { 0 /*i.e. no owner regardless of exclusivity on master*/, ""/*TODO: need to include alternate-exchange*/, args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/); + userId, + remoteHost); + // It is normal for the queue to already exist if we are failing over. if (result.second) startQueueReplicator(result.first); @@ -478,10 +485,11 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { name, values[TYPE].asString(), values[DURABLE].asBool(), - ""/*TODO: need to include alternate-exchange*/, + "", // FIXME aconway 2012-07-09: need to include alternate-exchange args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second; + userId, + remoteHost + ).second; QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 35ffdd0cd8..e2ca8f9e14 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -54,7 +54,7 @@ class QueueReplicator; * exchanges and bindings to replicate the primary. * It also creates QueueReplicators for newly replicated queues. * - * THREAD SAFE: Has no mutable state. + * THREAD UNSAFE: Only called in Link connection thread, no need for locking. * */ class BrokerReplicator : public broker::Exchange, @@ -96,6 +96,7 @@ class BrokerReplicator : public broker::Exchange, void stopQueueReplicator(const std::string& name); std::string logPrefix; + std::string userId, remoteHost; ReplicationTest replicationTest; HaBroker& haBroker; broker::Broker& broker; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f7f48fc027..3612837214 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout from qpid.datatypes import uuid4 @@ -32,9 +32,9 @@ log = getLogger(__name__) class QmfAgent(object): """Access to a QMF broker agent.""" - def __init__(self, address): + def __init__(self, address, **kwargs): self._connection = Connection.establish( - address, client_properties={"qpid.ha-admin":1}) + address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address) @@ -42,10 +42,23 @@ class QmfAgent(object): a = getattr(self._agent, name) return a +class Credentials(object): + """SASL credentials: username, password, and mechanism""" + def __init__(self, username, password, mechanism): + (self.username, self.password, self.mechanism) = (username, password, mechanism) + + def __str__(self): return "Credentials%s"%(self.tuple(),) + + def tuple(self): return (self.username, self.password, self.mechanism) + + def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) + class HaBroker(Broker): - """Start a broker with HA enabled""" - def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, - ha_replicate="all", **kwargs): + """Start a broker with HA enabled + @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. + """ + def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", + client_credentials=None, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, @@ -64,19 +77,30 @@ class HaBroker(Broker): getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. self.qpid_ha_script=import_script(self.qpid_ha_path) self._agent = None + self.client_credentials = client_credentials def __str__(self): return Broker.__str__(self) - # FIXME aconway 2012-06-26: check exit status from script. def qpid_ha(self, args): - self.qpid_ha_script.main_except(["", "-b", self.host_port()]+args) + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) def promote(self): self.qpid_ha(["promote"]) def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + def agent(self): - if not self._agent: self._agent = QmfAgent(self.host_port()) + if not self._agent: + cred = self.client_credentials + if cred: + self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) + else: + self._agent = QmfAgent(self.host_port()) return self._agent def ha_status(self): @@ -103,7 +127,14 @@ class HaBroker(Broker): self.qpid_config(["add", "queue", queue, "--replicate", replication]) def connect_admin(self, **kwargs): - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + cred = self.client_credentials + if cred: + return Broker.connect( + self, client_properties={"qpid.ha-admin":1}, + username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, + **kwargs) + else: + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) def wait_backup(self, address): """Wait for address to become valid on a backup broker.""" @@ -140,7 +171,7 @@ class HaBroker(Broker): class HaCluster(object): _cluster_count = 0 - def __init__(self, test, n, **kwargs): + def __init__(self, test, n, promote=True, **kwargs): """Start a cluster of n brokers""" self.test = test self.kwargs = kwargs @@ -685,6 +716,48 @@ class ReplicationTests(BrokerTest): qmf = broker.agent().getHaBroker() assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + def test_auth(self): + """Verify that authentication does not interfere with replication.""" + # FIXME aconway 2012-07-09: generate test sasl config portably for cmake + sasl_config=os.path.join(self.rootdir, "sasl_config") + if not os.path.exists(sasl_config): + print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config + return + acl=os.path.join(os.getcwd(), "policy.acl") + aclf=file(acl,"w") + # Verify that replication works with auth=yes and HA user has at least the following + # privileges: + aclf.write(""" +acl allow zag@QPID access queue +acl allow zag@QPID create queue +acl allow zag@QPID consume queue +acl allow zag@QPID delete queue +acl allow zag@QPID access exchange +acl allow zag@QPID create exchange +acl allow zag@QPID bind exchange +acl allow zag@QPID publish exchange +acl allow zag@QPID delete exchange +acl allow zag@QPID access method +acl allow zag@QPID create link +acl deny all all + """) + aclf.close() + cluster = HaCluster( + self, 2, + args=["--auth", "yes", "--sasl-config", sasl_config, + "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"), + "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" + ], + client_credentials=Credentials("zag", "zag", "PLAIN")) + s0 = cluster[0].connect(username="zag", password="zag").session(); + s0.receiver("q;{create:always}") + s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + cluster[1].wait_backup("q") + cluster[1].wait_backup("ex") + s1 = cluster[1].connect_admin().session(); # Uses Credentials above. + s1.sender("ex").send("foo"); + self.assertEqual(s1.receiver("q").fetch().content, "foo") + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -831,6 +904,8 @@ class RecoveryTests(BrokerTest): s1.session.connection.close() s2.session.connection.close() + + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) qpid_ha = os.getenv("QPID_HA_EXEC") diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml index d00464c92c..9fcadbcbe9 100644 --- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml +++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml @@ -221,7 +221,10 @@ under the License. <para><literal>--ha-mechanism <replaceable>MECH</replaceable></literal></para> </entry> <entry> - Authentication settings used by brokers to connect to each other. + Authentication settings used by HA brokers to connect to each other. + If you are using authorization + (<xref linkend="sect-Messaging_User_Guide-Security-Authorization"/>) + then this user must have all permissions. </entry> </row> </tbody> @@ -630,7 +633,53 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl </section> </section> - <section> + <section> + <title>Security.</title> + <para> + You can secure your cluster using the authenticiation and authorization features + described in <xref linkend="chap-Messaging_User_Guide-Security"/>. + </para> + <para> + Backup brokers connect to the primary broker and subscribe for management + events and queue contents. You can specifiy the identity used to connect + to the primary with the following options: + </para> + <table frame="all" id="ha-broker-security-options"> + <title>Security options for High Availability Messaging Cluster</title> + <tgroup align="left" cols="2" colsep="1" rowsep="1"> + <colspec colname="c1" colwidth="1*"/> + <colspec colname="c2" colwidth="3*"/> + <thead> + <row> + <entry align="center" nameend="c2" namest="c1"> + Security options for High Availability Messaging Cluster + </entry> + </row> + </thead> + <tbody> + <row> + <entry> + <para><literal>--ha-username <replaceable>USER</replaceable></literal></para> + <para><literal>--ha-password <replaceable>PASS</replaceable></literal></para> + <para><literal>--ha-mechanism <replaceable>MECH</replaceable></literal></para> + </entry> + <entry> + Authentication settings used by HA brokers to connect to each other. + If you are using authorization + (<xref linkend="sect-Messaging_User_Guide-Security-Authorization"/>) + then this user must have all permissions. + </entry> + </row> + </tbody> + </tgroup> + </table> + <para> + This identity is also used to authorize actions taken on the backup broker to replicate + from the primary, for example to create queues or exchanges. + </para> + </section> + + <section> <title>Integrating with other Cluster Resource Managers</title> <para> To integrate with a different resource manager you must configure it to: diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha index 38609eef43..6ddde93967 100755 --- a/qpid/tools/src/py/qpid-ha +++ b/qpid/tools/src/py/qpid-ha @@ -46,15 +46,18 @@ class Command: usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help) self.help = help self.op=optparse.OptionParser(usage) - self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>") + self.op.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") def execute(self, args): opts, args = self.op.parse_args(args) if len(args) != len(self.arg_names)+1: self.op.print_help() raise Exception("Wrong number of arguments") - broker = opts.broker or "localhost:5672" - connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + connection = Connection.establish( + opts.broker, + sasl_mechanisms=opts.sasl_mechanism, + client_properties={"qpid.ha-admin":1}) qmf_broker = BrokerAgent(connection) ha_broker = qmf_broker.getHaBroker() if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker) |