diff options
-rw-r--r-- | oslo_policy/_checks.py | 121 | ||||
-rw-r--r-- | oslo_policy/_external.py | 111 | ||||
-rw-r--r-- | oslo_policy/_parser.py | 5 | ||||
-rw-r--r-- | oslo_policy/fixture.py | 4 | ||||
-rw-r--r-- | oslo_policy/tests/test_checks.py | 291 | ||||
-rw-r--r-- | oslo_policy/tests/test_external.py | 310 | ||||
-rw-r--r-- | setup.cfg | 4 |
7 files changed, 444 insertions, 402 deletions
diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py index c426285..c3031cd 100644 --- a/oslo_policy/_checks.py +++ b/oslo_policy/_checks.py @@ -17,19 +17,25 @@ import abc import ast -import contextlib -import copy import inspect -import os -from oslo_serialization import jsonutils -import requests import six +import stevedore -from oslo_policy._i18n import _ +registered_checks = {} +extension_checks = None -registered_checks = {} +def get_extensions(): + global extension_checks + if extension_checks is None: + em = stevedore.ExtensionManager('oslo.policy.rule_checks', + invoke_on_load=False) + extension_checks = { + extension.name: extension.plugin + for extension in em + } + return extension_checks def _check(rule, target, creds, enforcer, current_rule): @@ -276,107 +282,6 @@ class RoleCheck(Check): return False -@register('http') -class HttpCheck(Check): - """Check ``http:`` rules by calling to a remote server. - - This example implementation simply verifies that the response - is exactly ``True``. - """ - - def __call__(self, target, creds, enforcer, current_rule=None): - url = ('http:' + self.match) % target - - # Convert instances of object() in target temporarily to - # empty dict to avoid circular reference detection - # errors in jsonutils.dumps(). - temp_target = copy.deepcopy(target) - for key in target.keys(): - element = target.get(key) - if type(element) is object: - temp_target[key] = {} - - data = json = None - if (enforcer.conf.oslo_policy.remote_content_type == - 'application/x-www-form-urlencoded'): - data = {'rule': jsonutils.dumps(current_rule), - 'target': jsonutils.dumps(temp_target), - 'credentials': jsonutils.dumps(creds)} - else: - json = {'rule': current_rule, - 'target': temp_target, - 'credentials': creds} - - with contextlib.closing(requests.post(url, json=json, data=data)) as r: - return r.text.lstrip('"').rstrip('"') == 'True' - - -@register('https') -class HttpsCheck(Check): - """Check ``https:`` rules by calling to a remote server. - - This example implementation simply verifies that the response - is exactly ``True``. - """ - - def __call__(self, target, creds, enforcer, current_rule=None): - url = ('https:' + self.match) % target - - cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file - key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file - ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file - verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt - - if cert_file: - if not os.path.exists(cert_file): - raise RuntimeError( - _("Unable to find ssl cert_file : %s") % cert_file) - if not os.access(cert_file, os.R_OK): - raise RuntimeError( - _("Unable to access ssl cert_file : %s") % cert_file) - if key_file: - if not os.path.exists(key_file): - raise RuntimeError( - _("Unable to find ssl key_file : %s") % key_file) - if not os.access(key_file, os.R_OK): - raise RuntimeError( - _("Unable to access ssl key_file : %s") % key_file) - cert = (cert_file, key_file) - if verify_server: - if ca_crt_file: - if not os.path.exists(ca_crt_file): - raise RuntimeError( - _("Unable to find ca cert_file : %s") % ca_crt_file) - verify_server = ca_crt_file - - # Convert instances of object() in target temporarily to - # empty dict to avoid circular reference detection - # errors in jsonutils.dumps(). - temp_target = copy.deepcopy(target) - for key in target.keys(): - element = target.get(key) - if type(element) is object: - temp_target[key] = {} - - data = json = None - if (enforcer.conf.oslo_policy.remote_content_type == - 'application/x-www-form-urlencoded'): - data = {'rule': jsonutils.dumps(current_rule), - 'target': jsonutils.dumps(temp_target), - 'credentials': jsonutils.dumps(creds)} - else: - json = {'rule': current_rule, - 'target': temp_target, - 'credentials': creds} - - with contextlib.closing( - requests.post(url, json=json, - data=data, cert=cert, - verify=verify_server) - ) as r: - return r.text.lstrip('"').rstrip('"') == 'True' - - @register(None) class GenericCheck(Check): """Check an individual match. diff --git a/oslo_policy/_external.py b/oslo_policy/_external.py new file mode 100644 index 0000000..928e667 --- /dev/null +++ b/oslo_policy/_external.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import copy +import os + +from oslo_policy import _checks +from oslo_policy._i18n import _ +from oslo_serialization import jsonutils +import requests + + +class HttpCheck(_checks.Check): + """Check ``http:`` rules by calling to a remote server. + + This example implementation simply verifies that the response + is exactly ``True``. + """ + + def __call__(self, target, creds, enforcer, current_rule=None): + url = ('http:' + self.match) % target + data, json = self._construct_payload(creds, current_rule, + enforcer, target) + with contextlib.closing( + requests.post(url, json=json, data=data) + ) as r: + return r.text.lstrip('"').rstrip('"') == 'True' + + @staticmethod + def _construct_payload(creds, current_rule, enforcer, target): + # Convert instances of object() in target temporarily to + # empty dict to avoid circular reference detection + # errors in jsonutils.dumps(). + temp_target = copy.deepcopy(target) + for key in target.keys(): + element = target.get(key) + if type(element) is object: + temp_target[key] = {} + data = json = None + if (enforcer.conf.oslo_policy.remote_content_type == + 'application/x-www-form-urlencoded'): + data = {'rule': jsonutils.dumps(current_rule), + 'target': jsonutils.dumps(temp_target), + 'credentials': jsonutils.dumps(creds)} + else: + json = {'rule': current_rule, + 'target': temp_target, + 'credentials': creds} + return data, json + + +class HttpsCheck(HttpCheck): + """Check ``https:`` rules by calling to a remote server. + + This example implementation simply verifies that the response + is exactly ``True``. + """ + + def __call__(self, target, creds, enforcer, current_rule=None): + url = ('https:' + self.match) % target + + cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file + key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file + ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file + verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt + + if cert_file: + if not os.path.exists(cert_file): + raise RuntimeError( + _("Unable to find ssl cert_file : %s") % cert_file) + if not os.access(cert_file, os.R_OK): + raise RuntimeError( + _("Unable to access ssl cert_file : %s") % cert_file) + if key_file: + if not os.path.exists(key_file): + raise RuntimeError( + _("Unable to find ssl key_file : %s") % key_file) + if not os.access(key_file, os.R_OK): + raise RuntimeError( + _("Unable to access ssl key_file : %s") % key_file) + cert = (cert_file, key_file) + if verify_server: + if ca_crt_file: + if not os.path.exists(ca_crt_file): + raise RuntimeError( + _("Unable to find ca cert_file : %s") % ca_crt_file) + verify_server = ca_crt_file + + data, json = self._construct_payload(creds, current_rule, + enforcer, target) + with contextlib.closing( + requests.post(url, json=json, + data=data, cert=cert, + verify=verify_server) + ) as r: + return r.text.lstrip('"').rstrip('"') == 'True' diff --git a/oslo_policy/_parser.py b/oslo_policy/_parser.py index e85c81f..06dae07 100644 --- a/oslo_policy/_parser.py +++ b/oslo_policy/_parser.py @@ -216,7 +216,10 @@ def _parse_check(rule): return _checks.FalseCheck() # Find what implements the check - if kind in _checks.registered_checks: + extension_checks = _checks.get_extensions() + if kind in extension_checks: + return extension_checks[kind](kind, match) + elif kind in _checks.registered_checks: return _checks.registered_checks[kind](kind, match) elif None in _checks.registered_checks: return _checks.registered_checks[None](kind, match) diff --git a/oslo_policy/fixture.py b/oslo_policy/fixture.py index eaab5ee..e0b5107 100644 --- a/oslo_policy/fixture.py +++ b/oslo_policy/fixture.py @@ -36,7 +36,7 @@ class HttpCheckFixture(fixtures.Fixture): self.useFixture( fixtures.MonkeyPatch( - 'oslo_policy._checks.HttpCheck.__call__', + 'oslo_policy._external.HttpCheck.__call__', mocked_call, ) ) @@ -63,7 +63,7 @@ class HttpsCheckFixture(fixtures.Fixture): self.useFixture( fixtures.MonkeyPatch( - 'oslo_policy._checks.HttpCheck.__call__', + 'oslo_policy._external.HttpsCheck.__call__', mocked_call, ) ) diff --git a/oslo_policy/tests/test_checks.py b/oslo_policy/tests/test_checks.py index 1e402e9..1cd08ec 100644 --- a/oslo_policy/tests/test_checks.py +++ b/oslo_policy/tests/test_checks.py @@ -13,15 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. -import json import mock -from oslo_serialization import jsonutils from oslotest import base as test_base -from requests_mock.contrib import fixture as rm_fixture -import six.moves.urllib.parse as urlparse from oslo_policy import _checks -from oslo_policy import opts from oslo_policy.tests import base from oslo_policy.tests import token_fixture @@ -96,292 +91,6 @@ class RoleCheckTestCase(base.PolicyBaseTestCase): self.assertFalse(check({}, {}, self.enforcer)) -class HttpCheckTestCase(base.PolicyBaseTestCase): - - def setUp(self): - super(HttpCheckTestCase, self).setUp() - opts._register(self.conf) - self.requests_mock = self.useFixture(rm_fixture.Fixture()) - - def decode_post_data(self, post_data): - result = {} - for item in post_data.split('&'): - key, _sep, value = item.partition('=') - result[key] = jsonutils.loads(urlparse.unquote_plus(value)) - return result - - def test_accept(self): - self.requests_mock.post('http://example.com/target', text='True') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('application/x-www-form-urlencoded', - last_request.headers['Content-Type']) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(target=target_dict, credentials=cred_dict, - rule=None), - self.decode_post_data(last_request.body)) - - def test_accept_json(self): - self.conf.set_override('remote_content_type', 'application/json', - group='oslo_policy') - self.requests_mock.post('http://example.com/target', text='True') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('application/json', - last_request.headers['Content-Type']) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - credentials=cred_dict, - target=target_dict), - json.loads(last_request.body.decode('utf-8'))) - - def test_reject(self): - self.requests_mock.post("http://example.com/target", text='other') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertFalse(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(target=target_dict, credentials=cred_dict, - rule=None), - self.decode_post_data(last_request.body)) - - def test_http_with_objects_in_target(self): - self.requests_mock.post("http://example.com/target", text='True') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - target = {'a': object(), - 'name': 'target', - 'b': 'test data'} - self.assertTrue(check(target, - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer)) - - def test_http_with_strings_in_target(self): - self.requests_mock.post("http://example.com/target", text='True') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - target = {'a': 'some_string', - 'name': 'target', - 'b': 'test data'} - self.assertTrue(check(target, - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer)) - - def test_accept_with_rule_in_argument(self): - self.requests_mock.post('http://example.com/target', text='True') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - current_rule = "a_rule" - self.assertTrue(check(target_dict, cred_dict, self.enforcer, - current_rule)) - - last_request = self.requests_mock.last_request - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(target=target_dict, credentials=cred_dict, - rule=current_rule), - self.decode_post_data(last_request.body)) - - def test_reject_with_rule_in_argument(self): - self.requests_mock.post("http://example.com/target", text='other') - - check = _checks.HttpCheck('http', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - current_rule = "a_rule" - self.assertFalse(check(target_dict, cred_dict, self.enforcer, - current_rule)) - - last_request = self.requests_mock.last_request - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(target=target_dict, credentials=cred_dict, - rule=current_rule), - self.decode_post_data(last_request.body)) - - -class HttpsCheckTestCase(base.PolicyBaseTestCase): - - def setUp(self): - super(HttpsCheckTestCase, self).setUp() - opts._register(self.conf) - self.requests_mock = self.useFixture(rm_fixture.Fixture()) - - def decode_post_data(self, post_data): - result = {} - for item in post_data.split('&'): - key, _sep, value = item.partition('=') - result[key] = jsonutils.loads(urlparse.unquote_plus(value)) - return result - - def test_https_accept(self): - self.requests_mock.post('https://example.com/target', text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('application/x-www-form-urlencoded', - last_request.headers['Content-Type']) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - target=target_dict, - credentials=cred_dict), - self.decode_post_data(last_request.body)) - - def test_https_accept_json(self): - self.conf.set_override('remote_content_type', 'application/json', - group='oslo_policy') - self.requests_mock.post('https://example.com/target', text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('application/json', - last_request.headers['Content-Type']) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - target=target_dict, - credentials=cred_dict), - json.loads(last_request.body.decode('utf-8'))) - - def test_https_accept_with_verify(self): - self.conf.set_override('remote_ssl_verify_server_crt', True, - group='oslo_policy') - self.requests_mock.post('https://example.com/target', text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual(True, last_request.verify) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - target=target_dict, - credentials=cred_dict), - self.decode_post_data(last_request.body)) - - def test_https_accept_with_verify_cert(self): - self.conf.set_override('remote_ssl_verify_server_crt', True, - group='oslo_policy') - self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt", - group='oslo_policy') - self.requests_mock.post('https://example.com/target', text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - with mock.patch('os.path.exists') as path_exists: - path_exists.return_value = True - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('ca.crt', last_request.verify) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - target=target_dict, - credentials=cred_dict), - self.decode_post_data(last_request.body)) - - def test_https_accept_with_verify_and_client_certs(self): - self.conf.set_override('remote_ssl_verify_server_crt', True, - group='oslo_policy') - self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt", - group='oslo_policy') - self.conf.set_override('remote_ssl_client_key_file', "client.key", - group='oslo_policy') - self.conf.set_override('remote_ssl_client_crt_file', "client.crt", - group='oslo_policy') - self.requests_mock.post('https://example.com/target', text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - with mock.patch('os.path.exists') as path_exists: - with mock.patch('os.access') as os_access: - path_exists.return_value = True - os_access.return_value = True - self.assertTrue(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('ca.crt', last_request.verify) - self.assertEqual(('client.crt', 'client.key'), last_request.cert) - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - target=target_dict, - credentials=cred_dict), - self.decode_post_data(last_request.body)) - - def test_https_reject(self): - self.requests_mock.post("https://example.com/target", text='other') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - - target_dict = dict(name='target', spam='spammer') - cred_dict = dict(user='user', roles=['a', 'b', 'c']) - self.assertFalse(check(target_dict, cred_dict, self.enforcer)) - - last_request = self.requests_mock.last_request - self.assertEqual('POST', last_request.method) - self.assertEqual(dict(rule=None, - target=target_dict, - credentials=cred_dict), - self.decode_post_data(last_request.body)) - - def test_https_with_objects_in_target(self): - self.requests_mock.post("https://example.com/target", text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - target = {'a': object(), - 'name': 'target', - 'b': 'test data'} - self.assertTrue(check(target, - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer)) - - def test_https_with_strings_in_target(self): - self.requests_mock.post("https://example.com/target", text='True') - - check = _checks.HttpsCheck('https', '//example.com/%(name)s') - target = {'a': 'some_string', - 'name': 'target', - 'b': 'test data'} - self.assertTrue(check(target, - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer)) - - class GenericCheckTestCase(base.PolicyBaseTestCase): def test_no_cred(self): check = _checks.GenericCheck('name', '%(name)s') diff --git a/oslo_policy/tests/test_external.py b/oslo_policy/tests/test_external.py new file mode 100644 index 0000000..92ff539 --- /dev/null +++ b/oslo_policy/tests/test_external.py @@ -0,0 +1,310 @@ +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import mock +from oslo_serialization import jsonutils +from requests_mock.contrib import fixture as rm_fixture +import six.moves.urllib.parse as urlparse + +from oslo_policy import _external +from oslo_policy import opts +from oslo_policy.tests import base + + +class HttpCheckTestCase(base.PolicyBaseTestCase): + + def setUp(self): + super(HttpCheckTestCase, self).setUp() + opts._register(self.conf) + self.requests_mock = self.useFixture(rm_fixture.Fixture()) + + def decode_post_data(self, post_data): + result = {} + for item in post_data.split('&'): + key, _sep, value = item.partition('=') + result[key] = jsonutils.loads(urlparse.unquote_plus(value)) + return result + + def test_accept(self): + self.requests_mock.post('http://example.com/target', text='True') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('application/x-www-form-urlencoded', + last_request.headers['Content-Type']) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(target=target_dict, credentials=cred_dict, + rule=None), + self.decode_post_data(last_request.body)) + + def test_accept_json(self): + self.conf.set_override('remote_content_type', 'application/json', + group='oslo_policy') + self.requests_mock.post('http://example.com/target', text='True') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('application/json', + last_request.headers['Content-Type']) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + credentials=cred_dict, + target=target_dict), + json.loads(last_request.body.decode('utf-8'))) + + def test_reject(self): + self.requests_mock.post("http://example.com/target", text='other') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertFalse(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(target=target_dict, credentials=cred_dict, + rule=None), + self.decode_post_data(last_request.body)) + + def test_http_with_objects_in_target(self): + self.requests_mock.post("http://example.com/target", text='True') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + target = {'a': object(), + 'name': 'target', + 'b': 'test data'} + self.assertTrue(check(target, + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer)) + + def test_http_with_strings_in_target(self): + self.requests_mock.post("http://example.com/target", text='True') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + target = {'a': 'some_string', + 'name': 'target', + 'b': 'test data'} + self.assertTrue(check(target, + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer)) + + def test_accept_with_rule_in_argument(self): + self.requests_mock.post('http://example.com/target', text='True') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + current_rule = "a_rule" + self.assertTrue(check(target_dict, cred_dict, self.enforcer, + current_rule)) + + last_request = self.requests_mock.last_request + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(target=target_dict, credentials=cred_dict, + rule=current_rule), + self.decode_post_data(last_request.body)) + + def test_reject_with_rule_in_argument(self): + self.requests_mock.post("http://example.com/target", text='other') + + check = _external.HttpCheck('http', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + current_rule = "a_rule" + self.assertFalse(check(target_dict, cred_dict, self.enforcer, + current_rule)) + + last_request = self.requests_mock.last_request + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(target=target_dict, credentials=cred_dict, + rule=current_rule), + self.decode_post_data(last_request.body)) + + +class HttpsCheckTestCase(base.PolicyBaseTestCase): + + def setUp(self): + super(HttpsCheckTestCase, self).setUp() + opts._register(self.conf) + self.requests_mock = self.useFixture(rm_fixture.Fixture()) + + def decode_post_data(self, post_data): + result = {} + for item in post_data.split('&'): + key, _sep, value = item.partition('=') + result[key] = jsonutils.loads(urlparse.unquote_plus(value)) + return result + + def test_https_accept(self): + self.requests_mock.post('https://example.com/target', text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('application/x-www-form-urlencoded', + last_request.headers['Content-Type']) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + target=target_dict, + credentials=cred_dict), + self.decode_post_data(last_request.body)) + + def test_https_accept_json(self): + self.conf.set_override('remote_content_type', 'application/json', + group='oslo_policy') + self.requests_mock.post('https://example.com/target', text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('application/json', + last_request.headers['Content-Type']) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + target=target_dict, + credentials=cred_dict), + json.loads(last_request.body.decode('utf-8'))) + + def test_https_accept_with_verify(self): + self.conf.set_override('remote_ssl_verify_server_crt', True, + group='oslo_policy') + self.requests_mock.post('https://example.com/target', text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual(True, last_request.verify) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + target=target_dict, + credentials=cred_dict), + self.decode_post_data(last_request.body)) + + def test_https_accept_with_verify_cert(self): + self.conf.set_override('remote_ssl_verify_server_crt', True, + group='oslo_policy') + self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt", + group='oslo_policy') + self.requests_mock.post('https://example.com/target', text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + with mock.patch('os.path.exists') as path_exists: + path_exists.return_value = True + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('ca.crt', last_request.verify) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + target=target_dict, + credentials=cred_dict), + self.decode_post_data(last_request.body)) + + def test_https_accept_with_verify_and_client_certs(self): + self.conf.set_override('remote_ssl_verify_server_crt', True, + group='oslo_policy') + self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt", + group='oslo_policy') + self.conf.set_override('remote_ssl_client_key_file', "client.key", + group='oslo_policy') + self.conf.set_override('remote_ssl_client_crt_file', "client.crt", + group='oslo_policy') + self.requests_mock.post('https://example.com/target', text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + with mock.patch('os.path.exists') as path_exists: + with mock.patch('os.access') as os_access: + path_exists.return_value = True + os_access.return_value = True + self.assertTrue(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('ca.crt', last_request.verify) + self.assertEqual(('client.crt', 'client.key'), last_request.cert) + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + target=target_dict, + credentials=cred_dict), + self.decode_post_data(last_request.body)) + + def test_https_reject(self): + self.requests_mock.post("https://example.com/target", text='other') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + + target_dict = dict(name='target', spam='spammer') + cred_dict = dict(user='user', roles=['a', 'b', 'c']) + self.assertFalse(check(target_dict, cred_dict, self.enforcer)) + + last_request = self.requests_mock.last_request + self.assertEqual('POST', last_request.method) + self.assertEqual(dict(rule=None, + target=target_dict, + credentials=cred_dict), + self.decode_post_data(last_request.body)) + + def test_https_with_objects_in_target(self): + self.requests_mock.post("https://example.com/target", text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + target = {'a': object(), + 'name': 'target', + 'b': 'test data'} + self.assertTrue(check(target, + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer)) + + def test_https_with_strings_in_target(self): + self.requests_mock.post("https://example.com/target", text='True') + + check = _external.HttpsCheck('https', '//example.com/%(name)s') + target = {'a': 'some_string', + 'name': 'target', + 'b': 'test data'} + self.assertTrue(check(target, + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer)) @@ -39,6 +39,10 @@ console_scripts = oslopolicy-policy-generator = oslo_policy.generator:generate_policy oslopolicy-list-redundant = oslo_policy.generator:list_redundant +oslo.policy.rule_checks = + http = oslo_policy._external:HttpCheck + https = oslo_policy._external:HttpsCheck + [build_sphinx] all-files = 1 warning-is-error = 1 |