summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-09 21:35:38 +0000
committerAlan Conway <aconway@apache.org>2012-07-09 21:35:38 +0000
commitabc2296a258c83e69c5b64ac18f33c4cc2b172c7 (patch)
tree835b541a034ae36b214c3ff564c87cf913268707 /cpp/src/tests
parent1e06f5f3365899f093d7b6b1c62784f7895b4d0b (diff)
downloadqpid-python-abc2296a258c83e69c5b64ac18f33c4cc2b172c7.tar.gz
QPID-4118: HA does not work with authentication and authorization.
- Updated test framework to use credentials - Updated BrokerReplicator to use HA identity to create configuration - Updated documentation with a HA security section. - Updated qpid-ha to take --sasl-mechanism git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1359412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-xcpp/src/tests/ha_tests.py97
1 files changed, 86 insertions, 11 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index f7f48fc027..3612837214 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
from qpid.datatypes import uuid4
@@ -32,9 +32,9 @@ log = getLogger(__name__)
class QmfAgent(object):
"""Access to a QMF broker agent."""
- def __init__(self, address):
+ def __init__(self, address, **kwargs):
self._connection = Connection.establish(
- address, client_properties={"qpid.ha-admin":1})
+ address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
@@ -42,10 +42,23 @@ class QmfAgent(object):
a = getattr(self._agent, name)
return a
+class Credentials(object):
+ """SASL credentials: username, password, and mechanism"""
+ def __init__(self, username, password, mechanism):
+ (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+ def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+ def tuple(self): return (self.username, self.password, self.mechanism)
+
+ def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
class HaBroker(Broker):
- """Start a broker with HA enabled"""
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True,
- ha_replicate="all", **kwargs):
+ """Start a broker with HA enabled
+ @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
+ """
+ def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+ client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
@@ -64,19 +77,30 @@ class HaBroker(Broker):
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
self.qpid_ha_script=import_script(self.qpid_ha_path)
self._agent = None
+ self.client_credentials = client_credentials
def __str__(self): return Broker.__str__(self)
- # FIXME aconway 2012-06-26: check exit status from script.
def qpid_ha(self, args):
- self.qpid_ha_script.main_except(["", "-b", self.host_port()]+args)
+ cred = self.client_credentials
+ url = self.host_port()
+ if cred:
+ url =cred.add_user(url)
+ args = args + ["--sasl-mechanism", cred.mechanism]
+ self.qpid_ha_script.main_except(["", "-b", url]+args)
def promote(self): self.qpid_ha(["promote"])
def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
def agent(self):
- if not self._agent: self._agent = QmfAgent(self.host_port())
+ if not self._agent:
+ cred = self.client_credentials
+ if cred:
+ self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+ else:
+ self._agent = QmfAgent(self.host_port())
return self._agent
def ha_status(self):
@@ -103,7 +127,14 @@ class HaBroker(Broker):
self.qpid_config(["add", "queue", queue, "--replicate", replication])
def connect_admin(self, **kwargs):
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+ cred = self.client_credentials
+ if cred:
+ return Broker.connect(
+ self, client_properties={"qpid.ha-admin":1},
+ username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+ **kwargs)
+ else:
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
def wait_backup(self, address):
"""Wait for address to become valid on a backup broker."""
@@ -140,7 +171,7 @@ class HaBroker(Broker):
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, **kwargs):
+ def __init__(self, test, n, promote=True, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
self.kwargs = kwargs
@@ -685,6 +716,48 @@ class ReplicationTests(BrokerTest):
qmf = broker.agent().getHaBroker()
assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ def test_auth(self):
+ """Verify that authentication does not interfere with replication."""
+ # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ if not os.path.exists(sasl_config):
+ print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
+ return
+ acl=os.path.join(os.getcwd(), "policy.acl")
+ aclf=file(acl,"w")
+ # Verify that replication works with auth=yes and HA user has at least the following
+ # privileges:
+ aclf.write("""
+acl allow zag@QPID access queue
+acl allow zag@QPID create queue
+acl allow zag@QPID consume queue
+acl allow zag@QPID delete queue
+acl allow zag@QPID access exchange
+acl allow zag@QPID create exchange
+acl allow zag@QPID bind exchange
+acl allow zag@QPID publish exchange
+acl allow zag@QPID delete exchange
+acl allow zag@QPID access method
+acl allow zag@QPID create link
+acl deny all all
+ """)
+ aclf.close()
+ cluster = HaCluster(
+ self, 2,
+ args=["--auth", "yes", "--sasl-config", sasl_config,
+ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"),
+ "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
+ ],
+ client_credentials=Credentials("zag", "zag", "PLAIN"))
+ s0 = cluster[0].connect(username="zag", password="zag").session();
+ s0.receiver("q;{create:always}")
+ s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ cluster[1].wait_backup("q")
+ cluster[1].wait_backup("ex")
+ s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
+ s1.sender("ex").send("foo");
+ self.assertEqual(s1.receiver("q").fetch().content, "foo")
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -831,6 +904,8 @@ class RecoveryTests(BrokerTest):
s1.session.connection.close()
s2.session.connection.close()
+
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")