diff options
| author | Alan Conway <aconway@apache.org> | 2012-07-09 21:35:38 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-07-09 21:35:38 +0000 |
| commit | abc2296a258c83e69c5b64ac18f33c4cc2b172c7 (patch) | |
| tree | 835b541a034ae36b214c3ff564c87cf913268707 /cpp/src/tests | |
| parent | 1e06f5f3365899f093d7b6b1c62784f7895b4d0b (diff) | |
| download | qpid-python-abc2296a258c83e69c5b64ac18f33c4cc2b172c7.tar.gz | |
QPID-4118: HA does not work with authentication and authorization.
- Updated test framework to use credentials
- Updated BrokerReplicator to use HA identity to create configuration
- Updated documentation with a HA security section.
- Updated qpid-ha to take --sasl-mechanism
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1359412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rwxr-xr-x | cpp/src/tests/ha_tests.py | 97 |
1 files changed, 86 insertions, 11 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index f7f48fc027..3612837214 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout from qpid.datatypes import uuid4 @@ -32,9 +32,9 @@ log = getLogger(__name__) class QmfAgent(object): """Access to a QMF broker agent.""" - def __init__(self, address): + def __init__(self, address, **kwargs): self._connection = Connection.establish( - address, client_properties={"qpid.ha-admin":1}) + address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address) @@ -42,10 +42,23 @@ class QmfAgent(object): a = getattr(self._agent, name) return a +class Credentials(object): + """SASL credentials: username, password, and mechanism""" + def __init__(self, username, password, mechanism): + (self.username, self.password, self.mechanism) = (username, password, mechanism) + + def __str__(self): return "Credentials%s"%(self.tuple(),) + + def tuple(self): return (self.username, self.password, self.mechanism) + + def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) + class HaBroker(Broker): - """Start a broker with HA enabled""" - def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, - ha_replicate="all", **kwargs): + """Start a broker with HA enabled + @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. + """ + def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", + client_credentials=None, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, @@ -64,19 +77,30 @@ class HaBroker(Broker): getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. self.qpid_ha_script=import_script(self.qpid_ha_path) self._agent = None + self.client_credentials = client_credentials def __str__(self): return Broker.__str__(self) - # FIXME aconway 2012-06-26: check exit status from script. def qpid_ha(self, args): - self.qpid_ha_script.main_except(["", "-b", self.host_port()]+args) + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) def promote(self): self.qpid_ha(["promote"]) def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + def agent(self): - if not self._agent: self._agent = QmfAgent(self.host_port()) + if not self._agent: + cred = self.client_credentials + if cred: + self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) + else: + self._agent = QmfAgent(self.host_port()) return self._agent def ha_status(self): @@ -103,7 +127,14 @@ class HaBroker(Broker): self.qpid_config(["add", "queue", queue, "--replicate", replication]) def connect_admin(self, **kwargs): - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + cred = self.client_credentials + if cred: + return Broker.connect( + self, client_properties={"qpid.ha-admin":1}, + username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, + **kwargs) + else: + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) def wait_backup(self, address): """Wait for address to become valid on a backup broker.""" @@ -140,7 +171,7 @@ class HaBroker(Broker): class HaCluster(object): _cluster_count = 0 - def __init__(self, test, n, **kwargs): + def __init__(self, test, n, promote=True, **kwargs): """Start a cluster of n brokers""" self.test = test self.kwargs = kwargs @@ -685,6 +716,48 @@ class ReplicationTests(BrokerTest): qmf = broker.agent().getHaBroker() assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + def test_auth(self): + """Verify that authentication does not interfere with replication.""" + # FIXME aconway 2012-07-09: generate test sasl config portably for cmake + sasl_config=os.path.join(self.rootdir, "sasl_config") + if not os.path.exists(sasl_config): + print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config + return + acl=os.path.join(os.getcwd(), "policy.acl") + aclf=file(acl,"w") + # Verify that replication works with auth=yes and HA user has at least the following + # privileges: + aclf.write(""" +acl allow zag@QPID access queue +acl allow zag@QPID create queue +acl allow zag@QPID consume queue +acl allow zag@QPID delete queue +acl allow zag@QPID access exchange +acl allow zag@QPID create exchange +acl allow zag@QPID bind exchange +acl allow zag@QPID publish exchange +acl allow zag@QPID delete exchange +acl allow zag@QPID access method +acl allow zag@QPID create link +acl deny all all + """) + aclf.close() + cluster = HaCluster( + self, 2, + args=["--auth", "yes", "--sasl-config", sasl_config, + "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"), + "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" + ], + client_credentials=Credentials("zag", "zag", "PLAIN")) + s0 = cluster[0].connect(username="zag", password="zag").session(); + s0.receiver("q;{create:always}") + s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + cluster[1].wait_backup("q") + cluster[1].wait_backup("ex") + s1 = cluster[1].connect_admin().session(); # Uses Credentials above. + s1.sender("ex").send("foo"); + self.assertEqual(s1.receiver("q").fetch().content, "foo") + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -831,6 +904,8 @@ class RecoveryTests(BrokerTest): s1.session.connection.close() s2.session.connection.close() + + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) qpid_ha = os.getenv("QPID_HA_EXEC") |
