summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/acl.py')
-rwxr-xr-xqpid/cpp/src/tests/acl.py1077
1 files changed, 1077 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
new file mode 100755
index 0000000000..5e9a150d8f
--- /dev/null
+++ b/qpid/cpp/src/tests/acl.py
@@ -0,0 +1,1077 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import qpid
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import uuid4
+from qpid.testlib import TestBase010
+from qmf.console import Session
+from qpid.datatypes import Message
+import qpid.messaging
+
+class ACLFile:
+ def __init__(self, policy='data_dir/policy.acl'):
+ self.f = open(policy,'w')
+
+ def write(self,line):
+ self.f.write(line)
+
+ def close(self):
+ self.f.close()
+
+class ACLTests(TestBase010):
+
+ def get_session(self, user, passwd):
+ socket = connect(self.broker.host, self.broker.port)
+ connection = Connection (sock=socket, username=user, password=passwd,
+ mechanism="PLAIN")
+ connection.start()
+ return connection.session(str(uuid4()))
+
+ def reload_acl(self):
+ acl = self.qmf.getObjects(_class="acl")[0]
+ return acl.reloadACLFile()
+
+ def get_acl_file(self):
+ return ACLFile(self.config.defines.get("policy-file", "data_dir/policy.acl"))
+
+ def setUp(self):
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all all\n')
+ aclf.close()
+ TestBase010.setUp(self)
+ self.startQmf()
+ self.reload_acl()
+
+ def tearDown(self):
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all all\n')
+ aclf.close()
+ self.reload_acl()
+ TestBase010.tearDown(self)
+
+ #=====================================
+ # ACL general tests
+ #=====================================
+
+ def test_deny_mode(self):
+ """
+ Test the deny all mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow anonymous all all\n')
+ aclf.write('acl allow bob@QPID create queue\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+ try:
+ session.queue_declare(queue="deny_queue")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request");
+ self.fail("Error during queue create request");
+
+ try:
+ session.exchange_bind(exchange="amq.direct", queue="deny_queue", binding_key="routing_key")
+ self.fail("ACL should deny queue bind request");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+
+ def test_allow_mode(self):
+ """
+ Test the allow all mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID bind exchange\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+ try:
+ session.queue_declare(queue="allow_queue")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request");
+ self.fail("Error during queue create request");
+
+ try:
+ session.exchange_bind(exchange="amq.direct", queue="allow_queue", binding_key="routing_key")
+ self.fail("ACL should deny queue bind request");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+
+
+ #=====================================
+ # ACL file format tests
+ #=====================================
+
+ def test_empty_groups(self):
+ """
+ Test empty groups
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl group\n')
+ aclf.write('acl group admins bob@QPID joe@QPID\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("Insufficient tokens for acl definition",0,len(result.text)) == -1):
+ self.fail("ACL Reader should reject the acl file due to empty group name")
+
+ def test_illegal_acl_formats(self):
+ """
+ Test illegal acl formats
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl group admins bob@QPID joe@QPID\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("Unknown ACL permission",0,len(result.text)) == -1):
+ self.fail(result)
+
+ def test_illegal_extension_lines(self):
+ """
+ Test illegal extension lines
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('group admins bob@QPID \n')
+ aclf.write(' \ \n')
+ aclf.write('joe@QPID \n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("contains an illegal extension",0,len(result.text)) == -1):
+ self.fail(result)
+
+ if (result.text.find("Non-continuation line must start with \"group\" or \"acl\"",0,len(result.text)) == -1):
+ self.fail(result)
+
+ def test_llegal_extension_lines(self):
+ """
+ Test proper extention lines
+ """
+ aclf = self.get_acl_file()
+ aclf.write('group test1 joe@EXAMPLE.com \\ \n') # should be allowed
+ aclf.write(' jack@EXAMPLE.com \\ \n') # should be allowed
+ aclf.write('jill@TEST.COM \\ \n') # should be allowed
+ aclf.write('host/123.example.com@TEST.COM\n') # should be allowed
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("ACL format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ def test_user_realm(self):
+ """
+ Test a user defined without a realm
+ Ex. group admin rajith
+ """
+ aclf = self.get_acl_file()
+ aclf.write('group admin bob\n') # shouldn't be allowed
+ aclf.write('acl deny admin bind exchange\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("Username 'bob' must contain a realm",0,len(result.text)) == -1):
+ self.fail(result)
+
+ def test_allowed_chars_for_username(self):
+ """
+ Test a user defined without a realm
+ Ex. group admin rajith
+ """
+ aclf = self.get_acl_file()
+ aclf.write('group test1 joe@EXAMPLE.com\n') # should be allowed
+ aclf.write('group test2 jack_123-jill@EXAMPLE.com\n') # should be allowed
+ aclf.write('group test4 host/somemachine.example.com@EXAMPLE.COM\n') # should be allowed
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("ACL format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('group test1 joe$H@EXAMPLE.com\n') # shouldn't be allowed
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("Username \"joe$H@EXAMPLE.com\" contains illegal characters",0,len(result.text)) == -1):
+ self.fail(result)
+
+ #=====================================
+ # ACL validation tests
+ #=====================================
+
+ def test_illegal_queue_policy(self):
+ """
+ Test illegal queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ding\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "ding is not a valid value for 'policytype', possible values are one of" \
+ " { 'ring' 'ring_strict' 'flow_to_disk' 'reject' }";
+ if (result.text != expected):
+ self.fail(result)
+
+ def test_illegal_queue_size(self):
+ """
+ Test illegal queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'maxqueuesize', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'maxqueuesize', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
+
+
+ def test_illegal_queue_count(self):
+ """
+ Test illegal queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'maxqueuecount', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'maxqueuecount', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.text != expected):
+ self.fail(result)
+
+
+ #=====================================
+ # ACL queue tests
+ #=====================================
+
+ def test_queue_allow_mode(self):
+ """
+ Test cases for queue acl in allow mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q1 durable=true passive=true\n')
+ aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
+ aclf.write('acl deny bob@QPID access queue name=q3\n')
+ aclf.write('acl deny bob@QPID purge queue name=q3\n')
+ aclf.write('acl deny bob@QPID delete queue name=q4\n')
+ aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q1", durable=True, passive=True)
+ self.fail("ACL should deny queue create request with name=q1 durable=true passive=true");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.policy_type"] = "ring"
+ session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=q2 exclusive=true qpid.policy_type=ring");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.policy_type"] = "ring_strict"
+ session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=q2 exclusive=true qpid.policy_type=ring_strict");
+
+ try:
+ queue_options = {}
+ queue_options["qpid.max_count"] = 200
+ queue_options["qpid.max_size"] = 500
+ session.queue_declare(queue="q5", exclusive=True, arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=q2, qpid.max_size=500 and qpid.max_count=200");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.max_count"] = 200
+ queue_options["qpid.max_size"] = 100
+ session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=q2, qpid.max_size=100 and qpid.max_count=200 ");
+ try:
+ session.queue_declare(queue="q3", exclusive=True)
+ session.queue_declare(queue="q4", durable=True)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request for q3 and q4 with any parameter");
+
+ try:
+ session.queue_query(queue="q3")
+ self.fail("ACL should deny queue query request for q3");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_purge(queue="q3")
+ self.fail("ACL should deny queue purge request for q3");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_purge(queue="q4")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue purge request for q4");
+
+ try:
+ session.queue_delete(queue="q4")
+ self.fail("ACL should deny queue delete request for q4");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_delete(queue="q3")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue delete request for q3");
+
+
+ def test_queue_deny_mode(self):
+ """
+ Test cases for queue acl in deny mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create queue name=q1 durable=true passive=true\n')
+ aclf.write('acl allow bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
+ aclf.write('acl allow bob@QPID access queue name=q3\n')
+ aclf.write('acl allow bob@QPID purge queue name=q3\n')
+ aclf.write('acl allow bob@QPID create queue name=q3\n')
+ aclf.write('acl allow bob@QPID create queue name=q4\n')
+ aclf.write('acl allow bob@QPID delete queue name=q4\n')
+ aclf.write('acl allow bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
+ aclf.write('acl allow anonymous all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q1", durable=True, passive=True)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=q1 durable=true passive=true");
+
+ try:
+ session.queue_declare(queue="q1", durable=False, passive=False)
+ self.fail("ACL should deny queue create request with name=q1 durable=true passive=false");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q2", exclusive=False)
+ self.fail("ACL should deny queue create request with name=q2 exclusive=false");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.max_count"] = 200
+ queue_options["qpid.max_size"] = 500
+ session.queue_declare(queue="q5", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=q2 maxqueuesize=500 maxqueuecount=200");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.max_count"] = 100
+ queue_options["qpid.max_size"] = 500
+ session.queue_declare(queue="q5", arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=q2 maxqueuesize=500 maxqueuecount=200");
+
+ try:
+ queue_options = {}
+ queue_options["qpid.policy_type"] = "ring"
+ session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request for q2 with exclusive=true policytype=ring");
+
+ try:
+ session.queue_declare(queue="q3")
+ session.queue_declare(queue="q4")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request for q3 and q4");
+
+ try:
+ session.queue_query(queue="q4")
+ self.fail("ACL should deny queue query request for q4");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_purge(queue="q4")
+ self.fail("ACL should deny queue purge request for q4");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_purge(queue="q3")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue purge request for q3");
+
+ try:
+ session.queue_query(queue="q3")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue query request for q3");
+
+ try:
+ session.queue_delete(queue="q3")
+ self.fail("ACL should deny queue delete request for q3");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_delete(queue="q4")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue delete request for q4");
+
+ #=====================================
+ # ACL exchange tests
+ #=====================================
+
+ def test_exchange_acl_allow_mode(self):
+ session = self.get_session('bob','bob')
+ session.queue_declare(queue="baz")
+
+ """
+ Test cases for exchange acl in allow mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create exchange name=testEx durable=true passive=true\n')
+ aclf.write('acl deny bob@QPID create exchange name=ex1 type=direct\n')
+ aclf.write('acl deny bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
+ aclf.write('acl deny bob@QPID bind exchange name=myEx queuename=q1 routingkey=rk1\n')
+ aclf.write('acl deny bob@QPID unbind exchange name=myEx queuename=q1 routingkey=rk1\n')
+ aclf.write('acl deny bob@QPID delete exchange name=myEx\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+ session.queue_declare(queue='q1')
+ session.queue_declare(queue='q2')
+ session.exchange_declare(exchange='myEx', type='direct')
+
+ try:
+ session.exchange_declare(exchange='testEx', durable=True, passive=True)
+ self.fail("ACL should deny exchange create request with name=testEx durable=true passive=true");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_declare(exchange='testEx', type='direct', durable=True, passive=False)
+ except qpid.session.SessionException, e:
+ print e
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange create request for testEx with any parameter other than durable=true and passive=true");
+
+ try:
+ session.exchange_declare(exchange='ex1', type='direct')
+ self.fail("ACL should deny exchange create request with name=ex1 type=direct");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_declare(exchange='myXml', type='direct')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange create request for myXml with any parameter");
+
+ try:
+ session.exchange_query(name='myEx')
+ self.fail("ACL should deny exchange query request for myEx");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk1.*')
+ self.fail("ACL should deny exchange bound request for myEx with queuename=q1 and routing_key='rk1.*' ");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_query(name='amq.topic')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange query request for exchange='amq.topic'");
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk2.*')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bound request for myEx with queuename=q1 and binding_key='rk2.*'");
+
+ try:
+ session.exchange_bind(exchange='myEx', queue='q1', binding_key='rk1')
+ self.fail("ACL should deny exchange bind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bind(exchange='myEx', queue='q1', binding_key='x')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bind request for exchange='myEx', queue='q1', binding_key='x'");
+
+ try:
+ session.exchange_bind(exchange='myEx', queue='q2', binding_key='rk1')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bind request for exchange='myEx', queue='q2', binding_key='rk1'");
+
+ try:
+ session.exchange_unbind(exchange='myEx', queue='q1', binding_key='rk1')
+ self.fail("ACL should deny exchange unbind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_unbind(exchange='myEx', queue='q1', binding_key='x')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange unbind request for exchange='myEx', queue='q1', binding_key='x'");
+
+ try:
+ session.exchange_unbind(exchange='myEx', queue='q2', binding_key='rk1')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange unbind request for exchange='myEx', queue='q2', binding_key='rk1'");
+
+ try:
+ session.exchange_delete(exchange='myEx')
+ self.fail("ACL should deny exchange delete request for myEx");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_delete(exchange='myXml')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange delete request for myXml");
+
+
+ def test_exchange_acl_deny_mode(self):
+ session = self.get_session('bob','bob')
+ session.queue_declare(queue='bar')
+
+ """
+ Test cases for exchange acl in deny mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create exchange name=myEx durable=true passive=false\n')
+ aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
+ aclf.write('acl allow bob@QPID unbind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
+ aclf.write('acl allow bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
+ aclf.write('acl allow bob@QPID delete exchange name=myEx\n')
+ aclf.write('acl allow anonymous all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=True, passive=False)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange create request for myEx with durable=true and passive=false");
+
+ try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=False)
+ self.fail("ACL should deny exchange create request with name=myEx durable=false");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bind(exchange='amq.topic', queue='bar', binding_key='foo.bar')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bind request for exchange='amq.topic', queue='bar', binding_key='foor.bar'");
+
+ try:
+ session.exchange_bind(exchange='amq.topic', queue='baz', binding_key='foo.bar')
+ self.fail("ACL should deny exchange bind request for exchange='amq.topic', queue='baz', binding_key='foo.bar'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bind(exchange='amq.topic', queue='bar', binding_key='fooz.bar')
+ self.fail("ACL should deny exchange bind request for exchange='amq.topic', queue='bar', binding_key='fooz.bar'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_unbind(exchange='amq.topic', queue='bar', binding_key='foo.bar')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange unbind request for exchange='amq.topic', queue='bar', binding_key='foor.bar'");
+ try:
+ session.exchange_unbind(exchange='amq.topic', queue='baz', binding_key='foo.bar')
+ self.fail("ACL should deny exchange unbind request for exchange='amq.topic', queue='baz', binding_key='foo.bar'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_unbind(exchange='amq.topic', queue='bar', binding_key='fooz.bar')
+ self.fail("ACL should deny exchange unbind request for exchange='amq.topic', queue='bar', binding_key='fooz.bar'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_query(name='amq.topic')
+ self.fail("ACL should deny exchange query request for amq.topic");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk2.*')
+ self.fail("ACL should deny exchange bound request for amq.topic with queuename=q1 and routing_key='rk2.*' ");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_query(name='myEx')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange query request for exchange='myEx'");
+
+ try:
+ session.exchange_bound(exchange='myEx', queue='q1', binding_key='rk1.*')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange bound request for myEx with queuename=q1 and binding_key='rk1.*'");
+
+ try:
+ session.exchange_delete(exchange='myXml')
+ self.fail("ACL should deny exchange delete request for myXml");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_delete(exchange='myEx')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow exchange delete request for myEx");
+
+ def test_create_and_delete_exchange_via_qmf(self):
+ """
+ Test acl is enforced when creating/deleting via QMF
+ methods. Note that in order to be able to send the QMF methods
+ and receive the responses a significant amount of permissions
+ need to be enabled (TODO: can the set below be narrowed down
+ at all?)
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create exchange\n')
+ aclf.write('acl allow admin@QPID delete exchange\n')
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+ bob.create_exchange("my-exchange") #should pass
+ #cleanup by deleting exchange
+ try:
+ bob.delete_exchange("my-exchange") #should fail
+ self.fail("ACL should deny exchange delete request for my-exchange");
+ except Exception, e:
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ admin = BrokerAdmin(self.config.broker, "admin", "admin")
+ admin.delete_exchange("my-exchange") #should pass
+
+ anonymous = BrokerAdmin(self.config.broker)
+ try:
+ anonymous.create_exchange("another-exchange") #should fail
+ self.fail("ACL should deny exchange create request for another-exchange");
+ except Exception, e:
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+
+
+ #=====================================
+ # ACL consume tests
+ #=====================================
+
+ def test_consume_allow_mode(self):
+ """
+ Test cases for consume in allow mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID consume queue name=q1\n')
+ aclf.write('acl deny bob@QPID consume queue name=q2\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+
+ try:
+ session.queue_declare(queue='q1')
+ session.queue_declare(queue='q2')
+ session.queue_declare(queue='q3')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow create queue request");
+
+ try:
+ session.message_subscribe(queue='q1', destination='myq1')
+ self.fail("ACL should deny subscription for queue='q1'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.message_subscribe(queue='q2', destination='myq1')
+ self.fail("ACL should deny subscription for queue='q2'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.message_subscribe(queue='q3', destination='myq1')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow subscription for q3");
+
+
+ def test_consume_deny_mode(self):
+ """
+ Test cases for consume in allow mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID consume queue name=q1\n')
+ aclf.write('acl allow bob@QPID consume queue name=q2\n')
+ aclf.write('acl allow bob@QPID create queue\n')
+ aclf.write('acl allow anonymous all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+
+ try:
+ session.queue_declare(queue='q1')
+ session.queue_declare(queue='q2')
+ session.queue_declare(queue='q3')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow create queue request");
+
+ try:
+ session.message_subscribe(queue='q1', destination='myq1')
+ session.message_subscribe(queue='q2', destination='myq2')
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow subscription for q1 and q2");
+
+ try:
+ session.message_subscribe(queue='q3', destination='myq3')
+ self.fail("ACL should deny subscription for queue='q3'");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+
+ #=====================================
+ # ACL publish tests
+ #=====================================
+
+ def test_publish_acl_allow_mode(self):
+ """
+ Test various publish acl
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
+ aclf.write('acl deny bob@QPID publish exchange name=amq.topic\n')
+ aclf.write('acl deny bob@QPID publish exchange name=myEx routingkey=rk2\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ props = session.delivery_properties(routing_key="rk1")
+
+ try:
+ session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
+ self.fail("ACL should deny message transfer to name=amq.direct routingkey=rk1");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.message_transfer(destination="amq.topic", message=Message(props,"Test"))
+ self.fail("ACL should deny message transfer to name=amq.topic");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=False)
+ session.message_transfer(destination="myEx", message=Message(props,"Test"))
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow message transfer to exchange myEx with routing key rk1");
+
+
+ props = session.delivery_properties(routing_key="rk2")
+ try:
+ session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk2");
+
+
+ def test_publish_acl_deny_mode(self):
+ """
+ Test various publish acl
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
+ aclf.write('acl allow bob@QPID publish exchange name=amq.topic\n')
+ aclf.write('acl allow bob@QPID publish exchange name=myEx routingkey=rk2\n')
+ aclf.write('acl allow bob@QPID create exchange\n')
+ aclf.write('acl allow anonymous all all \n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ props = session.delivery_properties(routing_key="rk2")
+
+ try:
+ session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
+ self.fail("ACL should deny message transfer to name=amq.direct routingkey=rk2");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.message_transfer(destination="amq.topic", message=Message(props,"Test"))
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow message transfer to exchange amq.topic with any routing key");
+
+ try:
+ session.exchange_declare(exchange='myEx', type='direct', durable=False)
+ session.message_transfer(destination="myEx", message=Message(props,"Test"))
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow message transfer to exchange myEx with routing key=rk2");
+
+ props = session.delivery_properties(routing_key="rk1")
+
+ try:
+ session.message_transfer(destination="myEx", message=Message(props,"Test"))
+ self.fail("ACL should deny message transfer to name=myEx routingkey=rk1");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
+
+class BrokerAdmin:
+ def __init__(self, broker, username=None, password=None):
+ self.connection = qpid.messaging.Connection(broker)
+ if username:
+ self.connection.username = username
+ self.connection.password = password
+ self.connection.sasl_mechanisms = "PLAIN"
+ self.connection.open()
+ self.session = self.connection.session()
+ self.sender = self.session.sender("qmf.default.direct/broker")
+ self.reply_to = "responses-#; {create:always}"
+ self.receiver = self.session.receiver(self.reply_to)
+
+ def invoke(self, method, arguments):
+ content = {
+ "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"},
+ "_method_name": method,
+ "_arguments": arguments
+ }
+ request = qpid.messaging.Message(reply_to=self.reply_to, content=content)
+ request.properties["x-amqp-0-10.app-id"] = "qmf2"
+ request.properties["qmf.opcode"] = "_method_request"
+ self.sender.send(request)
+ response = self.receiver.fetch()
+ self.session.acknowledge()
+ if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
+ if response.properties['qmf.opcode'] == '_method_response':
+ return response.content['_arguments']
+ elif response.properties['qmf.opcode'] == '_exception':
+ raise Exception(response.content['_values'])
+ else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode'])
+ else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id'])
+ def create_exchange(self, name, exchange_type=None, options={}):
+ properties = options
+ if exchange_type: properties["exchange_type"] = exchange_type
+ self.invoke("create", {"type": "exchange", "name":name, "properties":properties})
+
+ def create_queue(self, name, properties={}):
+ self.invoke("create", {"type": "queue", "name":name, "properties":properties})
+
+ def delete_exchange(self, name):
+ self.invoke("delete", {"type": "exchange", "name":name})
+
+ def delete_queue(self, name):
+ self.invoke("delete", {"type": "queue", "name":name})