diff options
Diffstat (limited to 'oslo_policy')
-rw-r--r-- | oslo_policy/generator.py | 31 | ||||
-rw-r--r-- | oslo_policy/policy.py | 443 | ||||
-rw-r--r-- | oslo_policy/tests/test_external.py | 8 | ||||
-rw-r--r-- | oslo_policy/tests/test_generator.py | 50 | ||||
-rw-r--r-- | oslo_policy/tests/test_policy.py | 175 |
5 files changed, 411 insertions, 296 deletions
diff --git a/oslo_policy/generator.py b/oslo_policy/generator.py index ac1a992..f2b6027 100644 --- a/oslo_policy/generator.py +++ b/oslo_policy/generator.py @@ -204,21 +204,32 @@ def _format_rule_default_yaml(default, include_help=True, comment_rule=True, 'reason': _format_help_text(default.deprecated_reason), 'text': text} elif add_deprecated_rules and default.deprecated_rule: + deprecated_reason = ( + default.deprecated_rule.deprecated_reason or + default.deprecated_reason + ) + deprecated_since = ( + default.deprecated_rule.deprecated_since or + default.deprecated_since + ) + # This issues a deprecation warning but aliases the old policy name # with the new policy name for compatibility. deprecated_text = ( '"%(old_name)s":"%(old_check_str)s" has been deprecated ' 'since %(since)s in favor of "%(name)s":"%(check_str)s".' - ) % {'old_name': default.deprecated_rule.name, - 'old_check_str': default.deprecated_rule.check_str, - 'since': default.deprecated_since, - 'name': default.name, - 'check_str': default.check_str, - } - text = ('%(text)s# DEPRECATED\n%(deprecated_text)s\n%(reason)s\n' % - {'text': text, - 'reason': _format_help_text(default.deprecated_reason), - 'deprecated_text': _format_help_text(deprecated_text)}) + ) % { + 'old_name': default.deprecated_rule.name, + 'old_check_str': default.deprecated_rule.check_str, + 'since': deprecated_since, + 'name': default.name, + 'check_str': default.check_str, + } + text = '%(text)s# DEPRECATED\n%(deprecated_text)s\n%(reason)s\n' % { + 'text': text, + 'reason': _format_help_text(deprecated_reason), + 'deprecated_text': _format_help_text(deprecated_text) + } if default.name != default.deprecated_rule.name: text += ('"%(old_name)s": "rule:%(name)s"\n' % diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py index 903f65b..913a28a 100644 --- a/oslo_policy/policy.py +++ b/oslo_policy/policy.py @@ -225,6 +225,7 @@ import collections.abc import copy import logging import os +import typing as ty import warnings from oslo_config import cfg @@ -390,8 +391,8 @@ def parse_file_contents(data): yaml or json format. Both can be parsed as yaml. :param data: A string containing the contents of a policy file. - :returns: A dict of the form {'policy_name1': 'policy1', - 'policy_name2': 'policy2,...} + :returns: A dict of the form ``{'policy_name1': 'policy1', + 'policy_name2': 'policy2,...}`` """ try: # NOTE(snikitin): jsonutils.loads() is much faster than @@ -437,7 +438,6 @@ class Rules(dict): .. warning:: This method is deprecated as of the 1.5.0 release in favor of :meth:`load` and may be removed in the 2.0 release. - """ warnings.warn( 'The load_json() method is deprecated as of the 1.5.0 release in ' @@ -501,24 +501,24 @@ class Enforcer(object): """Responsible for loading and enforcing rules. :param conf: A configuration object. - :param policy_file: Custom policy file to use, if none is - specified, ``conf.oslo_policy.policy_file`` will be - used. - :param rules: Default dictionary / Rules to use. It will be - considered just in the first instantiation. If - :meth:`load_rules` with ``force_reload=True``, - :meth:`clear` or :meth:`set_rules` with ``overwrite=True`` - is called this will be overwritten. - :param default_rule: Default rule to use, conf.default_rule will - be used if none is specified. + :param policy_file: Custom policy file to use, if none is specified, + ``conf.oslo_policy.policy_file`` will be used. + :param rules: Default dictionary / Rules to use. It will be considered just + in the first instantiation. If :meth:`load_rules` with + ``force_reload=True``, :meth:`clear` or :meth:`set_rules` with + ``overwrite=True`` is called this will be overwritten. + :param default_rule: Default rule to use, conf.default_rule will be used if + none is specified. :param use_conf: Whether to load rules from cache or config file. :param overwrite: Whether to overwrite existing rules when reload rules - from config file. + from config file. """ - def __init__(self, conf, policy_file=None, rules=None, - default_rule=None, use_conf=True, overwrite=True, - fallback_to_json_file=True): + def __init__( + self, conf, policy_file=None, rules=None, + default_rule=None, use_conf=True, overwrite=True, + fallback_to_json_file=True, + ): self.conf = conf opts._register(conf) @@ -551,9 +551,9 @@ class Enforcer(object): def set_rules(self, rules, overwrite=True, use_conf=False): """Create a new :class:`Rules` based on the provided dict of rules. - :param dict rules: New rules to use. + :param rules: New rules to use. :param overwrite: Whether to overwrite current rules or update them - with the new rules. + with the new rules. :param use_conf: Whether to reload rules from cache or config file. """ @@ -637,11 +637,15 @@ class Enforcer(object): for default in self.registered_rules.values(): if default.deprecated_for_removal: self._emit_deprecated_for_removal_warning(default) - elif default.deprecated_rule: - self._handle_deprecated_rule(default) - if default.name not in self.rules: - self.rules[default.name] = default.check + if default.name in self.rules: + continue + + check = default.check + if default.deprecated_rule: + check = self._handle_deprecated_rule(default) + + self.rules[default.name] = check # Detect and log obvious incorrect rule definitions if self._need_check_rule: @@ -695,15 +699,14 @@ class Enforcer(object): """Handle cases where a policy rule has been deprecated. :param default: an instance of RuleDefault that contains an instance of - DeprecatedRule + DeprecatedRule """ - # Short-circuit the rule deprecation logic if we've already processed - # it for this particular rule. - if default._deprecated_rule_handled: - return - deprecated_rule = default.deprecated_rule + deprecated_reason = ( + deprecated_rule.deprecated_reason or default.deprecated_reason) + deprecated_since = ( + deprecated_rule.deprecated_since or default.deprecated_since) deprecated_msg = ( 'Policy "%(old_name)s":"%(old_check_str)s" was deprecated in ' @@ -713,10 +716,10 @@ class Enforcer(object): 'file and maintain it manually.' % { 'old_name': deprecated_rule.name, 'old_check_str': deprecated_rule.check_str, - 'release': default.deprecated_since, + 'release': deprecated_since, 'name': default.name, 'check_str': default.check_str, - 'reason': default.deprecated_reason + 'reason': deprecated_reason, } ) @@ -725,9 +728,10 @@ class Enforcer(object): # renamed to foo:create_bar then they need to be able to see that # before they roll out the next release. If the policy name is in # self.file_rules, we know that it's being overridden. - if deprecated_rule.name != default.name and ( - deprecated_rule.name in self.file_rules): - + if ( + deprecated_rule.name != default.name and + deprecated_rule.name in self.file_rules + ): if not self.suppress_deprecation_warnings: warnings.warn(deprecated_msg) @@ -739,15 +743,13 @@ class Enforcer(object): # we set the deprecated rule name to reference the new rule. If we # see that the deprecated override rule is just the new rule, then # we shouldn't mess with it. - if (self.file_rules[deprecated_rule.name].check - != _parser.parse_rule(deprecated_rule.check_str) and - str(self.file_rules[deprecated_rule.name].check) - != 'rule:%s' % default.name): - if default.name not in self.file_rules.keys(): - self.rules[default.name] = self.file_rules[ - deprecated_rule.name - ].check - default._deprecated_rule_handled = True + file_rule = self.file_rules[deprecated_rule.name] + if ( + file_rule.check != deprecated_rule.check and + str(file_rule.check) != 'rule:%s' % default.name and + default.name not in self.file_rules.keys() + ): + return self.file_rules[deprecated_rule.name].check # In this case, the default check string is changing. We need to let # operators know that this is going to change. If they don't want to @@ -761,20 +763,23 @@ class Enforcer(object): # support the new policy. # If the enforce_new_defaults flag is True, do not add OrCheck to the # old check_str and enforce only the new defaults. - if (not self.conf.oslo_policy.enforce_new_defaults - and deprecated_rule.check_str != default.check_str - and default.name not in self.file_rules): - - default.check = OrCheck([_parser.parse_rule(cs) for cs in - [default.check_str, - deprecated_rule.check_str]]) - default._deprecated_rule_handled = True - if not (self.suppress_deprecation_warnings - or self.suppress_default_change_warnings): + if ( + not self.conf.oslo_policy.enforce_new_defaults + and deprecated_rule.check_str != default.check_str + and default.name not in self.file_rules + ): + if not ( + self.suppress_deprecation_warnings + or self.suppress_default_change_warnings + ): warnings.warn(deprecated_msg) + return OrCheck([default.check, deprecated_rule.check]) + + return default.check + def _undefined_check(self, check): - '''Check if a RuleCheck references an undefined rule.''' + """Check if a RuleCheck references an undefined rule.""" if isinstance(check, RuleCheck): if check.match not in self.rules: # Undefined rule @@ -790,12 +795,16 @@ class Enforcer(object): return False def _cycle_check(self, check, seen=None): - '''Check if RuleChecks cycle. + """Check if RuleChecks cycle. + + Looking for something like:: - Looking for something like: - "foo": "rule:bar" - "bar": "rule:foo" - ''' + "foo": "rule:bar" + "bar": "rule:foo" + + :param check: The check to search for. + :param seen: A set of previously seen rules, else None. + """ if seen is None: seen = set() @@ -885,11 +894,10 @@ class Enforcer(object): def _load_policy_file(self, path, force_reload, overwrite=True): """Load policy rules from the specified policy file. - :param str path: A path of the policy file to load rules from. - :param bool force_reload: Forcefully reload the policy file content. - :param bool overwrite: Replace policy rules instead of updating them. + :param path: A path of the policy file to load rules from. + :param force_reload: Forcefully reload the policy file content. + :param overwrite: Replace policy rules instead of updating them. :return: A bool indicating whether rules have been changed or not. - :rtype: bool """ rules_changed = False reloaded, data = _cache_handler.read_cached_file( @@ -907,14 +915,12 @@ class Enforcer(object): """Locate the policy YAML/JSON data file/path. :param path: It's value can be a full path or related path. When - full path specified, this function just returns the full - path. When related path specified, this function will - search configuration directories to find one that exists. + full path specified, this function just returns the full path. When + related path specified, this function will search configuration + directories to find one that exists. :returns: The policy path - - :raises: ConfigFilesNotFoundError if the file/path couldn't - be located. + :raises: ConfigFilesNotFoundError if the file/path couldn't be located. """ policy_path = self.conf.find_file(path) @@ -923,35 +929,30 @@ class Enforcer(object): raise cfg.ConfigFilesNotFoundError((path,)) - def enforce(self, rule, target, creds, do_raise=False, exc=None, - *args, **kwargs): + def enforce( + self, rule, target, creds, do_raise=False, exc=None, *args, **kwargs, + ): """Checks authorization of a rule against the target and credentials. - :param rule: The rule to evaluate. - :type rule: string or :class:`BaseCheck` - :param dict target: As much information about the object being - operated on as possible. The target - argument should be a dict instance or an - instance of a class that fully supports - the Mapping abstract base class. - :param dict creds: As much information about the user performing the - action as possible. This parameter can also be an - instance of ``oslo_context.context.RequestContext``. + :param rule: The rule to evaluate as a string or :class:`BaseCheck`. + :param target: As much information about the object being operated on + as possible. The target argument should be a dict instance or an + instance of a class that fully supports the Mapping abstract base + class. + :param creds: As much information about the user performing the action + as possible. This parameter can also be an instance of + ``oslo_context.context.RequestContext``. :param do_raise: Whether to raise an exception or not if check - fails. + fails. :param exc: Class of the exception to raise if the check fails. - Any remaining arguments passed to :meth:`enforce` (both - positional and keyword arguments) will be passed to - the exception class. If not specified, - :class:`PolicyNotAuthorized` will be used. - - :return: ``False`` if the policy does not allow the action and `exc` is - not provided; otherwise, returns a value that evaluates to - ``True``. Note: for rules using the "case" expression, this - ``True`` value will be the specified string from the - expression. + Any remaining arguments passed to :meth:`enforce` (both positional + and keyword arguments) will be passed to the exception class. If + not specified, :class:`PolicyNotAuthorized` will be used. + :return: ``False`` if the policy does not allow the action and ``exc`` + is not provided; otherwise, returns a value that evaluates to + ``True``. Note: for rules using the "case" expression, this + ``True`` value will be the specified string from the expression. """ - self.load_rules() if isinstance(creds, context.RequestContext): @@ -1132,66 +1133,85 @@ class Enforcer(object): """ if rule not in self.registered_rules: raise PolicyNotRegistered(rule) - return self.enforce(rule, target, creds, do_raise, exc, - *args, **kwargs) + return self.enforce( + rule, target, creds, do_raise, exc, *args, **kwargs) + + +class _BaseRule: + + def __init__(self, name, check_str): + self._name = name + self._check_str = check_str + self._check = _parser.parse_rule(self.check_str) + @property + def name(self): + return self._name -class RuleDefault(object): + @property + def check_str(self): + return self._check_str + + @property + def check(self): + return self._check + + def __str__(self): + return f'"{self.name}": "{self.check_str}"' + + +class RuleDefault(_BaseRule): """A class for holding policy definitions. It is required to supply a name and value at creation time. It is encouraged to also supply a description to assist operators. :param name: The name of the policy. This is used when referencing it - from another rule or during policy enforcement. + from another rule or during policy enforcement. :param check_str: The policy. This is a string defining a policy that - conforms to the policy language outlined at the top of - the file. + conforms to the policy language outlined at the top of the file. :param description: A plain text description of the policy. This will be - used to comment sample policy files for use by - deployers. + used to comment sample policy files for use by deployers. :param deprecated_rule: :class:`.DeprecatedRule` :param deprecated_for_removal: indicates whether the policy is planned for - removal in a future release. + removal in a future release. :param deprecated_reason: indicates why this policy is planned for removal - in a future release. Silently ignored if - deprecated_for_removal is False. + in a future release. Silently ignored if deprecated_for_removal is + False. :param deprecated_since: indicates which release this policy was deprecated - in. Accepts any string, though valid version - strings are encouraged. Silently ignored if - deprecated_for_removal is False. + in. Accepts any string, though valid version strings are encouraged. + Silently ignored if deprecated_for_removal is False. :param scope_types: A list containing the intended scopes of the operation - being done. + being done. - .. versionchanged 1.29 + .. versionchanged:: 1.29 Added *deprecated_rule* parameter. - .. versionchanged 1.29 + .. versionchanged:: 1.29 Added *deprecated_for_removal* parameter. - .. versionchanged 1.29 + .. versionchanged:: 1.29 Added *deprecated_reason* parameter. - .. versionchanged 1.29 + .. versionchanged:: 1.29 Added *deprecated_since* parameter. - .. versionchanged 1.31 + .. versionchanged:: 1.31 Added *scope_types* parameter. - """ - def __init__(self, name, check_str, description=None, - deprecated_rule=None, deprecated_for_removal=False, - deprecated_reason=None, deprecated_since=None, - scope_types=None): - self.name = name - self.check_str = check_str - self.check = _parser.parse_rule(check_str) - self.description = description - self.deprecated_rule = copy.deepcopy(deprecated_rule) or [] - self.deprecated_for_removal = deprecated_for_removal - self.deprecated_reason = deprecated_reason - self.deprecated_since = deprecated_since - self._deprecated_rule_handled = False + def __init__( + self, name, check_str, description=None, + deprecated_rule=None, deprecated_for_removal=False, + deprecated_reason=None, deprecated_since=None, + scope_types=None, + ): + super().__init__(name, check_str) + + self._description = description + self._deprecated_rule = copy.deepcopy(deprecated_rule) or [] + self._deprecated_for_removal = deprecated_for_removal + self._deprecated_reason = deprecated_reason + self._deprecated_since = deprecated_since if self.deprecated_rule: if not isinstance(self.deprecated_rule, DeprecatedRule): @@ -1199,30 +1219,59 @@ class RuleDefault(object): 'deprecated_rule must be a DeprecatedRule object.' ) - if (deprecated_for_removal or deprecated_rule) and ( - deprecated_reason is None or deprecated_since is None): - raise ValueError( - '%(name)s deprecated without deprecated_reason or ' - 'deprecated_since. Both must be supplied if deprecating a ' - 'policy' % {'name': self.name} - ) + # if this rule is being deprecated, we need to provide a deprecation + # reason here, but if this rule is replacing another rule, then the + # deprecation reason belongs on that other rule + if deprecated_for_removal: + if deprecated_reason is None or deprecated_since is None: + raise ValueError( + '%(name)s deprecated without deprecated_reason or ' + 'deprecated_since. Both must be supplied if deprecating a ' + 'policy' % {'name': self.name} + ) + elif deprecated_rule and (deprecated_reason or deprecated_since): + warnings.warn( + f'{name} should not configure deprecated_reason or ' + f'deprecated_since as these should be configured on the ' + f'DeprecatedRule indicated by deprecated_rule. ' + f'This will be an error in a future release', + DeprecationWarning) if scope_types: msg = 'scope_types must be a list of strings.' if not isinstance(scope_types, list): raise ValueError(msg) + for scope_type in scope_types: if not isinstance(scope_type, str): raise ValueError(msg) + if scope_types.count(scope_type) > 1: raise ValueError( 'scope_types must be a list of unique strings.' ) + self.scope_types = scope_types - def __str__(self): - return '"%(name)s": "%(check_str)s"' % {'name': self.name, - 'check_str': self.check_str} + @property + def description(self): + return self._description + + @property + def deprecated_rule(self): + return self._deprecated_rule + + @property + def deprecated_for_removal(self): + return self._deprecated_for_removal + + @property + def deprecated_reason(self): + return self._deprecated_reason + + @property + def deprecated_since(self): + return self._deprecated_since def __eq__(self, other): """Equality operator. @@ -1254,7 +1303,7 @@ class DocumentedRuleDefault(RuleDefault): attributes of this class. Eventually, all usage of RuleDefault should be converted to use DocumentedRuleDefault. - :param operations: List of dicts containing each api url and + :param operations: List of dicts containing each API URL and corresponding http request method. Example:: @@ -1262,11 +1311,13 @@ class DocumentedRuleDefault(RuleDefault): operations=[{'path': '/foo', 'method': 'GET'}, {'path': '/some', 'method': 'POST'}] """ - def __init__(self, name, check_str, description, operations, - deprecated_rule=None, deprecated_for_removal=False, - deprecated_reason=None, deprecated_since=None, - scope_types=None): - super(DocumentedRuleDefault, self).__init__( + def __init__( + self, name, check_str, description, operations, + deprecated_rule=None, deprecated_for_removal=False, + deprecated_reason=None, deprecated_since=None, + scope_types=None, + ): + super().__init__( name, check_str, description, deprecated_rule=deprecated_rule, deprecated_for_removal=deprecated_for_removal, @@ -1274,41 +1325,36 @@ class DocumentedRuleDefault(RuleDefault): deprecated_since=deprecated_since, scope_types=scope_types ) - self.operations = operations - @property - def description(self): - return self._description + self._operations = operations - @description.setter - def description(self, value): - # Validates description isn't empty. - if not value: + if not self._description: raise InvalidRuleDefault('Description is required') - self._description = value - - @property - def operations(self): - return self._operations - @operations.setter - def operations(self, ops): - if not isinstance(ops, list): + if not isinstance(self._operations, list): raise InvalidRuleDefault('Operations must be a list') - if not ops: + + if not self._operations: raise InvalidRuleDefault('Operations list must not be empty') - for op in ops: + for op in self._operations: if 'path' not in op: raise InvalidRuleDefault('Operation must contain a path') if 'method' not in op: raise InvalidRuleDefault('Operation must contain a method') if len(op.keys()) > 2: raise InvalidRuleDefault('Operation contains > 2 keys') - self._operations = ops + @property + def description(self): + return self._description + + @property + def operations(self): + return self._operations -class DeprecatedRule(object): + +class DeprecatedRule(_BaseRule): """Represents a Deprecated policy or rule. @@ -1330,6 +1376,8 @@ class DeprecatedRule(object): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', check_str='role:fizz' + deprecated_reason='role:bang is a better default', + deprecated_since='N', ) policy.DocumentedRuleDefault( @@ -1338,8 +1386,6 @@ class DeprecatedRule(object): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='role:bang is a better default', - deprecated_since='N' ) DeprecatedRule can be used to change the policy name itself. Assume the @@ -1361,6 +1407,8 @@ class DeprecatedRule(object): deprecated_rule = policy.DeprecatedRule( name='foo:post_bar', check_str='role:fizz' + deprecated_reason='foo:create_bar is more consistent', + deprecated_since='N', ) policy.DocumentedRuleDefault( @@ -1369,8 +1417,6 @@ class DeprecatedRule(object): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='foo:create_bar is more consistent', - deprecated_since='N' ) Finally, let's use DeprecatedRule to break a policy into more granular @@ -1415,6 +1461,10 @@ class DeprecatedRule(object): deprecated_rule = policy.DeprecatedRule( name='foo:bar', check_str='role:bazz' + deprecated_reason=( + 'foo:bar has been replaced by more granular policies' + ), + deprecated_since='N', ) policy.DocumentedRuleDefault( @@ -1423,8 +1473,6 @@ class DeprecatedRule(object): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='foo:create_bar is more granular than foo:bar', - deprecated_since='N' ) policy.DocumentedRuleDefault( name='foo:list_bars', @@ -1432,8 +1480,6 @@ class DeprecatedRule(object): description='List bars.', operations=[{'path': '/v1/bars', 'method': 'GET'}], deprecated_rule=deprecated_rule, - deprecated_reason='foo:list_bars is more granular than foo:bar', - deprecated_since='N' ) policy.DocumentedRuleDefault( name='foo:get_bar', @@ -1441,8 +1487,6 @@ class DeprecatedRule(object): description='Get a bar.', operations=[{'path': '/v1/bars/{bar_id}', 'method': 'GET'}], deprecated_rule=deprecated_rule, - deprecated_reason='foo:get_bar is more granular than foo:bar', - deprecated_since='N' ) policy.DocumentedRuleDefault( name='foo:update_bar', @@ -1450,8 +1494,6 @@ class DeprecatedRule(object): description='Update a bar.', operations=[{'path': '/v1/bars/{bar_id}', 'method': 'PATCH'}], deprecated_rule=deprecated_rule, - deprecated_reason='foo:update_bar is more granular than foo:bar', - deprecated_since='N' ) policy.DocumentedRuleDefault( name='foo:delete_bar', @@ -1459,19 +1501,50 @@ class DeprecatedRule(object): description='Delete a bar.', operations=[{'path': '/v1/bars/{bar_id}', 'method': 'DELETE'}], deprecated_rule=deprecated_rule, - deprecated_reason='foo:delete_bar is more granular than foo:bar', - deprecated_since='N' ) - .. versionchanged 1.29 + :param name: The name of the policy. This is used when referencing it + from another rule or during policy enforcement. + :param check_str: The policy. This is a string defining a policy that + conforms to the policy language outlined at the top of the file. + :param deprecated_reason: indicates why this policy is planned for removal + in a future release. + :param deprecated_since: indicates which release this policy was deprecated + in. Accepts any string, though valid version strings are encouraged. + + .. versionchanged:: 1.29 Added *DeprecatedRule* object. + + .. versionchanged:: 3.4 + Added *deprecated_reason* parameter. + + .. versionchanged:: 3.4 + Added *deprecated_since* parameter. """ - def __init__(self, name, check_str): - """Construct a DeprecatedRule object. + def __init__( + self, + name: str, + check_str: str, + *, + deprecated_reason: ty.Optional[str] = None, + deprecated_since: ty.Optional[str] = None, + ): + super().__init__(name, check_str) - :param name: the policy name - :param check_str: the value of the policy's check string - """ - self.name = name - self.check_str = check_str + self._deprecated_reason = deprecated_reason + self._deprecated_since = deprecated_since + + if not deprecated_reason or not deprecated_since: + warnings.warn( + f'{name} deprecated without deprecated_reason or ' + f'deprecated_since. This will be an error in a future release', + DeprecationWarning) + + @property + def deprecated_reason(self): + return self._deprecated_reason + + @property + def deprecated_since(self): + return self._deprecated_since diff --git a/oslo_policy/tests/test_external.py b/oslo_policy/tests/test_external.py index 478920b..797e70f 100644 --- a/oslo_policy/tests/test_external.py +++ b/oslo_policy/tests/test_external.py @@ -16,6 +16,7 @@ import json from unittest import mock +import fixtures from oslo_serialization import jsonutils from requests_mock.contrib import fixture as rm_fixture from urllib import parse as urlparse @@ -155,6 +156,11 @@ class HttpsCheckTestCase(base.PolicyBaseTestCase): opts._register(self.conf) self.requests_mock = self.useFixture(rm_fixture.Fixture()) + # ensure environment variables don't mess with our test results + # https://requests.readthedocs.io/en/master/user/advanced/#ssl-cert-verification + self.useFixture(fixtures.EnvironmentVariable('REQUESTS_CA_BUNDLE')) + self.useFixture(fixtures.EnvironmentVariable('CURL_CA_BUNDLE')) + def decode_post_data(self, post_data): result = {} for item in post_data.split('&'): @@ -203,6 +209,8 @@ class HttpsCheckTestCase(base.PolicyBaseTestCase): def test_https_accept_with_verify(self): self.conf.set_override('remote_ssl_verify_server_crt', True, group='oslo_policy') + self.conf.set_override('remote_ssl_ca_crt_file', None, + group='oslo_policy') self.requests_mock.post('https://example.com/target', text='True') check = _external.HttpsCheck('https', '//example.com/%(name)s') diff --git a/oslo_policy/tests/test_generator.py b/oslo_policy/tests/test_generator.py index 8df0ec8..5dbb4ed 100644 --- a/oslo_policy/tests/test_generator.py +++ b/oslo_policy/tests/test_generator.py @@ -194,17 +194,17 @@ class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase): def test_deprecated_policies_are_aliased_to_new_names(self): deprecated_rule = policy.DeprecatedRule( name='foo:post_bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason=( + 'foo:post_bar is being removed in favor of foo:create_bar' + ), + deprecated_since='N', ) new_rule = policy.RuleDefault( name='foo:create_bar', check_str='role:fizz', description='Create a bar.', deprecated_rule=deprecated_rule, - deprecated_reason=( - 'foo:post_bar is being removed in favor of foo:create_bar' - ), - deprecated_since='N' ) opts = {'rules': [new_rule]} @@ -240,17 +240,17 @@ class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase): def test_deprecated_policies_with_same_name(self): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='role:old' + check_str='role:old', + deprecated_reason=( + 'role:fizz is a more sane default for foo:create_bar' + ), + deprecated_since='N', ) new_rule = policy.RuleDefault( name='foo:create_bar', check_str='role:fizz', description='Create a bar.', deprecated_rule=deprecated_rule, - deprecated_reason=( - 'role:fizz is a more sane default for foo:create_bar' - ), - deprecated_since='N' ) opts = {'rules': [new_rule]} @@ -606,12 +606,18 @@ class ListRedundantTestCase(base.PolicyBaseTestCase): enforcer.register_default( policy.RuleDefault('owner', 'project_id:%(project_id)s')) # register a new opt - deprecated_rule = policy.DeprecatedRule('old_foo', 'role:bar') + deprecated_rule = policy.DeprecatedRule( + name='old_foo', + check_str='role:bar', + deprecated_reason='reason', + deprecated_since='T' + ) enforcer.register_default( - policy.RuleDefault('foo', 'role:foo', - deprecated_rule=deprecated_rule, - deprecated_reason='reason', - deprecated_since='T') + policy.RuleDefault( + name='foo', + check_str='role:foo', + deprecated_rule=deprecated_rule, + ), ) # Mock out stevedore to return the configured enforcer @@ -656,7 +662,9 @@ class UpgradePolicyTestCase(base.PolicyBaseTestCase): self.create_config_file('policy.json', policy_json_contents) deprecated_policy = policy.DeprecatedRule( name='deprecated_name', - check_str='rule:admin' + check_str='rule:admin', + deprecated_reason='test', + deprecated_since='Stein', ) self.new_policy = policy.DocumentedRuleDefault( name='new_policy_name', @@ -664,8 +672,6 @@ class UpgradePolicyTestCase(base.PolicyBaseTestCase): description='test_policy', operations=[{'path': '/test', 'method': 'GET'}], deprecated_rule=deprecated_policy, - deprecated_reason='test', - deprecated_since='Stein' ) self.extensions = [] ext = stevedore.extension.Extension(name='test_upgrade', @@ -848,7 +854,9 @@ class ConvertJsonToYamlTestCase(base.PolicyBaseTestCase): 'converted_policy.yaml') deprecated_policy = policy.DeprecatedRule( name='deprecated_rule1_name', - check_str='rule:admin' + check_str='rule:admin', + deprecated_reason='testing', + deprecated_since='ussuri', ) self.registered_policy = [ policy.DocumentedRuleDefault( @@ -857,9 +865,7 @@ class ConvertJsonToYamlTestCase(base.PolicyBaseTestCase): description='test_rule1', operations=[{'path': '/test', 'method': 'GET'}], deprecated_rule=deprecated_policy, - deprecated_reason='testing', - deprecated_since='ussuri', - scope_types=['system'] + scope_types=['system'], ), policy.RuleDefault( name='rule2_name', diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py index 513b43d..d5c86c3 100644 --- a/oslo_policy/tests/test_policy.py +++ b/oslo_policy/tests/test_policy.py @@ -788,20 +788,17 @@ class EnforcerTest(base.PolicyBaseTestCase): rule_original = policy.RuleDefault( name='test', check_str='role:owner',) - rule_original._deprecated_rule_handled = False + self.enforcer.register_default(rule_original) - self.enforcer.registered_rules['test'].check_str = 'role:admin' - self.enforcer.registered_rules['test'].check = 'role:admin' - self.enforcer.registered_rules['test']._deprecated_rule_handled = True - self.assertEqual(self.enforcer.registered_rules['test'].check_str, - 'role:admin') - self.assertEqual(self.enforcer.registered_rules['test'].check, - 'role:admin') - self.assertTrue( - self.enforcer.registered_rules['test']._deprecated_rule_handled) + self.enforcer.registered_rules['test']._check_str = 'role:admin' + self.enforcer.registered_rules['test']._check = 'role:admin' + + self.assertEqual( + self.enforcer.registered_rules['test'].check_str, 'role:admin') + self.assertEqual( + self.enforcer.registered_rules['test'].check, 'role:admin') self.assertEqual(rule_original.check_str, 'role:owner') self.assertEqual(rule_original.check.__str__(), 'role:owner') - self.assertFalse(rule_original._deprecated_rule_handled) def test_non_reversible_check(self): self.create_config_file('policy.json', @@ -1253,7 +1250,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_deprecate_a_policy_check_string(self): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N' ) rule_list = [policy.DocumentedRuleDefault( @@ -1262,8 +1261,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] enforcer = policy.Enforcer(self.conf) enforcer.register_defaults(rule_list) @@ -1293,7 +1290,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_deprecate_an_empty_policy_check_string(self): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='' + check_str='', + deprecated_reason='because of reasons', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( @@ -1302,8 +1301,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='because of reasons', - deprecated_since='N' )] enforcer = policy.Enforcer(self.conf) enforcer.register_defaults(rule_list) @@ -1321,7 +1318,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_deprecate_replace_with_empty_policy_check_string(self): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='because of reasons', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( @@ -1330,8 +1329,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='because of reasons', - deprecated_since='N' )] enforcer = policy.Enforcer(self.conf) enforcer.register_defaults(rule_list) @@ -1348,15 +1345,7 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_deprecate_a_policy_name(self): deprecated_rule = policy.DeprecatedRule( name='foo:bar', - check_str='role:baz' - ) - - rule_list = [policy.DocumentedRuleDefault( - name='foo:create_bar', check_str='role:baz', - description='Create a bar.', - operations=[{'path': '/v1/bars/', 'method': 'POST'}], - deprecated_rule=deprecated_rule, deprecated_reason=( '"foo:bar" is not granular enough. If your deployment has ' 'overridden "foo:bar", ensure you override the new policies ' @@ -1365,7 +1354,15 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): '"foo:bar:update", "foo:bar:list", and "foo:bar:delete", ' 'which might be backwards incompatible for your deployment' ), - deprecated_since='N' + deprecated_since='N', + ) + + rule_list = [policy.DocumentedRuleDefault( + name='foo:create_bar', + check_str='role:baz', + description='Create a bar.', + operations=[{'path': '/v1/bars/', 'method': 'POST'}], + deprecated_rule=deprecated_rule, )] expected_msg = ( 'Policy "foo:bar":"role:baz" was deprecated in N in favor of ' @@ -1439,7 +1436,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_deprecate_check_str_suppress_does_not_log_warning(self): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N' ) rule_list = [policy.DocumentedRuleDefault( @@ -1448,8 +1447,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] enforcer = policy.Enforcer(self.conf) enforcer.suppress_deprecation_warnings = True @@ -1461,7 +1458,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_deprecate_name_suppress_does_not_log_warning(self): deprecated_rule = policy.DeprecatedRule( name='foo:bar', - check_str='role:baz' + check_str='role:baz', + deprecated_reason='"foo:bar" is not granular enough.', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( @@ -1470,8 +1469,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars/', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"foo:bar" is not granular enough.', - deprecated_since='N' )] rules = jsonutils.dumps({'foo:bar': 'role:bang'}) @@ -1509,7 +1506,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): def test_suppress_default_change_warnings_flag_not_log_warning(self): deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( @@ -1518,8 +1517,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] enforcer = policy.Enforcer(self.conf) enforcer.suppress_default_change_warnings = True @@ -1528,7 +1525,7 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): enforcer.load_rules() mock_warn.assert_not_called() - def test_deprecated_policy_for_removal_must_include_deprecated_since(self): + def test_deprecated_policy_for_removal_must_include_deprecated_meta(self): self.assertRaises( ValueError, policy.DocumentedRuleDefault, @@ -1538,24 +1535,25 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): operations=[{'path': '/v1/foos/', 'method': 'POST'}], deprecated_for_removal=True, deprecated_reason='Some reason.' + # no deprecated_since ) - def test_deprecated_policy_must_include_deprecated_since(self): + def test_deprecated_policy_should_not_include_deprecated_meta(self): deprecated_rule = policy.DeprecatedRule( name='foo:bar', check_str='rule:baz' ) - self.assertRaises( - ValueError, - policy.DocumentedRuleDefault, - name='foo:bar', - check_str='rule:baz', - description='Create a foo.', - operations=[{'path': '/v1/foos/', 'method': 'POST'}], - deprecated_rule=deprecated_rule, - deprecated_reason='Some reason.' - ) + with mock.patch('warnings.warn') as mock_warn: + policy.DocumentedRuleDefault( + name='foo:bar', + check_str='rule:baz', + description='Create a foo.', + operations=[{'path': '/v1/foos/', 'method': 'POST'}], + deprecated_rule=deprecated_rule, + deprecated_reason='Some reason.' + ) + mock_warn.assert_called_once() def test_deprecated_rule_requires_deprecated_rule_object(self): self.assertRaises( @@ -1591,7 +1589,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): # better. deprecated_rule = policy.DeprecatedRule( name='foo:bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( name='foo:create_bar', @@ -1599,8 +1599,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] self.enforcer.register_defaults(rule_list) @@ -1625,7 +1623,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): # better. deprecated_rule = policy.DeprecatedRule( name='foo:bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( name='foo:create_bar', @@ -1633,8 +1633,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] self.enforcer.register_defaults(rule_list) @@ -1667,7 +1665,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): # better. deprecated_rule = policy.DeprecatedRule( name='foo:bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( name='foo:create_bar', @@ -1675,8 +1675,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] self.enforcer.register_defaults(rule_list) @@ -1711,7 +1709,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): # Deprecate the policy name in favor of something better. deprecated_rule = policy.DeprecatedRule( name='old_rule', - check_str='role:bang' + check_str='role:bang', + deprecated_reason='"old_rule" is a bad name', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( name='new_rule', @@ -1719,8 +1719,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Replacement for old_rule.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"old_rule" is a bad name', - deprecated_since='N' )] self.enforcer.register_defaults(rule_list) @@ -1740,7 +1738,9 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): group='oslo_policy') deprecated_rule = policy.DeprecatedRule( name='foo:create_bar', - check_str='role:fizz' + check_str='role:fizz', + deprecated_reason='"role:bang" is a better default', + deprecated_since='N', ) rule_list = [policy.DocumentedRuleDefault( @@ -1749,8 +1749,6 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): description='Create a bar.', operations=[{'path': '/v1/bars', 'method': 'POST'}], deprecated_rule=deprecated_rule, - deprecated_reason='"role:bang" is a better default', - deprecated_since='N' )] enforcer = policy.Enforcer(self.conf) enforcer.register_defaults(rule_list) @@ -1782,27 +1780,34 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): deprecated_reason='"role:bang" is a better default', deprecated_since='N' ) + check = rule.check enforcer = policy.Enforcer(self.conf) enforcer.register_defaults([rule]) - registered_default_rule = enforcer.registered_rules['foo:create_bar'] - # Check that rule deprecation handling hasn't been done, yet - self.assertFalse(registered_default_rule._deprecated_rule_handled) - # Loading the rules will modify the rule check string to logically OR - # the new value with the deprecated value + # Check that rule processing hasn't been done, yet + self.assertEqual({}, enforcer.rules) + + # Load the rules enforcer.load_rules() - self.assertTrue(registered_default_rule._deprecated_rule_handled) - # Make sure the original value is used instead of instantiating new - # OrCheck objects whenever we perform subsequent reloads - expected_check = policy.OrCheck([_parser.parse_rule(cs) for cs in - [rule.check_str, - deprecated_rule.check_str]]) + # Loading the rules will store a version of the rule check string + # logically ORed with the check string of the deprecated value. Make + # sure this is happening but that the original rule check is unchanged + expected_check = policy.OrCheck([ + _parser.parse_rule(cs) for cs in + [rule.check_str, deprecated_rule.check_str] + ]) + self.assertIn('foo:create_bar', enforcer.rules) + self.assertEqual( + str(enforcer.rules['foo:create_bar']), str(expected_check)) + self.assertEqual(check, rule.check) + # Hacky way to check whether _handle_deprecated_rule was called again. + # If a second call to load_rules doesn't overwrite our dummy rule then + # we know it didn't call the deprecated rule function again. + enforcer.rules['foo:create_bar'] = 'foo:bar' enforcer.load_rules() - self.assertEqual(registered_default_rule.check.__str__(), - expected_check.__str__()) - self.assertTrue(registered_default_rule._deprecated_rule_handled) + self.assertEqual('foo:bar', enforcer.rules['foo:create_bar']) class DocumentedRuleDefaultTestCase(base.PolicyBaseTestCase): @@ -1888,6 +1893,18 @@ class DocumentedRuleDefaultTestCase(base.PolicyBaseTestCase): operations=invalid_op) +class DeprecatedRuleTestCase(base.PolicyBaseTestCase): + + def test_should_include_deprecated_meta(self): + with mock.patch('warnings.warn') as mock_warn: + policy.DeprecatedRule( + name='foo:bar', + check_str='rule:baz' + ) + + mock_warn.assert_called_once() + + class EnforcerCheckRulesTest(base.PolicyBaseTestCase): def setUp(self): super(EnforcerCheckRulesTest, self).setUp() |