summaryrefslogtreecommitdiff
path: root/cpp/src/tests/acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/acl.py')
-rwxr-xr-xcpp/src/tests/acl.py508
1 files changed, 254 insertions, 254 deletions
diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py
index 5cc3b9c917..4f9a558ba7 100755
--- a/cpp/src/tests/acl.py
+++ b/cpp/src/tests/acl.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,13 +31,13 @@ import qpid.messaging
class ACLFile:
def __init__(self, policy='data_dir/policy.acl'):
self.f = open(policy,'w')
-
+
def write(self,line):
self.f.write(line)
-
+
def close(self):
self.f.close()
-
+
class ACLTests(TestBase010):
def get_session(self, user, passwd):
@@ -75,8 +75,8 @@ class ACLTests(TestBase010):
#=====================================
# ACL general tests
- #=====================================
-
+ #=====================================
+
def test_deny_mode(self):
"""
Test the deny all mode
@@ -85,12 +85,12 @@ class ACLTests(TestBase010):
aclf.write('acl allow anonymous all all\n')
aclf.write('acl allow bob@QPID create queue\n')
aclf.write('acl deny all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
try:
session.queue_declare(queue="deny_queue")
@@ -98,13 +98,13 @@ class ACLTests(TestBase010):
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request");
self.fail("Error during queue create request");
-
+
try:
session.exchange_bind(exchange="amq.direct", queue="deny_queue", binding_key="routing_key")
self.fail("ACL should deny queue bind request");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
-
+ self.assertEqual(403,e.args[0].error_code)
+
def test_allow_mode(self):
"""
Test the allow all mode
@@ -112,12 +112,12 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID bind exchange\n')
aclf.write('acl allow all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
try:
session.queue_declare(queue="allow_queue")
@@ -125,12 +125,12 @@ class ACLTests(TestBase010):
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request");
self.fail("Error during queue create request");
-
+
try:
session.exchange_bind(exchange="amq.direct", queue="allow_queue", binding_key="routing_key")
self.fail("ACL should deny queue bind request");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
def test_allow_mode_with_specfic_allow_override(self):
@@ -140,13 +140,13 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('group admins bob@QPID joe@QPID \n')
aclf.write('acl allow bob@QPID create queue \n')
- aclf.write('acl deny admins create queue \n')
+ aclf.write('acl deny admins create queue \n')
aclf.write('acl allow all all')
- aclf.close()
+ aclf.close()
result = self.reload_acl()
if (result.text.find("format error",0,len(result.text)) != -1):
- self.fail(result)
+ self.fail(result)
session = self.get_session('bob','bob')
@@ -155,12 +155,12 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow create queue request");
-
-
+
+
#=====================================
# ACL file format tests
- #=====================================
-
+ #=====================================
+
def test_empty_groups(self):
"""
Test empty groups
@@ -169,11 +169,11 @@ class ACLTests(TestBase010):
aclf.write('acl group\n')
aclf.write('acl group admins bob@QPID joe@QPID\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
if (result.find("Insufficient tokens for acl definition",0,len(result)) == -1):
- self.fail("ACL Reader should reject the acl file due to empty group name")
+ self.fail("ACL Reader should reject the acl file due to empty group name")
def test_illegal_acl_formats(self):
"""
@@ -183,24 +183,24 @@ class ACLTests(TestBase010):
aclf.write('acl group admins bob@QPID joe@QPID\n')
aclf.write('acl allow all all')
aclf.close()
-
- result = self.reload_acl()
+
+ result = self.reload_acl()
if (result.find("Unknown ACL permission",0,len(result)) == -1):
- self.fail(result)
-
+ self.fail(result)
+
def test_illegal_extension_lines(self):
"""
Test illegal extension lines
"""
-
+
aclf = self.get_acl_file()
aclf.write('group admins bob@QPID \n')
aclf.write(' \ \n')
aclf.write('joe@QPID \n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
if (result.find("contains an illegal extension",0,len(result)) == -1):
self.fail(result)
@@ -218,7 +218,7 @@ class ACLTests(TestBase010):
aclf.write('host/123.example.com@TEST.COM\n') # should be allowed
aclf.write('acl allow all all')
aclf.close()
-
+
result = self.reload_acl()
if (result):
self.fail(result)
@@ -233,7 +233,7 @@ class ACLTests(TestBase010):
aclf.write('acl deny admin bind exchange\n')
aclf.write('acl allow all all')
aclf.close()
-
+
result = self.reload_acl()
if (result.find("Username 'bob' must contain a realm",0,len(result)) == -1):
self.fail(result)
@@ -249,7 +249,7 @@ class ACLTests(TestBase010):
aclf.write('group test4 host/somemachine.example.com@EXAMPLE.COM\n') # should be allowed
aclf.write('acl allow all all')
aclf.close()
-
+
result = self.reload_acl()
if (result):
self.fail(result)
@@ -257,7 +257,7 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('group test1 joe$H@EXAMPLE.com\n') # shouldn't be allowed
aclf.write('acl allow all all')
- aclf.close()
+ aclf.close()
result = self.reload_acl()
if (result.find("Username \"joe$H@EXAMPLE.com\" contains illegal characters",0,len(result)) == -1):
@@ -271,17 +271,17 @@ class ACLTests(TestBase010):
"""
Test illegal queue policy
"""
-
+
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ding\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "ding is not a valid value for 'policytype', possible values are one of" \
- " { 'ring' 'ring_strict' 'flow_to_disk' 'reject' }";
+ " { 'ring' 'ring_strict' 'flow_to_disk' 'reject' }";
if (result.find(expected) == -1):
- self.fail(result)
+ self.fail(result)
def test_illegal_queuemaxsize_upper_limit_spec(self):
"""
@@ -293,24 +293,24 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=-1\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "-1 is not a valid value for 'queuemaxsizeupperlimit', " \
- "values should be between 0 and 9223372036854775807";
+ "values should be between 0 and 9223372036854775807";
if (result.find(expected) == -1):
- self.fail(result)
+ self.fail(result)
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=9223372036854775808\n')
- aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
expected = "9223372036854775808 is not a valid value for 'queuemaxsizeupperlimit', " \
"values should be between 0 and 9223372036854775807";
if (result.find(expected) == -1):
- self.fail(result)
+ self.fail(result)
#
# Use queuemaxsizeupperlimit
@@ -318,24 +318,24 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxsizeupperlimit=-1\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "-1 is not a valid value for 'queuemaxsizeupperlimit', " \
- "values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxsizeupperlimit=9223372036854775808\n')
- aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
expected = "9223372036854775808 is not a valid value for 'queuemaxsizeupperlimit', " \
"values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ if (result.text != expected):
+ self.fail(result)
@@ -350,24 +350,24 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=-1\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "-1 is not a valid value for 'queuemaxcountupperlimit', " \
- "values should be between 0 and 9223372036854775807";
+ "values should be between 0 and 9223372036854775807";
if (result.find(expected) == -1):
- self.fail(result)
+ self.fail(result)
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=9223372036854775808\n')
- aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
expected = "9223372036854775808 is not a valid value for 'queuemaxcountupperlimit', " \
"values should be between 0 and 9223372036854775807";
if (result.find(expected) == -1):
- self.fail(result)
+ self.fail(result)
#
# use maxqueuecountupperlimit
@@ -375,24 +375,24 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxcountupperlimit=-1\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "-1 is not a valid value for 'queuemaxcountupperlimit', " \
- "values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxcountupperlimit=9223372036854775808\n')
- aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
expected = "9223372036854775808 is not a valid value for 'queuemaxcountupperlimit', " \
"values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ if (result.text != expected):
+ self.fail(result)
def test_illegal_queuemaxsize_lower_limit_spec(self):
@@ -402,24 +402,24 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxsizelowerlimit=-1\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "-1 is not a valid value for 'queuemaxsizelowerlimit', " \
- "values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxsizelowerlimit=9223372036854775808\n')
- aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
expected = "9223372036854775808 is not a valid value for 'queuemaxsizelowerlimit', " \
"values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ if (result.text != expected):
+ self.fail(result)
@@ -431,30 +431,30 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxcountlowerlimit=-1\n')
aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.close()
+
+ result = self.reload_acl()
expected = "-1 is not a valid value for 'queuemaxcountlowerlimit', " \
- "values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 queuemaxcountlowerlimit=9223372036854775808\n')
- aclf.write('acl allow all all')
- aclf.close()
-
- result = self.reload_acl()
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
expected = "9223372036854775808 is not a valid value for 'queuemaxcountlowerlimit', " \
"values should be between 0 and 9223372036854775807";
- if (result.text != expected):
- self.fail(result)
+ if (result.text != expected):
+ self.fail(result)
#=====================================
# ACL queue tests
#=====================================
-
+
def test_queue_allow_mode(self):
"""
Test cases for queue acl in allow mode
@@ -464,37 +464,37 @@ class ACLTests(TestBase010):
aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
aclf.write('acl deny bob@QPID access queue name=q3\n')
aclf.write('acl deny bob@QPID purge queue name=q3\n')
- aclf.write('acl deny bob@QPID delete queue name=q4\n')
- aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
+ aclf.write('acl deny bob@QPID delete queue name=q4\n')
+ aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
aclf.write('acl allow all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
-
+
try:
session.queue_declare(queue="q1", durable=True, passive=True)
self.fail("ACL should deny queue create request with name=q1 durable=true passive=true");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
queue_options = {}
- queue_options["qpid.policy_type"] = "ring"
+ queue_options["qpid.policy_type"] = "ring"
session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
self.fail("ACL should deny queue create request with name=q2 exclusive=true qpid.policy_type=ring");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
queue_options = {}
- queue_options["qpid.policy_type"] = "ring_strict"
- session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
+ queue_options["qpid.policy_type"] = "ring_strict"
+ session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request with name=q2 exclusive=true qpid.policy_type=ring_strict");
@@ -502,17 +502,17 @@ class ACLTests(TestBase010):
try:
queue_options = {}
queue_options["qpid.max_count"] = 200
- queue_options["qpid.max_size"] = 500
+ queue_options["qpid.max_size"] = 500
session.queue_declare(queue="q5", exclusive=True, arguments=queue_options)
self.fail("ACL should deny queue create request with name=q2, qpid.max_size=500 and qpid.max_count=200");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
queue_options = {}
queue_options["qpid.max_count"] = 200
- queue_options["qpid.max_size"] = 100
+ queue_options["qpid.max_size"] = 100
session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
@@ -530,33 +530,33 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.queue_purge(queue="q3")
self.fail("ACL should deny queue purge request for q3");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.queue_purge(queue="q4")
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue purge request for q4");
-
+
try:
session.queue_delete(queue="q4")
self.fail("ACL should deny queue delete request for q4");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.queue_delete(queue="q3")
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue delete request for q3");
-
+
def test_queue_deny_mode(self):
"""
@@ -568,26 +568,26 @@ class ACLTests(TestBase010):
aclf.write('acl allow bob@QPID access queue name=q3\n')
aclf.write('acl allow bob@QPID purge queue name=q3\n')
aclf.write('acl allow bob@QPID create queue name=q3\n')
- aclf.write('acl allow bob@QPID create queue name=q4\n')
+ aclf.write('acl allow bob@QPID create queue name=q4\n')
aclf.write('acl allow bob@QPID delete queue name=q4\n')
aclf.write('acl allow bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
aclf.write('acl allow bob@QPID create queue name=q6 queuemaxsizelowerlimit=50 queuemaxsizeupperlimit=100 queuemaxcountlowerlimit=50 queuemaxcountupperlimit=100\n')
aclf.write('acl allow anonymous all all\n')
aclf.write('acl deny all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
-
+
try:
session.queue_declare(queue="q1", durable=True, passive=True)
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request with name=q1 durable=true passive=true");
-
+
try:
session.queue_declare(queue="q1", durable=False, passive=False)
self.fail("ACL should deny queue create request with name=q1 durable=true passive=false");
@@ -599,24 +599,24 @@ class ACLTests(TestBase010):
session.queue_declare(queue="q2", exclusive=False)
self.fail("ACL should deny queue create request with name=q2 exclusive=false");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
queue_options = {}
queue_options["qpid.max_count"] = 200
- queue_options["qpid.max_size"] = 500
+ queue_options["qpid.max_size"] = 500
session.queue_declare(queue="q5", arguments=queue_options)
self.fail("ACL should deny queue create request with name=q5 maxqueuesize=500 maxqueuecount=200");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
queue_options = {}
queue_options["qpid.max_count"] = 100
- queue_options["qpid.max_size"] = 500
- session.queue_declare(queue="q5", arguments=queue_options)
+ queue_options["qpid.max_size"] = 500
+ session.queue_declare(queue="q5", arguments=queue_options)
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request with name=q5 maxqueuesize=500 maxqueuecount=200");
@@ -624,48 +624,48 @@ class ACLTests(TestBase010):
try:
queue_options = {}
queue_options["qpid.max_count"] = 49
- queue_options["qpid.max_size"] = 100
+ queue_options["qpid.max_size"] = 100
session.queue_declare(queue="q6", arguments=queue_options)
self.fail("ACL should deny queue create request with name=q6 maxqueuesize=100 maxqueuecount=49");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
queue_options = {}
queue_options["qpid.max_count"] = 101
queue_options["qpid.max_size"] = 100
- session.queue_declare(queue="q6", arguments=queue_options)
+ session.queue_declare(queue="q6", arguments=queue_options)
self.fail("ACL should allow queue create request with name=q6 maxqueuesize=100 maxqueuecount=101");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
queue_options = {}
queue_options["qpid.max_count"] = 100
- queue_options["qpid.max_size"] = 49
+ queue_options["qpid.max_size"] = 49
session.queue_declare(queue="q6", arguments=queue_options)
self.fail("ACL should deny queue create request with name=q6 maxqueuesize=49 maxqueuecount=100");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
queue_options = {}
queue_options["qpid.max_count"] = 100
- queue_options["qpid.max_size"] =101
+ queue_options["qpid.max_size"] =101
session.queue_declare(queue="q6", arguments=queue_options)
self.fail("ACL should deny queue create request with name=q6 maxqueuesize=101 maxqueuecount=100");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
queue_options = {}
queue_options["qpid.max_count"] = 50
- queue_options["qpid.max_size"] = 50
- session.queue_declare(queue="q6", arguments=queue_options)
+ queue_options["qpid.max_size"] = 50
+ session.queue_declare(queue="q6", arguments=queue_options)
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request with name=q6 maxqueuesize=50 maxqueuecount=50");
@@ -673,7 +673,7 @@ class ACLTests(TestBase010):
try:
queue_options = {}
queue_options["qpid.policy_type"] = "ring"
- session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
+ session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue create request for q2 with exclusive=true policytype=ring");
@@ -691,14 +691,14 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.queue_purge(queue="q4")
self.fail("ACL should deny queue purge request for q4");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.queue_purge(queue="q3")
except qpid.session.SessionException, e:
@@ -710,14 +710,14 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow queue query request for q3");
-
+
try:
session.queue_delete(queue="q3")
self.fail("ACL should deny queue delete request for q3");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.queue_delete(queue="q4")
except qpid.session.SessionException, e:
@@ -727,9 +727,9 @@ class ACLTests(TestBase010):
#=====================================
# ACL exchange tests
#=====================================
-
+
def test_exchange_acl_allow_mode(self):
- session = self.get_session('bob','bob')
+ session = self.get_session('bob','bob')
session.queue_declare(queue="baz")
"""
@@ -743,12 +743,12 @@ class ACLTests(TestBase010):
aclf.write('acl deny bob@QPID unbind exchange name=myEx queuename=q1 routingkey=rk1\n')
aclf.write('acl deny bob@QPID delete exchange name=myEx\n')
aclf.write('acl allow all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
session.queue_declare(queue='q1')
session.queue_declare(queue='q2')
@@ -760,21 +760,21 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.exchange_declare(exchange='testEx', type='direct', durable=True, passive=False)
except qpid.session.SessionException, e:
print e
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange create request for testEx with any parameter other than durable=true and passive=true");
-
+
try:
session.exchange_declare(exchange='ex1', type='direct')
self.fail("ACL should deny exchange create request with name=ex1 type=direct");
- except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.exchange_declare(exchange='myXml', type='direct')
except qpid.session.SessionException, e:
@@ -796,13 +796,13 @@ class ACLTests(TestBase010):
session = self.get_session('bob','bob')
try:
- session.exchange_query(name='amq.topic')
+ session.exchange_query(name='amq.topic')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange query request for exchange='amq.topic'");
-
+
try:
- session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk2.*')
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk2.*')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange bound request for myEx with queuename=q1 and binding_key='rk2.*'");
@@ -811,12 +811,12 @@ class ACLTests(TestBase010):
session.exchange_bind(exchange='myEx', queue='q1', binding_key='rk1')
self.fail("ACL should deny exchange bind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
session.exchange_bind(exchange='myEx', queue='q1', binding_key='x')
- except qpid.session.SessionException, e:
+ except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange bind request for exchange='myEx', queue='q1', binding_key='x'");
@@ -830,7 +830,7 @@ class ACLTests(TestBase010):
session.exchange_unbind(exchange='myEx', queue='q1', binding_key='rk1')
self.fail("ACL should deny exchange unbind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
except qpid.session.SessionException, e:
- self.assertEqual(403,e.args[0].error_code)
+ self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
@@ -844,20 +844,20 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange unbind request for exchange='myEx', queue='q2', binding_key='rk1'");
-
+
try:
session.exchange_delete(exchange='myEx')
self.fail("ACL should deny exchange delete request for myEx");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.exchange_delete(exchange='myXml')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange delete request for myXml");
-
+
def test_exchange_acl_deny_mode(self):
session = self.get_session('bob','bob')
@@ -868,18 +868,18 @@ class ACLTests(TestBase010):
"""
aclf = self.get_acl_file()
aclf.write('acl allow bob@QPID create exchange name=myEx durable=true passive=false\n')
- aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
+ aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
aclf.write('acl allow bob@QPID unbind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
aclf.write('acl allow bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
aclf.write('acl allow bob@QPID delete exchange name=myEx\n')
- aclf.write('acl allow anonymous all all\n')
+ aclf.write('acl allow anonymous all all\n')
aclf.write('acl deny all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
try:
@@ -887,14 +887,14 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange create request for myEx with durable=true and passive=false");
-
+
try:
session.exchange_declare(exchange='myEx', type='direct', durable=False)
self.fail("ACL should deny exchange create request with name=myEx durable=false");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.exchange_bind(exchange='amq.topic', queue='bar', binding_key='foo.bar')
except qpid.session.SessionException, e:
@@ -949,13 +949,13 @@ class ACLTests(TestBase010):
session = self.get_session('bob','bob')
try:
- session.exchange_query(name='myEx')
+ session.exchange_query(name='myEx')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange query request for exchange='myEx'");
-
+
try:
- session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk1.*')
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk1.*')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange bound request for myEx with queuename=q1 and binding_key='rk1.*'");
@@ -966,7 +966,7 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.exchange_delete(exchange='myEx')
except qpid.session.SessionException, e:
@@ -1022,24 +1022,24 @@ class ACLTests(TestBase010):
#=====================================
# ACL consume tests
#=====================================
-
+
def test_consume_allow_mode(self):
"""
Test cases for consume in allow mode
"""
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID consume queue name=q1\n')
- aclf.write('acl deny bob@QPID consume queue name=q2\n')
+ aclf.write('acl deny bob@QPID consume queue name=q2\n')
aclf.write('acl allow all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
-
-
+
+
try:
session.queue_declare(queue='q1')
session.queue_declare(queue='q2')
@@ -1047,27 +1047,27 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow create queue request");
-
+
try:
session.message_subscribe(queue='q1', destination='myq1')
self.fail("ACL should deny subscription for queue='q1'");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.message_subscribe(queue='q2', destination='myq1')
self.fail("ACL should deny subscription for queue='q2'");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.message_subscribe(queue='q3', destination='myq1')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
- self.fail("ACL should allow subscription for q3");
-
+ self.fail("ACL should allow subscription for q3");
+
def test_consume_deny_mode(self):
"""
@@ -1076,18 +1076,18 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl allow bob@QPID consume queue name=q1\n')
aclf.write('acl allow bob@QPID consume queue name=q2\n')
- aclf.write('acl allow bob@QPID create queue\n')
- aclf.write('acl allow anonymous all\n')
+ aclf.write('acl allow bob@QPID create queue\n')
+ aclf.write('acl allow anonymous all\n')
aclf.write('acl deny all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
-
-
+
+
try:
session.queue_declare(queue='q1')
session.queue_declare(queue='q2')
@@ -1101,20 +1101,20 @@ class ACLTests(TestBase010):
session.message_subscribe(queue='q2', destination='myq2')
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
- self.fail("ACL should allow subscription for q1 and q2");
-
+ self.fail("ACL should allow subscription for q1 and q2");
+
try:
session.message_subscribe(queue='q3', destination='myq3')
self.fail("ACL should deny subscription for queue='q3'");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
#=====================================
# ACL publish tests
#=====================================
-
+
def test_publish_acl_allow_mode(self):
"""
Test various publish acl
@@ -1122,40 +1122,40 @@ class ACLTests(TestBase010):
aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
aclf.write('acl deny bob@QPID publish exchange name=amq.topic\n')
- aclf.write('acl deny bob@QPID publish exchange name=myEx routingkey=rk2\n')
+ aclf.write('acl deny bob@QPID publish exchange name=myEx routingkey=rk2\n')
aclf.write('acl allow all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
-
+
props = session.delivery_properties(routing_key="rk1")
-
- try:
+
+ try:
session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
self.fail("ACL should deny message transfer to name=amq.direct routingkey=rk1");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
- session = self.get_session('bob','bob')
-
+ session = self.get_session('bob','bob')
+
try:
session.message_transfer(destination="amq.topic", message=Message(props,"Test"))
self.fail("ACL should deny message transfer to name=amq.topic");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
-
+
try:
session.exchange_declare(exchange='myEx', type='direct', durable=False)
session.message_transfer(destination="myEx", message=Message(props,"Test"))
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
- self.fail("ACL should allow message transfer to exchange myEx with routing key rk1");
-
-
+ self.fail("ACL should allow message transfer to exchange myEx with routing key rk1");
+
+
props = session.delivery_properties(routing_key="rk2")
try:
session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
@@ -1172,39 +1172,39 @@ class ACLTests(TestBase010):
aclf.write('acl allow bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
aclf.write('acl allow bob@QPID publish exchange name=amq.topic\n')
aclf.write('acl allow bob@QPID publish exchange name=myEx routingkey=rk2\n')
- aclf.write('acl allow bob@QPID create exchange\n')
- aclf.write('acl allow anonymous all all \n')
+ aclf.write('acl allow bob@QPID create exchange\n')
+ aclf.write('acl allow anonymous all all \n')
aclf.write('acl deny all all')
- aclf.close()
-
+ aclf.close()
+
result = self.reload_acl()
if (result):
- self.fail(result)
-
+ self.fail(result)
+
session = self.get_session('bob','bob')
-
+
props = session.delivery_properties(routing_key="rk2")
-
- try:
+
+ try:
session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
self.fail("ACL should deny message transfer to name=amq.direct routingkey=rk2");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
- session = self.get_session('bob','bob')
-
+ session = self.get_session('bob','bob')
+
try:
session.message_transfer(destination="amq.topic", message=Message(props,"Test"))
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
- self.fail("ACL should allow message transfer to exchange amq.topic with any routing key");
-
+ self.fail("ACL should allow message transfer to exchange amq.topic with any routing key");
+
try:
session.exchange_declare(exchange='myEx', type='direct', durable=False)
session.message_transfer(destination="myEx", message=Message(props,"Test"))
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
- self.fail("ACL should allow message transfer to exchange myEx with routing key=rk2");
-
+ self.fail("ACL should allow message transfer to exchange myEx with routing key=rk2");
+
props = session.delivery_properties(routing_key="rk1")
try:
@@ -1212,7 +1212,7 @@ class ACLTests(TestBase010):
self.fail("ACL should deny message transfer to name=myEx routingkey=rk1");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
- session = self.get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.message_transfer(destination="amq.direct", message=Message(props,"Test"))