diff options
Diffstat (limited to 'nova/tests/unit/policies/base.py')
-rw-r--r-- | nova/tests/unit/policies/base.py | 165 |
1 files changed, 152 insertions, 13 deletions
diff --git a/nova/tests/unit/policies/base.py b/nova/tests/unit/policies/base.py index aff01d14f6..7490441d92 100644 --- a/nova/tests/unit/policies/base.py +++ b/nova/tests/unit/policies/base.py @@ -24,6 +24,23 @@ from nova.tests import fixtures LOG = logging.getLogger(__name__) +def rule_if_system(system_rule, non_system_rule, context): + """Helper function to pick a rule based on system-ness of context. + + This can be used (with functools.partial) to choose between two + rule names, based on whether or not the context has system + scope. Specifically if we will fail the parent of a nested policy + check based on scope_types=['project'], this can be used to choose + the parent rule name for the error message check in + common_policy_check(). + + """ + if context.system_scope: + return system_rule + else: + return non_system_rule + + class BasePolicyTest(test.TestCase): # NOTE(gmann): Set this flag to True if you would like to tests the # new behaviour of policy without deprecated rules. @@ -36,11 +53,21 @@ class BasePolicyTest(test.TestCase): # For Example: # rules_without_deprecation{ # "os_compute_api:os-deferred-delete:restore": - # "rule:system_admin_or_owner"} + # "rule:project_admin_api"} rules_without_deprecation = {} def setUp(self): super(BasePolicyTest, self).setUp() + # TODO(gmann): enforce_scope and enforce_new_defaults are enabled + # by default in the code so disable them in base test class until + # we have deprecated rules and their tests. We have enforce_scope + # and no-legacy tests which are explicitly enabling scope and new + # defaults to test the new defaults and scope. In future, once + # we remove the deprecated rules, along with refactoring the unit + # tests we can remove overriding the oslo policy flags. + self.flags(enforce_scope=False, group="oslo_policy") + if not self.without_deprecated_rules: + self.flags(enforce_new_defaults=False, group="oslo_policy") self.useFixture(fixtures.NeutronFixture(self)) self.policy = self.useFixture(fixtures.RealPolicyFixture()) @@ -95,33 +122,135 @@ class BasePolicyTest(test.TestCase): project_id=self.project_id_other, roles=['reader']) - self.all_contexts = [ + self.all_contexts = set([ self.legacy_admin_context, self.system_admin_context, self.system_member_context, self.system_reader_context, self.system_foo_context, self.project_admin_context, self.project_member_context, self.project_reader_context, self.other_project_member_context, self.project_foo_context, self.other_project_reader_context - ] + ]) + + # All the project contexts for easy access. + self.all_project_contexts = set([ + self.legacy_admin_context, + self.project_admin_context, self.project_member_context, + self.project_reader_context, self.project_foo_context, + self.other_project_member_context, + self.other_project_reader_context, + ]) + # All the system contexts for easy access. + self.all_system_contexts = set([ + self.system_admin_context, self.system_foo_context, + self.system_member_context, self.system_reader_context, + ]) + # A few commmon set of contexts to be used in tests + # + # With scope disable and no legacy rule, any admin, + # project members have access. No other role in that project + # will have access. + self.project_member_or_admin_with_no_scope_no_legacy = set([ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context, self.project_member_context, + ]) + # With scope enable and legacy rule, only project scoped admin + # and any role in that project will have access. + self.project_m_r_or_admin_with_scope_and_legacy = set([ + self.legacy_admin_context, self.project_admin_context, + self.project_member_context, self.project_reader_context, + self.project_foo_context + ]) + # With scope enable and no legacy rule, only project scoped admin + # and project members have access. No other role in that project + # or system scoped token will have access. + self.project_member_or_admin_with_scope_no_legacy = set([ + self.legacy_admin_context, self.project_admin_context, + self.project_member_context + ]) + # With scope disable and no legacy rule, any admin, + # project members, and project reader have access. No other + # role in that project will have access. + self.project_reader_or_admin_with_no_scope_no_legacy = set([ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context, self.project_member_context, + self.project_reader_context + ]) + # With scope enable and no legacy rule, only project scoped admin, + # project members, and project reader have access. No other role + # in that project or system scoped token will have access. + self.project_reader_or_admin_with_scope_no_legacy = set([ + self.legacy_admin_context, self.project_admin_context, + self.project_member_context, self.project_reader_context + ]) if self.without_deprecated_rules: # To simulate the new world, remove deprecations by overriding # rules which has the deprecated rules. self.rules_without_deprecation.update({ - "system_admin_or_owner": - "rule:system_admin_api or rule:project_member_api", - "system_or_project_reader": - "rule:system_reader_api or rule:project_reader_api", - "system_admin_api": - "role:admin and system_scope:all", - "system_reader_api": - "role:reader and system_scope:all", + "context_is_admin": + "role:admin", + "project_reader_or_admin": + "rule:project_reader_api or rule:context_is_admin", + "project_admin_api": + "role:admin and project_id:%(project_id)s", "project_member_api": "role:member and project_id:%(project_id)s", + "project_reader_api": + "role:reader and project_id:%(project_id)s", + "project_member_or_admin": + "rule:project_member_api or rule:context_is_admin", + "project_reader_or_admin": + "rule:project_reader_api or rule:context_is_admin", }) self.policy.set_rules(self.rules_without_deprecation, overwrite=False) + def reduce_set(self, name, new_set): + """Reduce a named set of contexts in a subclass. + + This removes things from a set in a child test class by taking + a new set, but asserts that no *new* contexts are added over + what is defined in the parent. + + :param name: The name of a set of contexts on self + (i.e. 'project' for self.project_contexts + :param new_set: The new set of contexts that should be used in + the above set. The new_set is asserted to be a + perfect subset of the existing set + """ + current = getattr(self, '%s_contexts' % name) + + errors = ','.join(x.user_id for x in new_set - current) + self.assertEqual('', errors, + 'Attempt to reduce set would add %s' % errors) + + LOG.info('%s.%s_contexts: removing %s', + self.__class__.__name__, + name, + ','.join(x.user_id for x in current - new_set)) + setattr(self, '%s_contexts' % name, new_set) + + def common_policy_auth(self, authorized_contexts, + rule_name, + func, req, *arg, **kwarg): + """Check a policy rule against a set of authorized contexts. + + This is exactly like common_policy_check, except that it + assumes any contexts not in the authorized set are in the + unauthorized set. + """ + # The unauthorized users are any not in the authorized set. + unauth = list(set(self.all_contexts) - set(authorized_contexts)) + # In case a set was passed in, convert to list for stable ordering. + authorized_contexts = list(authorized_contexts) + # Log both sets in the order we will test them to aid debugging of + # fatal=False responses. + LOG.info('Authorized users: %s', list( + x.user_id for x in authorized_contexts)) + LOG.info('Unauthorized users: %s', list(x.user_id for x in unauth)) + return self.common_policy_check(authorized_contexts, unauth, + rule_name, func, req, *arg, **kwarg) + def common_policy_check(self, authorized_contexts, unauthorized_contexts, rule_name, func, req, *arg, **kwarg): @@ -146,15 +275,25 @@ class BasePolicyTest(test.TestCase): def ensure_raises(req, *args, **kwargs): exc = self.assertRaises( exception.PolicyNotAuthorized, func, req, *arg, **kwarg) + # NOTE(danms): We may need to check a different rule_name + # as the enforced policy, based on the context we are + # using. Examples are multi-policy APIs for similar + # reasons as below. If we are passed a function for + # rule_name, call it with the context being used to + # determine the rule_name we should verify. + if callable(rule_name): + actual_rule_name = rule_name(req.environ['nova.context']) + else: + actual_rule_name = rule_name # NOTE(gmann): In case of multi-policy APIs, PolicyNotAuthorized # exception can be raised from either of the policy so checking # the error message, which includes the rule name, can mismatch. # Tests verifying the multi policy can pass rule_name as None # to skip the error message assert. - if rule_name is not None: + if actual_rule_name is not None: self.assertEqual( "Policy doesn't allow %s to be performed." % - rule_name, exc.format_message()) + actual_rule_name, exc.format_message()) # Verify all the context having allowed scope and roles pass # the policy check. for context in authorized_contexts: |