summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_policy/_checks.py121
-rw-r--r--oslo_policy/_external.py111
-rw-r--r--oslo_policy/_parser.py5
-rw-r--r--oslo_policy/fixture.py4
-rw-r--r--oslo_policy/tests/test_checks.py291
-rw-r--r--oslo_policy/tests/test_external.py310
-rw-r--r--setup.cfg4
7 files changed, 444 insertions, 402 deletions
diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py
index c426285..c3031cd 100644
--- a/oslo_policy/_checks.py
+++ b/oslo_policy/_checks.py
@@ -17,19 +17,25 @@
import abc
import ast
-import contextlib
-import copy
import inspect
-import os
-from oslo_serialization import jsonutils
-import requests
import six
+import stevedore
-from oslo_policy._i18n import _
+registered_checks = {}
+extension_checks = None
-registered_checks = {}
+def get_extensions():
+ global extension_checks
+ if extension_checks is None:
+ em = stevedore.ExtensionManager('oslo.policy.rule_checks',
+ invoke_on_load=False)
+ extension_checks = {
+ extension.name: extension.plugin
+ for extension in em
+ }
+ return extension_checks
def _check(rule, target, creds, enforcer, current_rule):
@@ -276,107 +282,6 @@ class RoleCheck(Check):
return False
-@register('http')
-class HttpCheck(Check):
- """Check ``http:`` rules by calling to a remote server.
-
- This example implementation simply verifies that the response
- is exactly ``True``.
- """
-
- def __call__(self, target, creds, enforcer, current_rule=None):
- url = ('http:' + self.match) % target
-
- # Convert instances of object() in target temporarily to
- # empty dict to avoid circular reference detection
- # errors in jsonutils.dumps().
- temp_target = copy.deepcopy(target)
- for key in target.keys():
- element = target.get(key)
- if type(element) is object:
- temp_target[key] = {}
-
- data = json = None
- if (enforcer.conf.oslo_policy.remote_content_type ==
- 'application/x-www-form-urlencoded'):
- data = {'rule': jsonutils.dumps(current_rule),
- 'target': jsonutils.dumps(temp_target),
- 'credentials': jsonutils.dumps(creds)}
- else:
- json = {'rule': current_rule,
- 'target': temp_target,
- 'credentials': creds}
-
- with contextlib.closing(requests.post(url, json=json, data=data)) as r:
- return r.text.lstrip('"').rstrip('"') == 'True'
-
-
-@register('https')
-class HttpsCheck(Check):
- """Check ``https:`` rules by calling to a remote server.
-
- This example implementation simply verifies that the response
- is exactly ``True``.
- """
-
- def __call__(self, target, creds, enforcer, current_rule=None):
- url = ('https:' + self.match) % target
-
- cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
- key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
- ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
- verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
-
- if cert_file:
- if not os.path.exists(cert_file):
- raise RuntimeError(
- _("Unable to find ssl cert_file : %s") % cert_file)
- if not os.access(cert_file, os.R_OK):
- raise RuntimeError(
- _("Unable to access ssl cert_file : %s") % cert_file)
- if key_file:
- if not os.path.exists(key_file):
- raise RuntimeError(
- _("Unable to find ssl key_file : %s") % key_file)
- if not os.access(key_file, os.R_OK):
- raise RuntimeError(
- _("Unable to access ssl key_file : %s") % key_file)
- cert = (cert_file, key_file)
- if verify_server:
- if ca_crt_file:
- if not os.path.exists(ca_crt_file):
- raise RuntimeError(
- _("Unable to find ca cert_file : %s") % ca_crt_file)
- verify_server = ca_crt_file
-
- # Convert instances of object() in target temporarily to
- # empty dict to avoid circular reference detection
- # errors in jsonutils.dumps().
- temp_target = copy.deepcopy(target)
- for key in target.keys():
- element = target.get(key)
- if type(element) is object:
- temp_target[key] = {}
-
- data = json = None
- if (enforcer.conf.oslo_policy.remote_content_type ==
- 'application/x-www-form-urlencoded'):
- data = {'rule': jsonutils.dumps(current_rule),
- 'target': jsonutils.dumps(temp_target),
- 'credentials': jsonutils.dumps(creds)}
- else:
- json = {'rule': current_rule,
- 'target': temp_target,
- 'credentials': creds}
-
- with contextlib.closing(
- requests.post(url, json=json,
- data=data, cert=cert,
- verify=verify_server)
- ) as r:
- return r.text.lstrip('"').rstrip('"') == 'True'
-
-
@register(None)
class GenericCheck(Check):
"""Check an individual match.
diff --git a/oslo_policy/_external.py b/oslo_policy/_external.py
new file mode 100644
index 0000000..928e667
--- /dev/null
+++ b/oslo_policy/_external.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import copy
+import os
+
+from oslo_policy import _checks
+from oslo_policy._i18n import _
+from oslo_serialization import jsonutils
+import requests
+
+
+class HttpCheck(_checks.Check):
+ """Check ``http:`` rules by calling to a remote server.
+
+ This example implementation simply verifies that the response
+ is exactly ``True``.
+ """
+
+ def __call__(self, target, creds, enforcer, current_rule=None):
+ url = ('http:' + self.match) % target
+ data, json = self._construct_payload(creds, current_rule,
+ enforcer, target)
+ with contextlib.closing(
+ requests.post(url, json=json, data=data)
+ ) as r:
+ return r.text.lstrip('"').rstrip('"') == 'True'
+
+ @staticmethod
+ def _construct_payload(creds, current_rule, enforcer, target):
+ # Convert instances of object() in target temporarily to
+ # empty dict to avoid circular reference detection
+ # errors in jsonutils.dumps().
+ temp_target = copy.deepcopy(target)
+ for key in target.keys():
+ element = target.get(key)
+ if type(element) is object:
+ temp_target[key] = {}
+ data = json = None
+ if (enforcer.conf.oslo_policy.remote_content_type ==
+ 'application/x-www-form-urlencoded'):
+ data = {'rule': jsonutils.dumps(current_rule),
+ 'target': jsonutils.dumps(temp_target),
+ 'credentials': jsonutils.dumps(creds)}
+ else:
+ json = {'rule': current_rule,
+ 'target': temp_target,
+ 'credentials': creds}
+ return data, json
+
+
+class HttpsCheck(HttpCheck):
+ """Check ``https:`` rules by calling to a remote server.
+
+ This example implementation simply verifies that the response
+ is exactly ``True``.
+ """
+
+ def __call__(self, target, creds, enforcer, current_rule=None):
+ url = ('https:' + self.match) % target
+
+ cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
+ key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
+ ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
+ verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
+
+ if cert_file:
+ if not os.path.exists(cert_file):
+ raise RuntimeError(
+ _("Unable to find ssl cert_file : %s") % cert_file)
+ if not os.access(cert_file, os.R_OK):
+ raise RuntimeError(
+ _("Unable to access ssl cert_file : %s") % cert_file)
+ if key_file:
+ if not os.path.exists(key_file):
+ raise RuntimeError(
+ _("Unable to find ssl key_file : %s") % key_file)
+ if not os.access(key_file, os.R_OK):
+ raise RuntimeError(
+ _("Unable to access ssl key_file : %s") % key_file)
+ cert = (cert_file, key_file)
+ if verify_server:
+ if ca_crt_file:
+ if not os.path.exists(ca_crt_file):
+ raise RuntimeError(
+ _("Unable to find ca cert_file : %s") % ca_crt_file)
+ verify_server = ca_crt_file
+
+ data, json = self._construct_payload(creds, current_rule,
+ enforcer, target)
+ with contextlib.closing(
+ requests.post(url, json=json,
+ data=data, cert=cert,
+ verify=verify_server)
+ ) as r:
+ return r.text.lstrip('"').rstrip('"') == 'True'
diff --git a/oslo_policy/_parser.py b/oslo_policy/_parser.py
index e85c81f..06dae07 100644
--- a/oslo_policy/_parser.py
+++ b/oslo_policy/_parser.py
@@ -216,7 +216,10 @@ def _parse_check(rule):
return _checks.FalseCheck()
# Find what implements the check
- if kind in _checks.registered_checks:
+ extension_checks = _checks.get_extensions()
+ if kind in extension_checks:
+ return extension_checks[kind](kind, match)
+ elif kind in _checks.registered_checks:
return _checks.registered_checks[kind](kind, match)
elif None in _checks.registered_checks:
return _checks.registered_checks[None](kind, match)
diff --git a/oslo_policy/fixture.py b/oslo_policy/fixture.py
index eaab5ee..e0b5107 100644
--- a/oslo_policy/fixture.py
+++ b/oslo_policy/fixture.py
@@ -36,7 +36,7 @@ class HttpCheckFixture(fixtures.Fixture):
self.useFixture(
fixtures.MonkeyPatch(
- 'oslo_policy._checks.HttpCheck.__call__',
+ 'oslo_policy._external.HttpCheck.__call__',
mocked_call,
)
)
@@ -63,7 +63,7 @@ class HttpsCheckFixture(fixtures.Fixture):
self.useFixture(
fixtures.MonkeyPatch(
- 'oslo_policy._checks.HttpCheck.__call__',
+ 'oslo_policy._external.HttpsCheck.__call__',
mocked_call,
)
)
diff --git a/oslo_policy/tests/test_checks.py b/oslo_policy/tests/test_checks.py
index 1e402e9..1cd08ec 100644
--- a/oslo_policy/tests/test_checks.py
+++ b/oslo_policy/tests/test_checks.py
@@ -13,15 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
-import json
import mock
-from oslo_serialization import jsonutils
from oslotest import base as test_base
-from requests_mock.contrib import fixture as rm_fixture
-import six.moves.urllib.parse as urlparse
from oslo_policy import _checks
-from oslo_policy import opts
from oslo_policy.tests import base
from oslo_policy.tests import token_fixture
@@ -96,292 +91,6 @@ class RoleCheckTestCase(base.PolicyBaseTestCase):
self.assertFalse(check({}, {}, self.enforcer))
-class HttpCheckTestCase(base.PolicyBaseTestCase):
-
- def setUp(self):
- super(HttpCheckTestCase, self).setUp()
- opts._register(self.conf)
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
-
- def decode_post_data(self, post_data):
- result = {}
- for item in post_data.split('&'):
- key, _sep, value = item.partition('=')
- result[key] = jsonutils.loads(urlparse.unquote_plus(value))
- return result
-
- def test_accept(self):
- self.requests_mock.post('http://example.com/target', text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/x-www-form-urlencoded',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=None),
- self.decode_post_data(last_request.body))
-
- def test_accept_json(self):
- self.conf.set_override('remote_content_type', 'application/json',
- group='oslo_policy')
- self.requests_mock.post('http://example.com/target', text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/json',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- credentials=cred_dict,
- target=target_dict),
- json.loads(last_request.body.decode('utf-8')))
-
- def test_reject(self):
- self.requests_mock.post("http://example.com/target", text='other')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertFalse(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=None),
- self.decode_post_data(last_request.body))
-
- def test_http_with_objects_in_target(self):
- self.requests_mock.post("http://example.com/target", text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
- target = {'a': object(),
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
- def test_http_with_strings_in_target(self):
- self.requests_mock.post("http://example.com/target", text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
- target = {'a': 'some_string',
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
- def test_accept_with_rule_in_argument(self):
- self.requests_mock.post('http://example.com/target', text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- current_rule = "a_rule"
- self.assertTrue(check(target_dict, cred_dict, self.enforcer,
- current_rule))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=current_rule),
- self.decode_post_data(last_request.body))
-
- def test_reject_with_rule_in_argument(self):
- self.requests_mock.post("http://example.com/target", text='other')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- current_rule = "a_rule"
- self.assertFalse(check(target_dict, cred_dict, self.enforcer,
- current_rule))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=current_rule),
- self.decode_post_data(last_request.body))
-
-
-class HttpsCheckTestCase(base.PolicyBaseTestCase):
-
- def setUp(self):
- super(HttpsCheckTestCase, self).setUp()
- opts._register(self.conf)
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
-
- def decode_post_data(self, post_data):
- result = {}
- for item in post_data.split('&'):
- key, _sep, value = item.partition('=')
- result[key] = jsonutils.loads(urlparse.unquote_plus(value))
- return result
-
- def test_https_accept(self):
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/x-www-form-urlencoded',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_accept_json(self):
- self.conf.set_override('remote_content_type', 'application/json',
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/json',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- json.loads(last_request.body.decode('utf-8')))
-
- def test_https_accept_with_verify(self):
- self.conf.set_override('remote_ssl_verify_server_crt', True,
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual(True, last_request.verify)
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_accept_with_verify_cert(self):
- self.conf.set_override('remote_ssl_verify_server_crt', True,
- group='oslo_policy')
- self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- with mock.patch('os.path.exists') as path_exists:
- path_exists.return_value = True
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('ca.crt', last_request.verify)
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_accept_with_verify_and_client_certs(self):
- self.conf.set_override('remote_ssl_verify_server_crt', True,
- group='oslo_policy')
- self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
- group='oslo_policy')
- self.conf.set_override('remote_ssl_client_key_file', "client.key",
- group='oslo_policy')
- self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- with mock.patch('os.path.exists') as path_exists:
- with mock.patch('os.access') as os_access:
- path_exists.return_value = True
- os_access.return_value = True
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('ca.crt', last_request.verify)
- self.assertEqual(('client.crt', 'client.key'), last_request.cert)
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_reject(self):
- self.requests_mock.post("https://example.com/target", text='other')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertFalse(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_with_objects_in_target(self):
- self.requests_mock.post("https://example.com/target", text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
- target = {'a': object(),
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
- def test_https_with_strings_in_target(self):
- self.requests_mock.post("https://example.com/target", text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
- target = {'a': 'some_string',
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
-
class GenericCheckTestCase(base.PolicyBaseTestCase):
def test_no_cred(self):
check = _checks.GenericCheck('name', '%(name)s')
diff --git a/oslo_policy/tests/test_external.py b/oslo_policy/tests/test_external.py
new file mode 100644
index 0000000..92ff539
--- /dev/null
+++ b/oslo_policy/tests/test_external.py
@@ -0,0 +1,310 @@
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import mock
+from oslo_serialization import jsonutils
+from requests_mock.contrib import fixture as rm_fixture
+import six.moves.urllib.parse as urlparse
+
+from oslo_policy import _external
+from oslo_policy import opts
+from oslo_policy.tests import base
+
+
+class HttpCheckTestCase(base.PolicyBaseTestCase):
+
+ def setUp(self):
+ super(HttpCheckTestCase, self).setUp()
+ opts._register(self.conf)
+ self.requests_mock = self.useFixture(rm_fixture.Fixture())
+
+ def decode_post_data(self, post_data):
+ result = {}
+ for item in post_data.split('&'):
+ key, _sep, value = item.partition('=')
+ result[key] = jsonutils.loads(urlparse.unquote_plus(value))
+ return result
+
+ def test_accept(self):
+ self.requests_mock.post('http://example.com/target', text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/x-www-form-urlencoded',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=None),
+ self.decode_post_data(last_request.body))
+
+ def test_accept_json(self):
+ self.conf.set_override('remote_content_type', 'application/json',
+ group='oslo_policy')
+ self.requests_mock.post('http://example.com/target', text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/json',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ credentials=cred_dict,
+ target=target_dict),
+ json.loads(last_request.body.decode('utf-8')))
+
+ def test_reject(self):
+ self.requests_mock.post("http://example.com/target", text='other')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertFalse(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=None),
+ self.decode_post_data(last_request.body))
+
+ def test_http_with_objects_in_target(self):
+ self.requests_mock.post("http://example.com/target", text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+ target = {'a': object(),
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
+
+ def test_http_with_strings_in_target(self):
+ self.requests_mock.post("http://example.com/target", text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+ target = {'a': 'some_string',
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
+
+ def test_accept_with_rule_in_argument(self):
+ self.requests_mock.post('http://example.com/target', text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ current_rule = "a_rule"
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer,
+ current_rule))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=current_rule),
+ self.decode_post_data(last_request.body))
+
+ def test_reject_with_rule_in_argument(self):
+ self.requests_mock.post("http://example.com/target", text='other')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ current_rule = "a_rule"
+ self.assertFalse(check(target_dict, cred_dict, self.enforcer,
+ current_rule))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=current_rule),
+ self.decode_post_data(last_request.body))
+
+
+class HttpsCheckTestCase(base.PolicyBaseTestCase):
+
+ def setUp(self):
+ super(HttpsCheckTestCase, self).setUp()
+ opts._register(self.conf)
+ self.requests_mock = self.useFixture(rm_fixture.Fixture())
+
+ def decode_post_data(self, post_data):
+ result = {}
+ for item in post_data.split('&'):
+ key, _sep, value = item.partition('=')
+ result[key] = jsonutils.loads(urlparse.unquote_plus(value))
+ return result
+
+ def test_https_accept(self):
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/x-www-form-urlencoded',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_accept_json(self):
+ self.conf.set_override('remote_content_type', 'application/json',
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/json',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ json.loads(last_request.body.decode('utf-8')))
+
+ def test_https_accept_with_verify(self):
+ self.conf.set_override('remote_ssl_verify_server_crt', True,
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual(True, last_request.verify)
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_accept_with_verify_cert(self):
+ self.conf.set_override('remote_ssl_verify_server_crt', True,
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ with mock.patch('os.path.exists') as path_exists:
+ path_exists.return_value = True
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('ca.crt', last_request.verify)
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_accept_with_verify_and_client_certs(self):
+ self.conf.set_override('remote_ssl_verify_server_crt', True,
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_client_key_file', "client.key",
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ with mock.patch('os.path.exists') as path_exists:
+ with mock.patch('os.access') as os_access:
+ path_exists.return_value = True
+ os_access.return_value = True
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('ca.crt', last_request.verify)
+ self.assertEqual(('client.crt', 'client.key'), last_request.cert)
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_reject(self):
+ self.requests_mock.post("https://example.com/target", text='other')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertFalse(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_with_objects_in_target(self):
+ self.requests_mock.post("https://example.com/target", text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+ target = {'a': object(),
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
+
+ def test_https_with_strings_in_target(self):
+ self.requests_mock.post("https://example.com/target", text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+ target = {'a': 'some_string',
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
diff --git a/setup.cfg b/setup.cfg
index dc076d3..0f7b4f0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -39,6 +39,10 @@ console_scripts =
oslopolicy-policy-generator = oslo_policy.generator:generate_policy
oslopolicy-list-redundant = oslo_policy.generator:list_redundant
+oslo.policy.rule_checks =
+ http = oslo_policy._external:HttpCheck
+ https = oslo_policy._external:HttpsCheck
+
[build_sphinx]
all-files = 1
warning-is-error = 1