summaryrefslogtreecommitdiff
path: root/oslo_policy
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_policy')
-rw-r--r--oslo_policy/policy.py20
-rw-r--r--oslo_policy/shell.py4
-rw-r--r--oslo_policy/tests/test_policy.py55
3 files changed, 73 insertions, 6 deletions
diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py
index f21ebe9..903f65b 100644
--- a/oslo_policy/policy.py
+++ b/oslo_policy/policy.py
@@ -221,7 +221,7 @@ by setting the ``policy_default_rule`` configuration setting to the
desired rule name.
"""
-import collections
+import collections.abc
import copy
import logging
import os
@@ -698,6 +698,11 @@ class Enforcer(object):
DeprecatedRule
"""
+ # Short-circuit the rule deprecation logic if we've already processed
+ # it for this particular rule.
+ if default._deprecated_rule_handled:
+ return
+
deprecated_rule = default.deprecated_rule
deprecated_msg = (
@@ -742,6 +747,7 @@ class Enforcer(object):
self.rules[default.name] = self.file_rules[
deprecated_rule.name
].check
+ default._deprecated_rule_handled = True
# In this case, the default check string is changing. We need to let
# operators know that this is going to change. If they don't want to
@@ -762,6 +768,7 @@ class Enforcer(object):
default.check = OrCheck([_parser.parse_rule(cs) for cs in
[default.check_str,
deprecated_rule.check_str]])
+ default._deprecated_rule_handled = True
if not (self.suppress_deprecation_warnings
or self.suppress_default_change_warnings):
warnings.warn(deprecated_msg)
@@ -953,9 +960,9 @@ class Enforcer(object):
# a method on RequestContext objects that converts attributes of the
# context object to policy values. However, ``to_policy_values()``
# doesn't actually return a dictionary, it's a subclass of
- # collections.MutableMapping, which behaves like a dictionary but
+ # collections.abc.MutableMapping, which behaves like a dictionary but
# doesn't pass the type check.
- elif not isinstance(creds, collections.MutableMapping):
+ elif not isinstance(creds, collections.abc.MutableMapping):
msg = (
'Expected type oslo_context.context.RequestContext, dict, or '
'the output of '
@@ -1094,7 +1101,11 @@ class Enforcer(object):
if default.name in self.registered_rules:
raise DuplicatePolicyError(default.name)
- self.registered_rules[default.name] = default
+ # NOTE Always make copy of registered rule because policy engine
+ # update these rules in many places (one example is
+ # self._handle_deprecated_rule() ). This will avoid any conflict
+ # in rule object values when running tests in parallel.
+ self.registered_rules[default.name] = copy.deepcopy(default)
def register_defaults(self, defaults):
"""Registers a list of RuleDefaults.
@@ -1180,6 +1191,7 @@ class RuleDefault(object):
self.deprecated_for_removal = deprecated_for_removal
self.deprecated_reason = deprecated_reason
self.deprecated_since = deprecated_since
+ self._deprecated_rule_handled = False
if self.deprecated_rule:
if not isinstance(self.deprecated_rule, DeprecatedRule):
diff --git a/oslo_policy/shell.py b/oslo_policy/shell.py
index 7ffb5ea..da1bf1e 100644
--- a/oslo_policy/shell.py
+++ b/oslo_policy/shell.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import collections
+import collections.abc
import sys
from oslo_serialization import jsonutils
@@ -59,7 +59,7 @@ def flatten(d, parent_key=''):
items = []
for k, v in d.items():
new_key = parent_key + '.' + k if parent_key else k
- if isinstance(v, collections.MutableMapping):
+ if isinstance(v, collections.abc.MutableMapping):
items.extend(flatten(v, new_key).items())
else:
items.append((new_key, v))
diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py
index f27f6b1..513b43d 100644
--- a/oslo_policy/tests/test_policy.py
+++ b/oslo_policy/tests/test_policy.py
@@ -784,6 +784,25 @@ class EnforcerTest(base.PolicyBaseTestCase):
policy.RuleDefault(name='owner',
check_str='role:owner'))
+ def test_enforcer_does_not_modify_original_registered_rule(self):
+ rule_original = policy.RuleDefault(
+ name='test',
+ check_str='role:owner',)
+ rule_original._deprecated_rule_handled = False
+ self.enforcer.register_default(rule_original)
+ self.enforcer.registered_rules['test'].check_str = 'role:admin'
+ self.enforcer.registered_rules['test'].check = 'role:admin'
+ self.enforcer.registered_rules['test']._deprecated_rule_handled = True
+ self.assertEqual(self.enforcer.registered_rules['test'].check_str,
+ 'role:admin')
+ self.assertEqual(self.enforcer.registered_rules['test'].check,
+ 'role:admin')
+ self.assertTrue(
+ self.enforcer.registered_rules['test']._deprecated_rule_handled)
+ self.assertEqual(rule_original.check_str, 'role:owner')
+ self.assertEqual(rule_original.check.__str__(), 'role:owner')
+ self.assertFalse(rule_original._deprecated_rule_handled)
+
def test_non_reversible_check(self):
self.create_config_file('policy.json',
jsonutils.dumps(
@@ -1749,6 +1768,42 @@ class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase):
enforcer.enforce('foo:create_bar', {}, {'roles': ['baz']})
)
+ def test_deprecation_logic_is_only_performed_once_per_rule(self):
+ deprecated_rule = policy.DeprecatedRule(
+ name='foo:create_bar',
+ check_str='role:fizz'
+ )
+ rule = policy.DocumentedRuleDefault(
+ name='foo:create_bar',
+ check_str='role:bang',
+ description='Create a bar.',
+ operations=[{'path': '/v1/bars', 'method': 'POST'}],
+ deprecated_rule=deprecated_rule,
+ deprecated_reason='"role:bang" is a better default',
+ deprecated_since='N'
+ )
+
+ enforcer = policy.Enforcer(self.conf)
+ enforcer.register_defaults([rule])
+ registered_default_rule = enforcer.registered_rules['foo:create_bar']
+ # Check that rule deprecation handling hasn't been done, yet
+ self.assertFalse(registered_default_rule._deprecated_rule_handled)
+
+ # Loading the rules will modify the rule check string to logically OR
+ # the new value with the deprecated value
+ enforcer.load_rules()
+ self.assertTrue(registered_default_rule._deprecated_rule_handled)
+
+ # Make sure the original value is used instead of instantiating new
+ # OrCheck objects whenever we perform subsequent reloads
+ expected_check = policy.OrCheck([_parser.parse_rule(cs) for cs in
+ [rule.check_str,
+ deprecated_rule.check_str]])
+ enforcer.load_rules()
+ self.assertEqual(registered_default_rule.check.__str__(),
+ expected_check.__str__())
+ self.assertTrue(registered_default_rule._deprecated_rule_handled)
+
class DocumentedRuleDefaultTestCase(base.PolicyBaseTestCase):