summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavanum Srinivas <davanum@gmail.com>2017-09-25 09:25:53 -0400
committerDavanum Srinivas <davanum@gmail.com>2017-10-05 08:40:47 -0400
commit7e185ad96b043c0cb450a6dfc6245db7aa4a0fd6 (patch)
tree962d2b48170b0b6203015cb78682aa8670f5a6e3
parent89d226916c6165cbc4ef116c7ad4ce602f010181 (diff)
downloadoslo-policy-7e185ad96b043c0cb450a6dfc6245db7aa4a0fd6.tar.gz
http/https check rules as stevedore extensions
Why? HttpCheck/HttpsCheck are examples of rule checks that can be implemented outside of the oslo.policy library. Once we setup the infra for registering and using these as stevedore extensions, we automatically get the capability of other folks contributing to writing custom rules for their own use cases. * Add HttpCheck/HttpsCheck as entrypoints in setup.cfg * parser will check get_extensions() to see if there are any external checks registered * Move HttpCheck/HttpsCheck into external module * Move related test cases to test_external.py Change-Id: Icde2b26a38d7c7842defae053228d9208454b969
-rw-r--r--oslo_policy/_checks.py121
-rw-r--r--oslo_policy/_external.py111
-rw-r--r--oslo_policy/_parser.py5
-rw-r--r--oslo_policy/fixture.py4
-rw-r--r--oslo_policy/tests/test_checks.py291
-rw-r--r--oslo_policy/tests/test_external.py310
-rw-r--r--setup.cfg4
7 files changed, 444 insertions, 402 deletions
diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py
index c426285..c3031cd 100644
--- a/oslo_policy/_checks.py
+++ b/oslo_policy/_checks.py
@@ -17,19 +17,25 @@
import abc
import ast
-import contextlib
-import copy
import inspect
-import os
-from oslo_serialization import jsonutils
-import requests
import six
+import stevedore
-from oslo_policy._i18n import _
+registered_checks = {}
+extension_checks = None
-registered_checks = {}
+def get_extensions():
+ global extension_checks
+ if extension_checks is None:
+ em = stevedore.ExtensionManager('oslo.policy.rule_checks',
+ invoke_on_load=False)
+ extension_checks = {
+ extension.name: extension.plugin
+ for extension in em
+ }
+ return extension_checks
def _check(rule, target, creds, enforcer, current_rule):
@@ -276,107 +282,6 @@ class RoleCheck(Check):
return False
-@register('http')
-class HttpCheck(Check):
- """Check ``http:`` rules by calling to a remote server.
-
- This example implementation simply verifies that the response
- is exactly ``True``.
- """
-
- def __call__(self, target, creds, enforcer, current_rule=None):
- url = ('http:' + self.match) % target
-
- # Convert instances of object() in target temporarily to
- # empty dict to avoid circular reference detection
- # errors in jsonutils.dumps().
- temp_target = copy.deepcopy(target)
- for key in target.keys():
- element = target.get(key)
- if type(element) is object:
- temp_target[key] = {}
-
- data = json = None
- if (enforcer.conf.oslo_policy.remote_content_type ==
- 'application/x-www-form-urlencoded'):
- data = {'rule': jsonutils.dumps(current_rule),
- 'target': jsonutils.dumps(temp_target),
- 'credentials': jsonutils.dumps(creds)}
- else:
- json = {'rule': current_rule,
- 'target': temp_target,
- 'credentials': creds}
-
- with contextlib.closing(requests.post(url, json=json, data=data)) as r:
- return r.text.lstrip('"').rstrip('"') == 'True'
-
-
-@register('https')
-class HttpsCheck(Check):
- """Check ``https:`` rules by calling to a remote server.
-
- This example implementation simply verifies that the response
- is exactly ``True``.
- """
-
- def __call__(self, target, creds, enforcer, current_rule=None):
- url = ('https:' + self.match) % target
-
- cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
- key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
- ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
- verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
-
- if cert_file:
- if not os.path.exists(cert_file):
- raise RuntimeError(
- _("Unable to find ssl cert_file : %s") % cert_file)
- if not os.access(cert_file, os.R_OK):
- raise RuntimeError(
- _("Unable to access ssl cert_file : %s") % cert_file)
- if key_file:
- if not os.path.exists(key_file):
- raise RuntimeError(
- _("Unable to find ssl key_file : %s") % key_file)
- if not os.access(key_file, os.R_OK):
- raise RuntimeError(
- _("Unable to access ssl key_file : %s") % key_file)
- cert = (cert_file, key_file)
- if verify_server:
- if ca_crt_file:
- if not os.path.exists(ca_crt_file):
- raise RuntimeError(
- _("Unable to find ca cert_file : %s") % ca_crt_file)
- verify_server = ca_crt_file
-
- # Convert instances of object() in target temporarily to
- # empty dict to avoid circular reference detection
- # errors in jsonutils.dumps().
- temp_target = copy.deepcopy(target)
- for key in target.keys():
- element = target.get(key)
- if type(element) is object:
- temp_target[key] = {}
-
- data = json = None
- if (enforcer.conf.oslo_policy.remote_content_type ==
- 'application/x-www-form-urlencoded'):
- data = {'rule': jsonutils.dumps(current_rule),
- 'target': jsonutils.dumps(temp_target),
- 'credentials': jsonutils.dumps(creds)}
- else:
- json = {'rule': current_rule,
- 'target': temp_target,
- 'credentials': creds}
-
- with contextlib.closing(
- requests.post(url, json=json,
- data=data, cert=cert,
- verify=verify_server)
- ) as r:
- return r.text.lstrip('"').rstrip('"') == 'True'
-
-
@register(None)
class GenericCheck(Check):
"""Check an individual match.
diff --git a/oslo_policy/_external.py b/oslo_policy/_external.py
new file mode 100644
index 0000000..928e667
--- /dev/null
+++ b/oslo_policy/_external.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import copy
+import os
+
+from oslo_policy import _checks
+from oslo_policy._i18n import _
+from oslo_serialization import jsonutils
+import requests
+
+
+class HttpCheck(_checks.Check):
+ """Check ``http:`` rules by calling to a remote server.
+
+ This example implementation simply verifies that the response
+ is exactly ``True``.
+ """
+
+ def __call__(self, target, creds, enforcer, current_rule=None):
+ url = ('http:' + self.match) % target
+ data, json = self._construct_payload(creds, current_rule,
+ enforcer, target)
+ with contextlib.closing(
+ requests.post(url, json=json, data=data)
+ ) as r:
+ return r.text.lstrip('"').rstrip('"') == 'True'
+
+ @staticmethod
+ def _construct_payload(creds, current_rule, enforcer, target):
+ # Convert instances of object() in target temporarily to
+ # empty dict to avoid circular reference detection
+ # errors in jsonutils.dumps().
+ temp_target = copy.deepcopy(target)
+ for key in target.keys():
+ element = target.get(key)
+ if type(element) is object:
+ temp_target[key] = {}
+ data = json = None
+ if (enforcer.conf.oslo_policy.remote_content_type ==
+ 'application/x-www-form-urlencoded'):
+ data = {'rule': jsonutils.dumps(current_rule),
+ 'target': jsonutils.dumps(temp_target),
+ 'credentials': jsonutils.dumps(creds)}
+ else:
+ json = {'rule': current_rule,
+ 'target': temp_target,
+ 'credentials': creds}
+ return data, json
+
+
+class HttpsCheck(HttpCheck):
+ """Check ``https:`` rules by calling to a remote server.
+
+ This example implementation simply verifies that the response
+ is exactly ``True``.
+ """
+
+ def __call__(self, target, creds, enforcer, current_rule=None):
+ url = ('https:' + self.match) % target
+
+ cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
+ key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
+ ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
+ verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
+
+ if cert_file:
+ if not os.path.exists(cert_file):
+ raise RuntimeError(
+ _("Unable to find ssl cert_file : %s") % cert_file)
+ if not os.access(cert_file, os.R_OK):
+ raise RuntimeError(
+ _("Unable to access ssl cert_file : %s") % cert_file)
+ if key_file:
+ if not os.path.exists(key_file):
+ raise RuntimeError(
+ _("Unable to find ssl key_file : %s") % key_file)
+ if not os.access(key_file, os.R_OK):
+ raise RuntimeError(
+ _("Unable to access ssl key_file : %s") % key_file)
+ cert = (cert_file, key_file)
+ if verify_server:
+ if ca_crt_file:
+ if not os.path.exists(ca_crt_file):
+ raise RuntimeError(
+ _("Unable to find ca cert_file : %s") % ca_crt_file)
+ verify_server = ca_crt_file
+
+ data, json = self._construct_payload(creds, current_rule,
+ enforcer, target)
+ with contextlib.closing(
+ requests.post(url, json=json,
+ data=data, cert=cert,
+ verify=verify_server)
+ ) as r:
+ return r.text.lstrip('"').rstrip('"') == 'True'
diff --git a/oslo_policy/_parser.py b/oslo_policy/_parser.py
index e85c81f..06dae07 100644
--- a/oslo_policy/_parser.py
+++ b/oslo_policy/_parser.py
@@ -216,7 +216,10 @@ def _parse_check(rule):
return _checks.FalseCheck()
# Find what implements the check
- if kind in _checks.registered_checks:
+ extension_checks = _checks.get_extensions()
+ if kind in extension_checks:
+ return extension_checks[kind](kind, match)
+ elif kind in _checks.registered_checks:
return _checks.registered_checks[kind](kind, match)
elif None in _checks.registered_checks:
return _checks.registered_checks[None](kind, match)
diff --git a/oslo_policy/fixture.py b/oslo_policy/fixture.py
index eaab5ee..e0b5107 100644
--- a/oslo_policy/fixture.py
+++ b/oslo_policy/fixture.py
@@ -36,7 +36,7 @@ class HttpCheckFixture(fixtures.Fixture):
self.useFixture(
fixtures.MonkeyPatch(
- 'oslo_policy._checks.HttpCheck.__call__',
+ 'oslo_policy._external.HttpCheck.__call__',
mocked_call,
)
)
@@ -63,7 +63,7 @@ class HttpsCheckFixture(fixtures.Fixture):
self.useFixture(
fixtures.MonkeyPatch(
- 'oslo_policy._checks.HttpCheck.__call__',
+ 'oslo_policy._external.HttpsCheck.__call__',
mocked_call,
)
)
diff --git a/oslo_policy/tests/test_checks.py b/oslo_policy/tests/test_checks.py
index 1e402e9..1cd08ec 100644
--- a/oslo_policy/tests/test_checks.py
+++ b/oslo_policy/tests/test_checks.py
@@ -13,15 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
-import json
import mock
-from oslo_serialization import jsonutils
from oslotest import base as test_base
-from requests_mock.contrib import fixture as rm_fixture
-import six.moves.urllib.parse as urlparse
from oslo_policy import _checks
-from oslo_policy import opts
from oslo_policy.tests import base
from oslo_policy.tests import token_fixture
@@ -96,292 +91,6 @@ class RoleCheckTestCase(base.PolicyBaseTestCase):
self.assertFalse(check({}, {}, self.enforcer))
-class HttpCheckTestCase(base.PolicyBaseTestCase):
-
- def setUp(self):
- super(HttpCheckTestCase, self).setUp()
- opts._register(self.conf)
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
-
- def decode_post_data(self, post_data):
- result = {}
- for item in post_data.split('&'):
- key, _sep, value = item.partition('=')
- result[key] = jsonutils.loads(urlparse.unquote_plus(value))
- return result
-
- def test_accept(self):
- self.requests_mock.post('http://example.com/target', text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/x-www-form-urlencoded',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=None),
- self.decode_post_data(last_request.body))
-
- def test_accept_json(self):
- self.conf.set_override('remote_content_type', 'application/json',
- group='oslo_policy')
- self.requests_mock.post('http://example.com/target', text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/json',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- credentials=cred_dict,
- target=target_dict),
- json.loads(last_request.body.decode('utf-8')))
-
- def test_reject(self):
- self.requests_mock.post("http://example.com/target", text='other')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertFalse(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=None),
- self.decode_post_data(last_request.body))
-
- def test_http_with_objects_in_target(self):
- self.requests_mock.post("http://example.com/target", text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
- target = {'a': object(),
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
- def test_http_with_strings_in_target(self):
- self.requests_mock.post("http://example.com/target", text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
- target = {'a': 'some_string',
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
- def test_accept_with_rule_in_argument(self):
- self.requests_mock.post('http://example.com/target', text='True')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- current_rule = "a_rule"
- self.assertTrue(check(target_dict, cred_dict, self.enforcer,
- current_rule))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=current_rule),
- self.decode_post_data(last_request.body))
-
- def test_reject_with_rule_in_argument(self):
- self.requests_mock.post("http://example.com/target", text='other')
-
- check = _checks.HttpCheck('http', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- current_rule = "a_rule"
- self.assertFalse(check(target_dict, cred_dict, self.enforcer,
- current_rule))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(target=target_dict, credentials=cred_dict,
- rule=current_rule),
- self.decode_post_data(last_request.body))
-
-
-class HttpsCheckTestCase(base.PolicyBaseTestCase):
-
- def setUp(self):
- super(HttpsCheckTestCase, self).setUp()
- opts._register(self.conf)
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
-
- def decode_post_data(self, post_data):
- result = {}
- for item in post_data.split('&'):
- key, _sep, value = item.partition('=')
- result[key] = jsonutils.loads(urlparse.unquote_plus(value))
- return result
-
- def test_https_accept(self):
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/x-www-form-urlencoded',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_accept_json(self):
- self.conf.set_override('remote_content_type', 'application/json',
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('application/json',
- last_request.headers['Content-Type'])
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- json.loads(last_request.body.decode('utf-8')))
-
- def test_https_accept_with_verify(self):
- self.conf.set_override('remote_ssl_verify_server_crt', True,
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual(True, last_request.verify)
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_accept_with_verify_cert(self):
- self.conf.set_override('remote_ssl_verify_server_crt', True,
- group='oslo_policy')
- self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- with mock.patch('os.path.exists') as path_exists:
- path_exists.return_value = True
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('ca.crt', last_request.verify)
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_accept_with_verify_and_client_certs(self):
- self.conf.set_override('remote_ssl_verify_server_crt', True,
- group='oslo_policy')
- self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
- group='oslo_policy')
- self.conf.set_override('remote_ssl_client_key_file', "client.key",
- group='oslo_policy')
- self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
- group='oslo_policy')
- self.requests_mock.post('https://example.com/target', text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- with mock.patch('os.path.exists') as path_exists:
- with mock.patch('os.access') as os_access:
- path_exists.return_value = True
- os_access.return_value = True
- self.assertTrue(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('ca.crt', last_request.verify)
- self.assertEqual(('client.crt', 'client.key'), last_request.cert)
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_reject(self):
- self.requests_mock.post("https://example.com/target", text='other')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
-
- target_dict = dict(name='target', spam='spammer')
- cred_dict = dict(user='user', roles=['a', 'b', 'c'])
- self.assertFalse(check(target_dict, cred_dict, self.enforcer))
-
- last_request = self.requests_mock.last_request
- self.assertEqual('POST', last_request.method)
- self.assertEqual(dict(rule=None,
- target=target_dict,
- credentials=cred_dict),
- self.decode_post_data(last_request.body))
-
- def test_https_with_objects_in_target(self):
- self.requests_mock.post("https://example.com/target", text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
- target = {'a': object(),
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
- def test_https_with_strings_in_target(self):
- self.requests_mock.post("https://example.com/target", text='True')
-
- check = _checks.HttpsCheck('https', '//example.com/%(name)s')
- target = {'a': 'some_string',
- 'name': 'target',
- 'b': 'test data'}
- self.assertTrue(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer))
-
-
class GenericCheckTestCase(base.PolicyBaseTestCase):
def test_no_cred(self):
check = _checks.GenericCheck('name', '%(name)s')
diff --git a/oslo_policy/tests/test_external.py b/oslo_policy/tests/test_external.py
new file mode 100644
index 0000000..92ff539
--- /dev/null
+++ b/oslo_policy/tests/test_external.py
@@ -0,0 +1,310 @@
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import mock
+from oslo_serialization import jsonutils
+from requests_mock.contrib import fixture as rm_fixture
+import six.moves.urllib.parse as urlparse
+
+from oslo_policy import _external
+from oslo_policy import opts
+from oslo_policy.tests import base
+
+
+class HttpCheckTestCase(base.PolicyBaseTestCase):
+
+ def setUp(self):
+ super(HttpCheckTestCase, self).setUp()
+ opts._register(self.conf)
+ self.requests_mock = self.useFixture(rm_fixture.Fixture())
+
+ def decode_post_data(self, post_data):
+ result = {}
+ for item in post_data.split('&'):
+ key, _sep, value = item.partition('=')
+ result[key] = jsonutils.loads(urlparse.unquote_plus(value))
+ return result
+
+ def test_accept(self):
+ self.requests_mock.post('http://example.com/target', text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/x-www-form-urlencoded',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=None),
+ self.decode_post_data(last_request.body))
+
+ def test_accept_json(self):
+ self.conf.set_override('remote_content_type', 'application/json',
+ group='oslo_policy')
+ self.requests_mock.post('http://example.com/target', text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/json',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ credentials=cred_dict,
+ target=target_dict),
+ json.loads(last_request.body.decode('utf-8')))
+
+ def test_reject(self):
+ self.requests_mock.post("http://example.com/target", text='other')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertFalse(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=None),
+ self.decode_post_data(last_request.body))
+
+ def test_http_with_objects_in_target(self):
+ self.requests_mock.post("http://example.com/target", text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+ target = {'a': object(),
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
+
+ def test_http_with_strings_in_target(self):
+ self.requests_mock.post("http://example.com/target", text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+ target = {'a': 'some_string',
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
+
+ def test_accept_with_rule_in_argument(self):
+ self.requests_mock.post('http://example.com/target', text='True')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ current_rule = "a_rule"
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer,
+ current_rule))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=current_rule),
+ self.decode_post_data(last_request.body))
+
+ def test_reject_with_rule_in_argument(self):
+ self.requests_mock.post("http://example.com/target", text='other')
+
+ check = _external.HttpCheck('http', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ current_rule = "a_rule"
+ self.assertFalse(check(target_dict, cred_dict, self.enforcer,
+ current_rule))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(target=target_dict, credentials=cred_dict,
+ rule=current_rule),
+ self.decode_post_data(last_request.body))
+
+
+class HttpsCheckTestCase(base.PolicyBaseTestCase):
+
+ def setUp(self):
+ super(HttpsCheckTestCase, self).setUp()
+ opts._register(self.conf)
+ self.requests_mock = self.useFixture(rm_fixture.Fixture())
+
+ def decode_post_data(self, post_data):
+ result = {}
+ for item in post_data.split('&'):
+ key, _sep, value = item.partition('=')
+ result[key] = jsonutils.loads(urlparse.unquote_plus(value))
+ return result
+
+ def test_https_accept(self):
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/x-www-form-urlencoded',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_accept_json(self):
+ self.conf.set_override('remote_content_type', 'application/json',
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('application/json',
+ last_request.headers['Content-Type'])
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ json.loads(last_request.body.decode('utf-8')))
+
+ def test_https_accept_with_verify(self):
+ self.conf.set_override('remote_ssl_verify_server_crt', True,
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual(True, last_request.verify)
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_accept_with_verify_cert(self):
+ self.conf.set_override('remote_ssl_verify_server_crt', True,
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ with mock.patch('os.path.exists') as path_exists:
+ path_exists.return_value = True
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('ca.crt', last_request.verify)
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_accept_with_verify_and_client_certs(self):
+ self.conf.set_override('remote_ssl_verify_server_crt', True,
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_client_key_file', "client.key",
+ group='oslo_policy')
+ self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
+ group='oslo_policy')
+ self.requests_mock.post('https://example.com/target', text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ with mock.patch('os.path.exists') as path_exists:
+ with mock.patch('os.access') as os_access:
+ path_exists.return_value = True
+ os_access.return_value = True
+ self.assertTrue(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('ca.crt', last_request.verify)
+ self.assertEqual(('client.crt', 'client.key'), last_request.cert)
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_reject(self):
+ self.requests_mock.post("https://example.com/target", text='other')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+
+ target_dict = dict(name='target', spam='spammer')
+ cred_dict = dict(user='user', roles=['a', 'b', 'c'])
+ self.assertFalse(check(target_dict, cred_dict, self.enforcer))
+
+ last_request = self.requests_mock.last_request
+ self.assertEqual('POST', last_request.method)
+ self.assertEqual(dict(rule=None,
+ target=target_dict,
+ credentials=cred_dict),
+ self.decode_post_data(last_request.body))
+
+ def test_https_with_objects_in_target(self):
+ self.requests_mock.post("https://example.com/target", text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+ target = {'a': object(),
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
+
+ def test_https_with_strings_in_target(self):
+ self.requests_mock.post("https://example.com/target", text='True')
+
+ check = _external.HttpsCheck('https', '//example.com/%(name)s')
+ target = {'a': 'some_string',
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertTrue(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer))
diff --git a/setup.cfg b/setup.cfg
index dc076d3..0f7b4f0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -39,6 +39,10 @@ console_scripts =
oslopolicy-policy-generator = oslo_policy.generator:generate_policy
oslopolicy-list-redundant = oslo_policy.generator:list_redundant
+oslo.policy.rule_checks =
+ http = oslo_policy._external:HttpCheck
+ https = oslo_policy._external:HttpsCheck
+
[build_sphinx]
all-files = 1
warning-is-error = 1