summaryrefslogtreecommitdiff
path: root/cpp/src/tests/acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/acl.py')
-rwxr-xr-xcpp/src/tests/acl.py292
1 files changed, 259 insertions, 33 deletions
diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py
index f6cd1b2669..fc53d2ce8b 100755
--- a/cpp/src/tests/acl.py
+++ b/cpp/src/tests/acl.py
@@ -61,7 +61,7 @@ class ACLTests(TestBase010):
# ACL general tests
#=====================================
- def test_deny_all(self):
+ def test_deny_mode(self):
"""
Test the deny all mode
"""
@@ -71,7 +71,9 @@ class ACLTests(TestBase010):
aclf.write('acl deny all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
try:
@@ -87,7 +89,7 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- def test_allow_all(self):
+ def test_allow_mode(self):
"""
Test the allow all mode
"""
@@ -96,7 +98,9 @@ class ACLTests(TestBase010):
aclf.write('acl allow all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
try:
@@ -124,7 +128,9 @@ class ACLTests(TestBase010):
aclf.write('acl allow all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
try:
@@ -208,7 +214,7 @@ class ACLTests(TestBase010):
# ACL queue tests
#=====================================
- def test_queue_acl_deny(self):
+ def test_queue_allow_mode(self):
"""
Test cases for queue acl in allow mode
"""
@@ -221,33 +227,35 @@ class ACLTests(TestBase010):
aclf.write('acl allow all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
try:
- session.queue_declare(queue="q1", durable='true', passive='true')
+ session.queue_declare(queue="q1", durable=True, passive=True)
self.fail("ACL should deny queue create request with name=q1 durable=true passive=true");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
- session.queue_declare(queue="q2", exclusive='true')
+ session.queue_declare(queue="q2", exclusive=True)
self.fail("ACL should deny queue create request with name=q2 exclusive=true");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
- session.queue_declare(queue="q2", durable='true')
+ session.queue_declare(queue="q2", durable=True)
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
- self.fail("ACL should allow queue create request for q2 with any parameter other than exclusive");
+ self.fail("ACL should allow queue create request for q2 with any parameter other than exclusive=true");
try:
- session.queue_declare(queue="q3", exclusive='true')
- session.queue_declare(queue="q4", durable='true')
+ session.queue_declare(queue="q3", exclusive=True)
+ session.queue_declare(queue="q4", durable=True)
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
self.fail("ACL should allow queue create request for q3 and q4 with any parameter");
@@ -285,11 +293,106 @@ class ACLTests(TestBase010):
if (530 == e.args[0].error_code):
self.fail("ACL should allow queue delete request for q3");
+
+ def test_queue_deny_mode(self):
+ """
+ Test cases for queue acl in deny mode
+ """
+ aclf = ACLFile()
+ aclf.write('acl allow bob@QPID create queue name=q1 durable=true passive=true\n')
+ aclf.write('acl allow bob@QPID create queue name=q2 exclusive=true\n')
+ aclf.write('acl allow bob@QPID access queue name=q3\n')
+ aclf.write('acl allow bob@QPID purge queue name=q3\n')
+ aclf.write('acl allow bob@QPID create queue name=q3\n')
+ aclf.write('acl allow bob@QPID create queue name=q4\n')
+ aclf.write('acl allow bob@QPID delete queue name=q4\n')
+ aclf.write('acl allow guest@QPID all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q1", durable=True, passive=True)
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=q1 durable=true passive=true");
+
+ try:
+ session.queue_declare(queue="q1", durable=False, passive=False)
+ self.fail("ACL should deny queue create request with name=q1 durable=true passive=false");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q2", exclusive=False)
+ self.fail("ACL should deny queue create request with name=q2 exclusive=false");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q2", exclusive=True)
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request for q2 with exclusive=true");
+
+ try:
+ session.queue_declare(queue="q3")
+ session.queue_declare(queue="q4")
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request for q3 and q4");
+
+ try:
+ session.queue_query(queue="q4")
+ self.fail("ACL should deny queue query request for q4");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_purge(queue="q4")
+ self.fail("ACL should deny queue purge request for q4");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_purge(queue="q3")
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow queue purge request for q3");
+
+ try:
+ session.queue_query(queue="q3")
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow queue query request for q3");
+
+ try:
+ session.queue_delete(queue="q3")
+ self.fail("ACL should deny queue delete request for q3");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_delete(queue="q4")
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow queue delete request for q4");
+
#=====================================
# ACL exchange tests
#=====================================
- def test_exchange_acl_deny(self):
+ def test_exchange_acl_allow_mode(self):
session = self.get_session('bob','bob')
session.queue_declare(queue="baz")
@@ -299,14 +402,16 @@ class ACLTests(TestBase010):
aclf = ACLFile()
aclf.write('acl deny bob@QPID create exchange name=testEx durable=true passive=true\n')
aclf.write('acl deny bob@QPID create exchange name=ex1 type=direct\n')
- aclf.write('acl deny bob@QPID access exchange name=myEx\n')
+ aclf.write('acl deny bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
aclf.write('acl deny bob@QPID bind exchange name=myEx queuename=q1 routingkey=rk1\n')
aclf.write('acl deny bob@QPID unbind exchange name=myEx queuename=q1 routingkey=rk1\n')
aclf.write('acl deny bob@QPID delete exchange name=myEx\n')
aclf.write('acl allow all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
session.queue_declare(queue='q1')
@@ -346,8 +451,27 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk1.*')
+ self.fail("ACL should deny exchange bound request for myEx with queuename=q1 and routing_key='rk1.*' ");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_query(name='amq.topic')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow exchange query request for exchange='amq.topic'");
try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk2.*')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bound request for myEx with queuename=q1 and binding_key='rk2.*'");
+
+ try:
session.exchange_bind(exchange='myEx', queue='q1', binding_key='rk1')
self.fail("ACL should deny exchange bind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
except qpid.session.SessionException, e:
@@ -356,8 +480,7 @@ class ACLTests(TestBase010):
try:
session.exchange_bind(exchange='myEx', queue='q1', binding_key='x')
- except qpid.session.SessionException, e:
- print e
+ except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
self.fail("ACL should allow exchange bind request for exchange='myEx', queue='q1', binding_key='x'");
@@ -400,7 +523,7 @@ class ACLTests(TestBase010):
self.fail("ACL should allow exchange delete request for myXml");
- def test_exchange_acl_allow(self):
+ def test_exchange_acl_deny_mode(self):
session = self.get_session('bob','bob')
session.queue_declare(queue='bar')
@@ -408,17 +531,35 @@ class ACLTests(TestBase010):
Test cases for exchange acl in deny mode
"""
aclf = ACLFile()
+ aclf.write('acl allow bob@QPID create exchange name=myEx durable=true passive=false\n')
aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
aclf.write('acl allow bob@QPID unbind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
+ aclf.write('acl allow bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
+ aclf.write('acl allow bob@QPID delete exchange name=myEx\n')
aclf.write('acl allow guest@QPID all all\n')
aclf.write('acl deny all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=True, passive=False)
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow exchange create request for myEx with durable=true and passive=false");
try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=False)
+ self.fail("ACL should deny exchange create request with name=myEx durable=false");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
session.exchange_bind(exchange='amq.topic', queue='bar', binding_key='foo.bar')
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
@@ -438,7 +579,6 @@ class ACLTests(TestBase010):
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
-
try:
session.exchange_unbind(exchange='amq.topic', queue='bar', binding_key='foo.bar')
except qpid.session.SessionException, e:
@@ -458,43 +598,84 @@ class ACLTests(TestBase010):
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
+ try:
+ session.exchange_query(name='amq.topic')
+ self.fail("ACL should deny exchange query request for amq.topic");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk2.*')
+ self.fail("ACL should deny exchange bound request for amq.topic with queuename=q1 and routing_key='rk2.*' ");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_query(name='myEx')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow exchange query request for exchange='myEx'");
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk1.*')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bound request for myEx with queuename=q1 and binding_key='rk1.*'");
+
+ try:
+ session.exchange_delete(exchange='myXml')
+ self.fail("ACL should deny exchange delete request for myXml");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_delete(exchange='myEx')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow exchange delete request for myEx");
+
#=====================================
# ACL consume tests
#=====================================
- def test_consume_acl(self):
+ def test_consume_allow_mode(self):
"""
- Test various consume acl
+ Test cases for consume in allow mode
"""
aclf = ACLFile()
- aclf.write('acl deny bob@QPID consume queue name=q1 durable=true\n')
- aclf.write('acl deny bob@QPID consume queue name=q2 exclusive=true\n')
+ aclf.write('acl deny bob@QPID consume queue name=q1\n')
+ aclf.write('acl deny bob@QPID consume queue name=q2\n')
aclf.write('acl allow all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
try:
- session.queue_declare(queue='q1', durable='true')
- session.queue_declare(queue='q2', exclusive='true')
- session.queue_declare(queue='q3', durable='true')
+ session.queue_declare(queue='q1')
+ session.queue_declare(queue='q2')
+ session.queue_declare(queue='q3')
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
self.fail("ACL should allow create queue request");
try:
session.message_subscribe(queue='q1', destination='myq1')
- self.fail("ACL should deny message subscriber request for queue='q1'");
+ self.fail("ACL should deny subscription for queue='q1'");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
session.message_subscribe(queue='q2', destination='myq1')
- self.fail("ACL should deny message subscriber request for queue='q2'");
+ self.fail("ACL should deny subscription for queue='q2'");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
session = self.get_session('bob','bob')
@@ -503,9 +684,51 @@ class ACLTests(TestBase010):
session.message_subscribe(queue='q3', destination='myq1')
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
- self.fail("ACL should allow create message subscribe");
+ self.fail("ACL should allow subscription for q3");
+ def test_consume_deny_mode(self):
+ """
+ Test cases for consume in allow mode
+ """
+ aclf = ACLFile()
+ aclf.write('acl allow bob@QPID consume queue name=q1\n')
+ aclf.write('acl allow bob@QPID consume queue name=q2\n')
+ aclf.write('acl allow bob@QPID create queue\n')
+ aclf.write('acl allow guest@QPID all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+
+ try:
+ session.queue_declare(queue='q1')
+ session.queue_declare(queue='q2')
+ session.queue_declare(queue='q3')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow create queue request");
+
+ try:
+ session.message_subscribe(queue='q1', destination='myq1')
+ session.message_subscribe(queue='q2', destination='myq2')
+ except qpid.session.SessionException, e:
+ if (530 == e.args[0].error_code):
+ self.fail("ACL should allow subscription for q1 and q2");
+
+ try:
+ session.message_subscribe(queue='q3', destination='myq3')
+ self.fail("ACL should deny subscription for queue='q3'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(530,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+
#=====================================
# ACL publish tests
#=====================================
@@ -521,7 +744,9 @@ class ACLTests(TestBase010):
aclf.write('acl allow all all')
aclf.close()
- self.reload_acl()
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
session = self.get_session('bob','bob')
@@ -542,6 +767,7 @@ class ACLTests(TestBase010):
session = self.get_session('bob','bob')
try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=False)
session.message_transfer(destination="myEx", message=Message(props,"Test"))
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):