diff options
Diffstat (limited to 'oslo_policy')
-rw-r--r-- | oslo_policy/policy.py | 20 | ||||
-rw-r--r-- | oslo_policy/shell.py | 4 | ||||
-rw-r--r-- | oslo_policy/tests/test_policy.py | 55 |
3 files changed, 73 insertions, 6 deletions
diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py index f21ebe9..903f65b 100644 --- a/oslo_policy/policy.py +++ b/oslo_policy/policy.py @@ -221,7 +221,7 @@ by setting the ``policy_default_rule`` configuration setting to the desired rule name. """ -import collections +import collections.abc import copy import logging import os @@ -698,6 +698,11 @@ class Enforcer(object): DeprecatedRule """ + # Short-circuit the rule deprecation logic if we've already processed + # it for this particular rule. + if default._deprecated_rule_handled: + return + deprecated_rule = default.deprecated_rule deprecated_msg = ( @@ -742,6 +747,7 @@ class Enforcer(object): self.rules[default.name] = self.file_rules[ deprecated_rule.name ].check + default._deprecated_rule_handled = True # In this case, the default check string is changing. We need to let # operators know that this is going to change. If they don't want to @@ -762,6 +768,7 @@ class Enforcer(object): default.check = OrCheck([_parser.parse_rule(cs) for cs in [default.check_str, deprecated_rule.check_str]]) + default._deprecated_rule_handled = True if not (self.suppress_deprecation_warnings or self.suppress_default_change_warnings): warnings.warn(deprecated_msg) @@ -953,9 +960,9 @@ class Enforcer(object): # a method on RequestContext objects that converts attributes of the # context object to policy values. However, ``to_policy_values()`` # doesn't actually return a dictionary, it's a subclass of - # collections.MutableMapping, which behaves like a dictionary but + # collections.abc.MutableMapping, which behaves like a dictionary but # doesn't pass the type check. - elif not isinstance(creds, collections.MutableMapping): + elif not isinstance(creds, collections.abc.MutableMapping): msg = ( 'Expected type oslo_context.context.RequestContext, dict, or ' 'the output of ' @@ -1094,7 +1101,11 @@ class Enforcer(object): if default.name in self.registered_rules: raise DuplicatePolicyError(default.name) - self.registered_rules[default.name] = default + # NOTE Always make copy of registered rule because policy engine + # update these rules in many places (one example is + # self._handle_deprecated_rule() ). This will avoid any conflict + # in rule object values when running tests in parallel. + self.registered_rules[default.name] = copy.deepcopy(default) def register_defaults(self, defaults): """Registers a list of RuleDefaults. @@ -1180,6 +1191,7 @@ class RuleDefault(object): self.deprecated_for_removal = deprecated_for_removal self.deprecated_reason = deprecated_reason self.deprecated_since = deprecated_since + self._deprecated_rule_handled = False if self.deprecated_rule: if not isinstance(self.deprecated_rule, DeprecatedRule): diff --git a/oslo_policy/shell.py b/oslo_policy/shell.py index 7ffb5ea..da1bf1e 100644 --- a/oslo_policy/shell.py +++ b/oslo_policy/shell.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections +import collections.abc import sys from oslo_serialization import jsonutils @@ -59,7 +59,7 @@ def flatten(d, parent_key=''): items = [] for k, v in d.items(): new_key = parent_key + '.' + k if parent_key else k - if isinstance(v, collections.MutableMapping): + if isinstance(v, collections.abc.MutableMapping): items.extend(flatten(v, new_key).items()) else: items.append((new_key, v)) diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py index f27f6b1..513b43d 100644 --- a/oslo_policy/tests/test_policy.py +++ b/oslo_policy/tests/test_policy.py @@ -784,6 +784,25 @@ class EnforcerTest(base.PolicyBaseTestCase): policy.RuleDefault(name='owner', check_str='role:owner')) + def test_enforcer_does_not_modify_original_registered_rule(self): + rule_original = policy.RuleDefault( + name='test', + check_str='role:owner',) + rule_original._deprecated_rule_handled = False + self.enforcer.register_default(rule_original) + self.enforcer.registered_rules['test'].check_str = 'role:admin' + self.enforcer.registered_rules['test'].check = 'role:admin' + self.enforcer.registered_rules['test']._deprecated_rule_handled = True + self.assertEqual(self.enforcer.registered_rules['test'].check_str, + 'role:admin') + self.assertEqual(self.enforcer.registered_rules['test'].check, + 'role:admin') + self.assertTrue( + self.enforcer.registered_rules['test']._deprecated_rule_handled) + self.assertEqual(rule_original.check_str, 'role:owner') + self.assertEqual(rule_original.check.__str__(), 'role:owner') + self.assertFalse(rule_original._deprecated_rule_handled) + def test_non_reversible_check(self): self.create_config_file('policy.json', jsonutils.dumps( @@ -1749,6 +1768,42 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): enforcer.enforce('foo:create_bar', {}, {'roles': ['baz']}) ) + def test_deprecation_logic_is_only_performed_once_per_rule(self): + deprecated_rule = policy.DeprecatedRule( + name='foo:create_bar', + check_str='role:fizz' + ) + rule = policy.DocumentedRuleDefault( + name='foo:create_bar', + check_str='role:bang', + description='Create a bar.', + operations=[{'path': '/v1/bars', 'method': 'POST'}], + deprecated_rule=deprecated_rule, + deprecated_reason='"role:bang" is a better default', + deprecated_since='N' + ) + + enforcer = policy.Enforcer(self.conf) + enforcer.register_defaults([rule]) + registered_default_rule = enforcer.registered_rules['foo:create_bar'] + # Check that rule deprecation handling hasn't been done, yet + self.assertFalse(registered_default_rule._deprecated_rule_handled) + + # Loading the rules will modify the rule check string to logically OR + # the new value with the deprecated value + enforcer.load_rules() + self.assertTrue(registered_default_rule._deprecated_rule_handled) + + # Make sure the original value is used instead of instantiating new + # OrCheck objects whenever we perform subsequent reloads + expected_check = policy.OrCheck([_parser.parse_rule(cs) for cs in + [rule.check_str, + deprecated_rule.check_str]]) + enforcer.load_rules() + self.assertEqual(registered_default_rule.check.__str__(), + expected_check.__str__()) + self.assertTrue(registered_default_rule._deprecated_rule_handled) + class DocumentedRuleDefaultTestCase(base.PolicyBaseTestCase): |