summaryrefslogtreecommitdiff
path: root/cpp/src/tests/acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/acl.py')
-rwxr-xr-xcpp/src/tests/acl.py148
1 files changed, 147 insertions, 1 deletions
diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py
index 720b3b4216..0e096a6f5b 100755
--- a/cpp/src/tests/acl.py
+++ b/cpp/src/tests/acl.py
@@ -285,10 +285,38 @@ class ACLTests(TestBase010):
if (result):
self.fail(result)
+ def test_nested_groups(self):
+ """
+ Test nested groups
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('group user-consume martin@QPID ted@QPID\n')
+ aclf.write('group group2 kim@QPID user-consume rob@QPID \n')
+ aclf.write('acl allow anonymous all all \n')
+ aclf.write('acl allow group2 create queue \n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('rob','rob')
+ try:
+ session.queue_declare(queue="rob_queue")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request");
+ self.fail("Error during queue create request");
+
+
+
def test_user_realm(self):
"""
Test a user defined without a realm
Ex. group admin rajith
+ Note: a user name without a realm is interpreted as a group name
"""
aclf = self.get_acl_file()
aclf.write('group admin bob\n') # shouldn't be allowed
@@ -297,7 +325,7 @@ class ACLTests(TestBase010):
aclf.close()
result = self.reload_acl()
- if (result.find("Username 'bob' must contain a realm",0,len(result)) == -1):
+ if (result.find("not defined yet.",0,len(result)) == -1):
self.fail(result)
def test_allowed_chars_for_username(self):
@@ -1509,6 +1537,124 @@ class ACLTests(TestBase010):
#=====================================
+ # QMF Topic Exchange tests
+ #=====================================
+
+ def test_qmf_topic_exchange_tests(self):
+ """
+ Test using QMF method hooks into ACL logic
+ """
+ aclf = self.get_acl_file()
+ aclf.write('# begin hack alert: allow anonymous to access the lookup debug functions\n')
+ aclf.write('acl allow-log anonymous create queue\n')
+ aclf.write('acl allow-log anonymous all exchange name=qmf.*\n')
+ aclf.write('acl allow-log anonymous all exchange name=amq.direct\n')
+ aclf.write('acl allow-log anonymous all exchange name=qpid.management\n')
+ aclf.write('acl allow-log anonymous access method name=*\n')
+ aclf.write('# end hack alert\n')
+ aclf.write('acl allow-log uPlain1@COMPANY publish exchange name=X routingkey=ab.cd.e\n')
+ aclf.write('acl allow-log uPlain2@COMPANY publish exchange name=X routingkey=.\n')
+ aclf.write('acl allow-log uStar1@COMPANY publish exchange name=X routingkey=a.*.b\n')
+ aclf.write('acl allow-log uStar2@COMPANY publish exchange name=X routingkey=*.x\n')
+ aclf.write('acl allow-log uStar3@COMPANY publish exchange name=X routingkey=x.x.*\n')
+ aclf.write('acl allow-log uHash1@COMPANY publish exchange name=X routingkey=a.#.b\n')
+ aclf.write('acl allow-log uHash2@COMPANY publish exchange name=X routingkey=a.#\n')
+ aclf.write('acl allow-log uHash3@COMPANY publish exchange name=X routingkey=#.a\n')
+ aclf.write('acl allow-log uHash4@COMPANY publish exchange name=X routingkey=a.#.b.#.c\n')
+ aclf.write('acl allow-log uMixed1@COMPANY publish exchange name=X routingkey=*.x.#.y\n')
+ aclf.write('acl allow-log uMixed2@COMPANY publish exchange name=X routingkey=a.#.b.*\n')
+ aclf.write('acl allow-log uMixed3@COMPANY publish exchange name=X routingkey=*.*.*.#\n')
+
+ aclf.write('acl allow-log all publish exchange name=X routingkey=MN.OP.Q\n')
+ aclf.write('acl allow-log all publish exchange name=X routingkey=M.*.N\n')
+ aclf.write('acl allow-log all publish exchange name=X routingkey=M.#.N\n')
+ aclf.write('acl allow-log all publish exchange name=X routingkey=*.M.#.N\n')
+
+ aclf.write('acl deny-log all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # aclKey: "ab.cd.e"
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd.e", "allow-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "abx.cd.e", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd..e.", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd.e.", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", ".ab.cd.e", "deny-log")
+ # aclKey: "."
+ self.LookupPublish("uPlain2@COMPANY", "X", ".", "allow-log")
+
+ # aclKey: "a.*.b"
+ self.LookupPublish("uStar1@COMPANY", "X", "a.xx.b", "allow-log")
+ self.LookupPublish("uStar1@COMPANY", "X", "a.b", "deny-log")
+ # aclKey: "*.x"
+ self.LookupPublish("uStar2@COMPANY", "X", "y.x", "allow-log")
+ self.LookupPublish("uStar2@COMPANY", "X", ".x", "allow-log")
+ self.LookupPublish("uStar2@COMPANY", "X", "x", "deny-log")
+ # aclKey: "x.x.*"
+ self.LookupPublish("uStar3@COMPANY", "X", "x.x.y", "allow-log")
+ self.LookupPublish("uStar3@COMPANY", "X", "x.x.", "allow-log")
+ self.LookupPublish("uStar3@COMPANY", "X", "x.x", "deny-log")
+ self.LookupPublish("uStar3@COMPANY", "X", "q.x.y", "deny-log")
+
+ # aclKey: "a.#.b"
+ self.LookupPublish("uHash1@COMPANY", "X", "a.b", "allow-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "a.x.b", "allow-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "a..x.y.zz.b", "allow-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "a.b.", "deny-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "q.x.b", "deny-log")
+
+ # aclKey: "a.#"
+ self.LookupPublish("uHash2@COMPANY", "X", "a", "allow-log")
+ self.LookupPublish("uHash2@COMPANY", "X", "a.b", "allow-log")
+ self.LookupPublish("uHash2@COMPANY", "X", "a.b.c", "allow-log")
+
+ # aclKey: "#.a"
+ self.LookupPublish("uHash3@COMPANY", "X", "a", "allow-log")
+ self.LookupPublish("uHash3@COMPANY", "X", "x.y.a", "allow-log")
+
+ # aclKey: "a.#.b.#.c"
+ self.LookupPublish("uHash4@COMPANY", "X", "a.b.c", "allow-log")
+ self.LookupPublish("uHash4@COMPANY", "X", "a.x.b.y.c", "allow-log")
+ self.LookupPublish("uHash4@COMPANY", "X", "a.x.x.b.y.y.c", "allow-log")
+
+ # aclKey: "*.x.#.y"
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.x.y", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.x.p.qq.y", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.a.x.y", "deny-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "aa.x.b.c", "deny-log")
+
+ # aclKey: "a.#.b.*"
+ self.LookupPublish("uMixed2@COMPANY", "X", "a.b.x", "allow-log")
+ self.LookupPublish("uMixed2@COMPANY", "X", "a.x.x.x.b.x", "allow-log")
+
+ # aclKey: "*.*.*.#"
+ self.LookupPublish("uMixed3@COMPANY", "X", "x.y.z", "allow-log")
+ self.LookupPublish("uMixed3@COMPANY", "X", "x.y.z.a.b.c", "allow-log")
+ self.LookupPublish("uMixed3@COMPANY", "X", "x.y", "deny-log")
+ self.LookupPublish("uMixed3@COMPANY", "X", "x", "deny-log")
+
+ # Repeat the keys with wildcard user spec
+ self.LookupPublish("uPlain1@COMPANY", "X", "MN.OP.Q", "allow-log")
+ self.LookupPublish("uStar1@COMPANY" , "X", "M.xx.N", "allow-log")
+ self.LookupPublish("uHash1@COMPANY" , "X", "M.N", "allow-log")
+ self.LookupPublish("uHash1@COMPANY" , "X", "M.x.N", "allow-log")
+ self.LookupPublish("uHash1@COMPANY" , "X", "M..x.y.zz.N", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.M.N", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.M.p.qq.N", "allow-log")
+
+ self.LookupPublish("dev@QPID", "X", "MN.OP.Q", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M.xx.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M.x.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M..x.y.zz.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "a.M.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "a.M.p.qq.N", "allow-log")
+
+ #=====================================
# Connection limits
#=====================================