summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Cordasco <ian.cordasco@rackspace.com>2015-02-05 11:45:49 -0600
committerSteve Martinelli <stevemar@ca.ibm.com>2015-02-06 00:38:14 -0500
commitdf5d80759e7c7a05d7a8ef8120d86bb934b23b37 (patch)
tree9c96599e4cae94d2a24eb0b4ff35100d4745f69b
parentc2872aa32fa1fb861c9fab54b1eb32401ea2b0cd (diff)
downloadoslo-policy-df5d80759e7c7a05d7a8ef8120d86bb934b23b37.tar.gz
Make use of private modules
Move the parser and checks logic into oslo_policy._parser and oslo_policy._checks respectively. As a consequence, this allows us to create separate test files for those modules so we now also have oslo_policy.tests.test_parser and oslo_policy.tests.test_checks. Since those modules needed some common classes and fixtures it was also necessary to add oslo_policy.tests.base to service the three test modules. Change-Id: I656dcb8fda7b953f5def8ddfaa4d119a8c881965
-rw-r--r--oslo_policy/_checks.py310
-rw-r--r--oslo_policy/_parser.py341
-rw-r--r--oslo_policy/policy.py619
-rw-r--r--oslo_policy/tests/base.py52
-rw-r--r--oslo_policy/tests/test_checks.py402
-rw-r--r--oslo_policy/tests/test_parser.py403
-rw-r--r--oslo_policy/tests/test_policy.py818
7 files changed, 1534 insertions, 1411 deletions
diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py
new file mode 100644
index 0000000..7860b61
--- /dev/null
+++ b/oslo_policy/_checks.py
@@ -0,0 +1,310 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import abc
+import ast
+import copy
+
+from oslo_serialization import jsonutils
+import six
+import six.moves.urllib.parse as urlparse
+import six.moves.urllib.request as urlrequest
+
+
+registered_checks = {}
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseCheck(object):
+ """Abstract base class for Check classes."""
+
+ @abc.abstractmethod
+ def __str__(self):
+ """String representation of the Check tree rooted at this node."""
+
+ pass
+
+ @abc.abstractmethod
+ def __call__(self, target, cred, enforcer):
+ """Triggers if instance of the class is called.
+
+ Performs the check. Returns False to reject the access or a
+ true value (not necessary True) to accept the access.
+ """
+
+ pass
+
+
+class FalseCheck(BaseCheck):
+ """A policy check that always returns ``False`` (disallow)."""
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "!"
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy."""
+
+ return False
+
+
+class TrueCheck(BaseCheck):
+ """A policy check that always returns ``True`` (allow)."""
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "@"
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy."""
+
+ return True
+
+
+class Check(BaseCheck):
+ """A base class to allow for user-defined policy checks.
+
+ :param kind: The kind of the check, i.e., the field before the ``:``.
+ :param match: The match of the check, i.e., the field after the ``:``.
+
+ """
+
+ def __init__(self, kind, match):
+ self.kind = kind
+ self.match = match
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "%s:%s" % (self.kind, self.match)
+
+
+class NotCheck(BaseCheck):
+ """Implements the "not" logical operator.
+
+ A policy check that inverts the result of another policy check.
+
+ :param rule: The rule to negate. Must be a Check.
+
+ """
+
+ def __init__(self, rule):
+ self.rule = rule
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "not %s" % self.rule
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Returns the logical inverse of the wrapped check.
+ """
+
+ return not self.rule(target, cred, enforcer)
+
+
+class AndCheck(BaseCheck):
+ """Implements the "and" logical operator.
+
+ A policy check that requires that a list of other checks all return True.
+
+ :param list rules: rules that will be tested.
+
+ """
+
+ def __init__(self, rules):
+ self.rules = rules
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "(%s)" % ' and '.join(str(r) for r in self.rules)
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that all rules accept in order to return True.
+ """
+
+ for rule in self.rules:
+ if not rule(target, cred, enforcer):
+ return False
+
+ return True
+
+ def add_check(self, rule):
+ """Adds rule to be tested.
+
+ Allows addition of another rule to the list of rules that will
+ be tested.
+
+ :returns: self
+ :rtype: :class:`.AndCheck`
+ """
+
+ self.rules.append(rule)
+ return self
+
+
+class OrCheck(BaseCheck):
+ """Implements the "or" operator.
+
+ A policy check that requires that at least one of a list of other
+ checks returns ``True``.
+
+ :param rules: A list of rules that will be tested.
+
+ """
+
+ def __init__(self, rules):
+ self.rules = rules
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "(%s)" % ' or '.join(str(r) for r in self.rules)
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that at least one rule accept in order to return True.
+ """
+
+ for rule in self.rules:
+ if rule(target, cred, enforcer):
+ return True
+ return False
+
+ def add_check(self, rule):
+ """Adds rule to be tested.
+
+ Allows addition of another rule to the list of rules that will
+ be tested. Returns the OrCheck object for convenience.
+ """
+
+ self.rules.append(rule)
+ return self
+
+
+def register(name, func=None):
+ """Register a function or :class:`.Check` class as a policy check.
+
+ :param name: Gives the name of the check type, e.g., "rule",
+ "role", etc. If name is ``None``, a default check type
+ will be registered.
+ :param func: If given, provides the function or class to register.
+ If not given, returns a function taking one argument
+ to specify the function or class to register,
+ allowing use as a decorator.
+ """
+
+ # Perform the actual decoration by registering the function or
+ # class. Returns the function or class for compliance with the
+ # decorator interface.
+ def decorator(func):
+ registered_checks[name] = func
+ return func
+
+ # If the function or class is given, do the registration
+ if func:
+ return decorator(func)
+
+ return decorator
+
+
+@register("rule")
+class RuleCheck(Check):
+ """Recursively checks credentials based on the defined rules."""
+
+ def __call__(self, target, creds, enforcer):
+ try:
+ return enforcer.rules[self.match](target, creds, enforcer)
+ except KeyError:
+ # We don't have any matching rule; fail closed
+ return False
+
+
+@register("role")
+class RoleCheck(Check):
+ """Check that there is a matching role in the ``creds`` dict."""
+
+ def __call__(self, target, creds, enforcer):
+ return self.match.lower() in [x.lower() for x in creds['roles']]
+
+
+@register('http')
+class HttpCheck(Check):
+ """Check ``http:`` rules by calling to a remote server.
+
+ This example implementation simply verifies that the response
+ is exactly ``True``.
+ """
+
+ def __call__(self, target, creds, enforcer):
+ url = ('http:' + self.match) % target
+
+ # Convert instances of object() in target temporarily to
+ # empty dict to avoid circular reference detection
+ # errors in jsonutils.dumps().
+ temp_target = copy.deepcopy(target)
+ for key in target.keys():
+ element = target.get(key)
+ if type(element) is object:
+ temp_target[key] = {}
+
+ data = {'target': jsonutils.dumps(temp_target),
+ 'credentials': jsonutils.dumps(creds)}
+ post_data = urlparse.urlencode(data)
+ f = urlrequest.urlopen(url, post_data)
+ return f.read() == "True"
+
+
+@register(None)
+class GenericCheck(Check):
+ """Check an individual match.
+
+ Matches look like:
+
+ - tenant:%(tenant_id)s
+ - role:compute:admin
+ - True:%(user.enabled)s
+ - 'Member':%(role.name)s
+ """
+
+ def __call__(self, target, creds, enforcer):
+ try:
+ match = self.match % target
+ except KeyError:
+ # While doing GenericCheck if key not
+ # present in Target return false
+ return False
+
+ try:
+ # Try to interpret self.kind as a literal
+ leftval = ast.literal_eval(self.kind)
+ except ValueError:
+ try:
+ kind_parts = self.kind.split('.')
+ leftval = creds
+ for kind_part in kind_parts:
+ leftval = leftval[kind_part]
+ except KeyError:
+ return False
+ return match == six.text_type(leftval)
diff --git a/oslo_policy/_parser.py b/oslo_policy/_parser.py
new file mode 100644
index 0000000..9b296be
--- /dev/null
+++ b/oslo_policy/_parser.py
@@ -0,0 +1,341 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import re
+
+from oslo_log import log as logging
+import six
+
+from oslo_policy import _checks
+from oslo_policy import _i18n
+
+
+LOG = logging.getLogger(__name__)
+
+_, _LE, _LI = _i18n._, _i18n._LE, _i18n._LI
+
+
+def reducer(*tokens):
+ """Decorator for reduction methods.
+
+ Arguments are a sequence of tokens, in order, which should trigger running
+ this reduction method.
+ """
+
+ def decorator(func):
+ # Make sure we have a list of reducer sequences
+ if not hasattr(func, 'reducers'):
+ func.reducers = []
+
+ # Add the tokens to the list of reducer sequences
+ func.reducers.append(list(tokens))
+
+ return func
+
+ return decorator
+
+
+class ParseStateMeta(type):
+ """Metaclass for the :class:`.ParseState` class.
+
+ Facilitates identifying reduction methods.
+ """
+
+ def __new__(mcs, name, bases, cls_dict):
+ """Create the class.
+
+ Injects the 'reducers' list, a list of tuples matching token sequences
+ to the names of the corresponding reduction methods.
+ """
+
+ reducers = []
+
+ for key, value in cls_dict.items():
+ if not hasattr(value, 'reducers'):
+ continue
+ for reduction in value.reducers:
+ reducers.append((reduction, key))
+
+ cls_dict['reducers'] = reducers
+
+ return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
+
+
+@six.add_metaclass(ParseStateMeta)
+class ParseState(object):
+ """Implement the core of parsing the policy language.
+
+ Uses a greedy reduction algorithm to reduce a sequence of tokens into
+ a single terminal, the value of which will be the root of the
+ :class:`Check` tree.
+
+ .. note::
+
+ Error reporting is rather lacking. The best we can get with this
+ parser formulation is an overall "parse failed" error. Fortunately, the
+ policy language is simple enough that this shouldn't be that big a
+ problem.
+ """
+
+ def __init__(self):
+ """Initialize the ParseState."""
+
+ self.tokens = []
+ self.values = []
+
+ def reduce(self):
+ """Perform a greedy reduction of the token stream.
+
+ If a reducer method matches, it will be executed, then the
+ :meth:`reduce` method will be called recursively to search for any more
+ possible reductions.
+ """
+
+ for reduction, methname in self.reducers:
+ if (len(self.tokens) >= len(reduction) and
+ self.tokens[-len(reduction):] == reduction):
+ # Get the reduction method
+ meth = getattr(self, methname)
+
+ # Reduce the token stream
+ results = meth(*self.values[-len(reduction):])
+
+ # Update the tokens and values
+ self.tokens[-len(reduction):] = [r[0] for r in results]
+ self.values[-len(reduction):] = [r[1] for r in results]
+
+ # Check for any more reductions
+ return self.reduce()
+
+ def shift(self, tok, value):
+ """Adds one more token to the state.
+
+ Calls :meth:`reduce`.
+ """
+
+ self.tokens.append(tok)
+ self.values.append(value)
+
+ # Do a greedy reduce...
+ self.reduce()
+
+ @property
+ def result(self):
+ """Obtain the final result of the parse.
+
+ :raises ValueError: If the parse failed to reduce to a single result.
+ """
+
+ if len(self.values) != 1:
+ raise ValueError("Could not parse rule")
+ return self.values[0]
+
+ @reducer('(', 'check', ')')
+ @reducer('(', 'and_expr', ')')
+ @reducer('(', 'or_expr', ')')
+ def _wrap_check(self, _p1, check, _p2):
+ """Turn parenthesized expressions into a 'check' token."""
+
+ return [('check', check)]
+
+ @reducer('check', 'and', 'check')
+ def _make_and_expr(self, check1, _and, check2):
+ """Create an 'and_expr'.
+
+ Join two checks by the 'and' operator.
+ """
+
+ return [('and_expr', _checks.AndCheck([check1, check2]))]
+
+ @reducer('and_expr', 'and', 'check')
+ def _extend_and_expr(self, and_expr, _and, check):
+ """Extend an 'and_expr' by adding one more check."""
+
+ return [('and_expr', and_expr.add_check(check))]
+
+ @reducer('check', 'or', 'check')
+ def _make_or_expr(self, check1, _or, check2):
+ """Create an 'or_expr'.
+
+ Join two checks by the 'or' operator.
+ """
+
+ return [('or_expr', _checks.OrCheck([check1, check2]))]
+
+ @reducer('or_expr', 'or', 'check')
+ def _extend_or_expr(self, or_expr, _or, check):
+ """Extend an 'or_expr' by adding one more check."""
+
+ return [('or_expr', or_expr.add_check(check))]
+
+ @reducer('not', 'check')
+ def _make_not_expr(self, _not, check):
+ """Invert the result of another check."""
+
+ return [('check', _checks.NotCheck(check))]
+
+
+def _parse_check(rule):
+ """Parse a single base check rule into an appropriate Check object."""
+
+ # Handle the special checks
+ if rule == '!':
+ return _checks.FalseCheck()
+ elif rule == '@':
+ return _checks.TrueCheck()
+
+ try:
+ kind, match = rule.split(':', 1)
+ except Exception:
+ LOG.exception(_LE("Failed to understand rule %s") % rule)
+ # If the rule is invalid, we'll fail closed
+ return _checks.FalseCheck()
+
+ # Find what implements the check
+ if kind in _checks.registered_checks:
+ return _checks.registered_checks[kind](kind, match)
+ elif None in _checks.registered_checks:
+ return _checks.registered_checks[None](kind, match)
+ else:
+ LOG.error(_LE("No handler for matches of kind %s") % kind)
+ return _checks.FalseCheck()
+
+
+def _parse_list_rule(rule):
+ """Translates the old list-of-lists syntax into a tree of Check objects.
+
+ Provided for backwards compatibility.
+ """
+
+ # Empty rule defaults to True
+ if not rule:
+ return _checks.TrueCheck()
+
+ # Outer list is joined by "or"; inner list by "and"
+ or_list = []
+ for inner_rule in rule:
+ # Elide empty inner lists
+ if not inner_rule:
+ continue
+
+ # Handle bare strings
+ if isinstance(inner_rule, six.string_types):
+ inner_rule = [inner_rule]
+
+ # Parse the inner rules into Check objects
+ and_list = [_parse_check(r) for r in inner_rule]
+
+ # Append the appropriate check to the or_list
+ if len(and_list) == 1:
+ or_list.append(and_list[0])
+ else:
+ or_list.append(_checks.AndCheck(and_list))
+
+ # If we have only one check, omit the "or"
+ if not or_list:
+ return _checks.FalseCheck()
+ elif len(or_list) == 1:
+ return or_list[0]
+
+ return _checks.OrCheck(or_list)
+
+
+# Used for tokenizing the policy language
+_tokenize_re = re.compile(r'\s+')
+
+
+def _parse_tokenize(rule):
+ """Tokenizer for the policy language.
+
+ Most of the single-character tokens are specified in the
+ _tokenize_re; however, parentheses need to be handled specially,
+ because they can appear inside a check string. Thankfully, those
+ parentheses that appear inside a check string can never occur at
+ the very beginning or end ("%(variable)s" is the correct syntax).
+ """
+
+ for tok in _tokenize_re.split(rule):
+ # Skip empty tokens
+ if not tok or tok.isspace():
+ continue
+
+ # Handle leading parens on the token
+ clean = tok.lstrip('(')
+ for i in range(len(tok) - len(clean)):
+ yield '(', '('
+
+ # If it was only parentheses, continue
+ if not clean:
+ continue
+ else:
+ tok = clean
+
+ # Handle trailing parens on the token
+ clean = tok.rstrip(')')
+ trail = len(tok) - len(clean)
+
+ # Yield the cleaned token
+ lowered = clean.lower()
+ if lowered in ('and', 'or', 'not'):
+ # Special tokens
+ yield lowered, clean
+ elif clean:
+ # Not a special token, but not composed solely of ')'
+ if len(tok) >= 2 and ((tok[0], tok[-1]) in
+ [('"', '"'), ("'", "'")]):
+ # It's a quoted string
+ yield 'string', tok[1:-1]
+ else:
+ yield 'check', _parse_check(clean)
+
+ # Yield the trailing parens
+ for i in range(trail):
+ yield ')', ')'
+
+
+def _parse_text_rule(rule):
+ """Parses policy to the tree.
+
+ Translates a policy written in the policy language into a tree of
+ Check objects.
+ """
+
+ # Empty rule means always accept
+ if not rule:
+ return _checks.TrueCheck()
+
+ # Parse the token stream
+ state = ParseState()
+ for tok, value in _parse_tokenize(rule):
+ state.shift(tok, value)
+
+ try:
+ return state.result
+ except ValueError:
+ # Couldn't parse the rule
+ LOG.exception(_LE("Failed to understand rule %s") % rule)
+
+ # Fail closed
+ return _checks.FalseCheck()
+
+
+def parse_rule(rule):
+ """Parses a policy rule into a tree of :class:`.Check` objects."""
+
+ # If the rule is a string, it's in the policy language
+ if isinstance(rule, six.string_types):
+ return _parse_text_rule(rule)
+ return _parse_list_rule(rule)
diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py
index 583593d..343d622 100644
--- a/oslo_policy/policy.py
+++ b/oslo_policy/policy.py
@@ -202,22 +202,19 @@ by setting the ``policy_default_rule`` configuration setting to the
desired rule name.
"""
-import abc
-import ast
-import copy
import os
-import re
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import six
-import six.moves.urllib.parse as urlparse
-import six.moves.urllib.request as urlrequest
-from oslo_policy._i18n import _, _LE, _LI
+from oslo_policy import _checks
+from oslo_policy import _i18n
+from oslo_policy import _parser
from oslo_policy.openstack.common import fileutils
+_, _LI = _i18n._, _i18n._LI
_opts = [
cfg.StrOpt('policy_file',
@@ -243,7 +240,9 @@ _opts = [
LOG = logging.getLogger(__name__)
-_checks = {}
+register = _checks.register
+BaseCheck = _checks.BaseCheck
+Check = _checks.Check
class PolicyNotAuthorized(Exception):
@@ -262,7 +261,7 @@ class Rules(dict):
"""Allow loading of JSON rule data."""
# Suck in the JSON data and parse the rules
- rules = dict((k, _parse_rule(v)) for k, v in
+ rules = dict((k, _parser.parse_rule(v)) for k, v in
jsonutils.loads(data).items())
return cls(rules, default_rule)
@@ -284,7 +283,7 @@ class Rules(dict):
if not self.default_rule:
raise KeyError(key)
- if isinstance(self.default_rule, BaseCheck):
+ if isinstance(self.default_rule, _checks.BaseCheck):
return self.default_rule
# We need to check this or we can get infinite recursion
@@ -301,7 +300,7 @@ class Rules(dict):
out_rules = {}
for key, value in self.items():
# Use empty string for singleton TrueCheck instances
- if isinstance(value, TrueCheck):
+ if isinstance(value, _checks.TrueCheck):
out_rules[key] = ''
else:
out_rules[key] = str(value)
@@ -460,7 +459,7 @@ class Enforcer(object):
self.load_rules()
# Allow the rule to be a Check tree
- if isinstance(rule, BaseCheck):
+ if isinstance(rule, _checks.BaseCheck):
result = rule(target, creds, self)
elif not self.rules:
# No rules to reference means we're going to fail closed
@@ -482,599 +481,3 @@ class Enforcer(object):
raise PolicyNotAuthorized(rule)
return result
-
-
-@six.add_metaclass(abc.ABCMeta)
-class BaseCheck(object):
- """Abstract base class for Check classes."""
-
- @abc.abstractmethod
- def __str__(self):
- """String representation of the Check tree rooted at this node."""
-
- pass
-
- @abc.abstractmethod
- def __call__(self, target, cred, enforcer):
- """Triggers if instance of the class is called.
-
- Performs the check. Returns False to reject the access or a
- true value (not necessary True) to accept the access.
- """
-
- pass
-
-
-class FalseCheck(BaseCheck):
- """A policy check that always returns ``False`` (disallow)."""
-
- def __str__(self):
- """Return a string representation of this check."""
-
- return "!"
-
- def __call__(self, target, cred, enforcer):
- """Check the policy."""
-
- return False
-
-
-class TrueCheck(BaseCheck):
- """A policy check that always returns ``True`` (allow)."""
-
- def __str__(self):
- """Return a string representation of this check."""
-
- return "@"
-
- def __call__(self, target, cred, enforcer):
- """Check the policy."""
-
- return True
-
-
-class Check(BaseCheck):
- """A base class to allow for user-defined policy checks.
-
- :param kind: The kind of the check, i.e., the field before the ``:``.
- :param match: The match of the check, i.e., the field after the ``:``.
-
- """
-
- def __init__(self, kind, match):
- self.kind = kind
- self.match = match
-
- def __str__(self):
- """Return a string representation of this check."""
-
- return "%s:%s" % (self.kind, self.match)
-
-
-class NotCheck(BaseCheck):
- """Implements the "not" logical operator.
-
- A policy check that inverts the result of another policy check.
-
- :param rule: The rule to negate. Must be a Check.
-
- """
-
- def __init__(self, rule):
- self.rule = rule
-
- def __str__(self):
- """Return a string representation of this check."""
-
- return "not %s" % self.rule
-
- def __call__(self, target, cred, enforcer):
- """Check the policy.
-
- Returns the logical inverse of the wrapped check.
- """
-
- return not self.rule(target, cred, enforcer)
-
-
-class AndCheck(BaseCheck):
- """Implements the "and" logical operator.
-
- A policy check that requires that a list of other checks all return True.
-
- :param list rules: rules that will be tested.
-
- """
-
- def __init__(self, rules):
- self.rules = rules
-
- def __str__(self):
- """Return a string representation of this check."""
-
- return "(%s)" % ' and '.join(str(r) for r in self.rules)
-
- def __call__(self, target, cred, enforcer):
- """Check the policy.
-
- Requires that all rules accept in order to return True.
- """
-
- for rule in self.rules:
- if not rule(target, cred, enforcer):
- return False
-
- return True
-
- def add_check(self, rule):
- """Adds rule to be tested.
-
- Allows addition of another rule to the list of rules that will
- be tested.
-
- :returns: self
- :rtype: :class:`.AndCheck`
- """
-
- self.rules.append(rule)
- return self
-
-
-class OrCheck(BaseCheck):
- """Implements the "or" operator.
-
- A policy check that requires that at least one of a list of other
- checks returns ``True``.
-
- :param rules: A list of rules that will be tested.
-
- """
-
- def __init__(self, rules):
- self.rules = rules
-
- def __str__(self):
- """Return a string representation of this check."""
-
- return "(%s)" % ' or '.join(str(r) for r in self.rules)
-
- def __call__(self, target, cred, enforcer):
- """Check the policy.
-
- Requires that at least one rule accept in order to return True.
- """
-
- for rule in self.rules:
- if rule(target, cred, enforcer):
- return True
- return False
-
- def add_check(self, rule):
- """Adds rule to be tested.
-
- Allows addition of another rule to the list of rules that will
- be tested. Returns the OrCheck object for convenience.
- """
-
- self.rules.append(rule)
- return self
-
-
-def _parse_check(rule):
- """Parse a single base check rule into an appropriate Check object."""
-
- # Handle the special checks
- if rule == '!':
- return FalseCheck()
- elif rule == '@':
- return TrueCheck()
-
- try:
- kind, match = rule.split(':', 1)
- except Exception:
- LOG.exception(_LE("Failed to understand rule %s") % rule)
- # If the rule is invalid, we'll fail closed
- return FalseCheck()
-
- # Find what implements the check
- if kind in _checks:
- return _checks[kind](kind, match)
- elif None in _checks:
- return _checks[None](kind, match)
- else:
- LOG.error(_LE("No handler for matches of kind %s") % kind)
- return FalseCheck()
-
-
-def _parse_list_rule(rule):
- """Translates the old list-of-lists syntax into a tree of Check objects.
-
- Provided for backwards compatibility.
- """
-
- # Empty rule defaults to True
- if not rule:
- return TrueCheck()
-
- # Outer list is joined by "or"; inner list by "and"
- or_list = []
- for inner_rule in rule:
- # Elide empty inner lists
- if not inner_rule:
- continue
-
- # Handle bare strings
- if isinstance(inner_rule, six.string_types):
- inner_rule = [inner_rule]
-
- # Parse the inner rules into Check objects
- and_list = [_parse_check(r) for r in inner_rule]
-
- # Append the appropriate check to the or_list
- if len(and_list) == 1:
- or_list.append(and_list[0])
- else:
- or_list.append(AndCheck(and_list))
-
- # If we have only one check, omit the "or"
- if not or_list:
- return FalseCheck()
- elif len(or_list) == 1:
- return or_list[0]
-
- return OrCheck(or_list)
-
-
-# Used for tokenizing the policy language
-_tokenize_re = re.compile(r'\s+')
-
-
-def _parse_tokenize(rule):
- """Tokenizer for the policy language.
-
- Most of the single-character tokens are specified in the
- _tokenize_re; however, parentheses need to be handled specially,
- because they can appear inside a check string. Thankfully, those
- parentheses that appear inside a check string can never occur at
- the very beginning or end ("%(variable)s" is the correct syntax).
- """
-
- for tok in _tokenize_re.split(rule):
- # Skip empty tokens
- if not tok or tok.isspace():
- continue
-
- # Handle leading parens on the token
- clean = tok.lstrip('(')
- for i in range(len(tok) - len(clean)):
- yield '(', '('
-
- # If it was only parentheses, continue
- if not clean:
- continue
- else:
- tok = clean
-
- # Handle trailing parens on the token
- clean = tok.rstrip(')')
- trail = len(tok) - len(clean)
-
- # Yield the cleaned token
- lowered = clean.lower()
- if lowered in ('and', 'or', 'not'):
- # Special tokens
- yield lowered, clean
- elif clean:
- # Not a special token, but not composed solely of ')'
- if len(tok) >= 2 and ((tok[0], tok[-1]) in
- [('"', '"'), ("'", "'")]):
- # It's a quoted string
- yield 'string', tok[1:-1]
- else:
- yield 'check', _parse_check(clean)
-
- # Yield the trailing parens
- for i in range(trail):
- yield ')', ')'
-
-
-class _ParseStateMeta(type):
- """Metaclass for the :class:`.ParseState` class.
-
- Facilitates identifying reduction methods.
- """
-
- def __new__(mcs, name, bases, cls_dict):
- """Create the class.
-
- Injects the 'reducers' list, a list of tuples matching token sequences
- to the names of the corresponding reduction methods.
- """
-
- reducers = []
-
- for key, value in cls_dict.items():
- if not hasattr(value, 'reducers'):
- continue
- for reduction in value.reducers:
- reducers.append((reduction, key))
-
- cls_dict['reducers'] = reducers
-
- return super(_ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
-
-
-def reducer(*tokens):
- """Decorator for reduction methods.
-
- Arguments are a sequence of tokens, in order, which should trigger running
- this reduction method.
- """
-
- def decorator(func):
- # Make sure we have a list of reducer sequences
- if not hasattr(func, 'reducers'):
- func.reducers = []
-
- # Add the tokens to the list of reducer sequences
- func.reducers.append(list(tokens))
-
- return func
-
- return decorator
-
-
-@six.add_metaclass(_ParseStateMeta)
-class _ParseState(object):
- """Implement the core of parsing the policy language.
-
- Uses a greedy reduction algorithm to reduce a sequence of tokens into
- a single terminal, the value of which will be the root of the
- :class:`Check` tree.
-
- .. note::
-
- Error reporting is rather lacking. The best we can get with this
- parser formulation is an overall "parse failed" error. Fortunately, the
- policy language is simple enough that this shouldn't be that big a
- problem.
- """
-
- def __init__(self):
- """Initialize the _ParseState."""
-
- self.tokens = []
- self.values = []
-
- def reduce(self):
- """Perform a greedy reduction of the token stream.
-
- If a reducer method matches, it will be executed, then the
- :meth:`reduce` method will be called recursively to search for any more
- possible reductions.
- """
-
- for reduction, methname in self.reducers:
- if (len(self.tokens) >= len(reduction) and
- self.tokens[-len(reduction):] == reduction):
- # Get the reduction method
- meth = getattr(self, methname)
-
- # Reduce the token stream
- results = meth(*self.values[-len(reduction):])
-
- # Update the tokens and values
- self.tokens[-len(reduction):] = [r[0] for r in results]
- self.values[-len(reduction):] = [r[1] for r in results]
-
- # Check for any more reductions
- return self.reduce()
-
- def shift(self, tok, value):
- """Adds one more token to the state.
-
- Calls :meth:`reduce`.
- """
-
- self.tokens.append(tok)
- self.values.append(value)
-
- # Do a greedy reduce...
- self.reduce()
-
- @property
- def result(self):
- """Obtain the final result of the parse.
-
- :raises ValueError: If the parse failed to reduce to a single result.
- """
-
- if len(self.values) != 1:
- raise ValueError("Could not parse rule")
- return self.values[0]
-
- @reducer('(', 'check', ')')
- @reducer('(', 'and_expr', ')')
- @reducer('(', 'or_expr', ')')
- def _wrap_check(self, _p1, check, _p2):
- """Turn parenthesized expressions into a 'check' token."""
-
- return [('check', check)]
-
- @reducer('check', 'and', 'check')
- def _make_and_expr(self, check1, _and, check2):
- """Create an 'and_expr'.
-
- Join two checks by the 'and' operator.
- """
-
- return [('and_expr', AndCheck([check1, check2]))]
-
- @reducer('and_expr', 'and', 'check')
- def _extend_and_expr(self, and_expr, _and, check):
- """Extend an 'and_expr' by adding one more check."""
-
- return [('and_expr', and_expr.add_check(check))]
-
- @reducer('check', 'or', 'check')
- def _make_or_expr(self, check1, _or, check2):
- """Create an 'or_expr'.
-
- Join two checks by the 'or' operator.
- """
-
- return [('or_expr', OrCheck([check1, check2]))]
-
- @reducer('or_expr', 'or', 'check')
- def _extend_or_expr(self, or_expr, _or, check):
- """Extend an 'or_expr' by adding one more check."""
-
- return [('or_expr', or_expr.add_check(check))]
-
- @reducer('not', 'check')
- def _make_not_expr(self, _not, check):
- """Invert the result of another check."""
-
- return [('check', NotCheck(check))]
-
-
-def _parse_text_rule(rule):
- """Parses policy to the tree.
-
- Translates a policy written in the policy language into a tree of
- Check objects.
- """
-
- # Empty rule means always accept
- if not rule:
- return TrueCheck()
-
- # Parse the token stream
- state = _ParseState()
- for tok, value in _parse_tokenize(rule):
- state.shift(tok, value)
-
- try:
- return state.result
- except ValueError:
- # Couldn't parse the rule
- LOG.exception(_LE("Failed to understand rule %s") % rule)
-
- # Fail closed
- return FalseCheck()
-
-
-def _parse_rule(rule):
- """Parses a policy rule into a tree of :class:`.Check` objects."""
-
- # If the rule is a string, it's in the policy language
- if isinstance(rule, six.string_types):
- return _parse_text_rule(rule)
- return _parse_list_rule(rule)
-
-
-def register(name, func=None):
- """Register a function or :class:`.Check` class as a policy check.
-
- :param name: Gives the name of the check type, e.g., "rule",
- "role", etc. If name is ``None``, a default check type
- will be registered.
- :param func: If given, provides the function or class to register.
- If not given, returns a function taking one argument
- to specify the function or class to register,
- allowing use as a decorator.
- """
-
- # Perform the actual decoration by registering the function or
- # class. Returns the function or class for compliance with the
- # decorator interface.
- def decorator(func):
- _checks[name] = func
- return func
-
- # If the function or class is given, do the registration
- if func:
- return decorator(func)
-
- return decorator
-
-
-@register("rule")
-class RuleCheck(Check):
- """Recursively checks credentials based on the defined rules."""
-
- def __call__(self, target, creds, enforcer):
- try:
- return enforcer.rules[self.match](target, creds, enforcer)
- except KeyError:
- # We don't have any matching rule; fail closed
- return False
-
-
-@register("role")
-class RoleCheck(Check):
- """Check that there is a matching role in the ``creds`` dict."""
-
- def __call__(self, target, creds, enforcer):
- return self.match.lower() in [x.lower() for x in creds['roles']]
-
-
-@register('http')
-class HttpCheck(Check):
- """Check ``http:`` rules by calling to a remote server.
-
- This example implementation simply verifies that the response
- is exactly ``True``.
- """
-
- def __call__(self, target, creds, enforcer):
- url = ('http:' + self.match) % target
-
- # Convert instances of object() in target temporarily to
- # empty dict to avoid circular reference detection
- # errors in jsonutils.dumps().
- temp_target = copy.deepcopy(target)
- for key in target.keys():
- element = target.get(key)
- if type(element) is object:
- temp_target[key] = {}
-
- data = {'target': jsonutils.dumps(temp_target),
- 'credentials': jsonutils.dumps(creds)}
- post_data = urlparse.urlencode(data)
- f = urlrequest.urlopen(url, post_data)
- return f.read() == "True"
-
-
-@register(None)
-class GenericCheck(Check):
- """Check an individual match.
-
- Matches look like:
-
- - tenant:%(tenant_id)s
- - role:compute:admin
- - True:%(user.enabled)s
- - 'Member':%(role.name)s
- """
-
- def __call__(self, target, creds, enforcer):
- try:
- match = self.match % target
- except KeyError:
- # While doing GenericCheck if key not
- # present in Target return false
- return False
-
- try:
- # Try to interpret self.kind as a literal
- leftval = ast.literal_eval(self.kind)
- except ValueError:
- try:
- kind_parts = self.kind.split('.')
- leftval = creds
- for kind_part in kind_parts:
- leftval = leftval[kind_part]
- except KeyError:
- return False
- return match == six.text_type(leftval)
diff --git a/oslo_policy/tests/base.py b/oslo_policy/tests/base.py
new file mode 100644
index 0000000..e166cb2
--- /dev/null
+++ b/oslo_policy/tests/base.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import os.path
+
+from oslo_concurrency.fixture import lockutils
+from oslo_config import cfg
+from oslo_config import fixture as config
+from oslotest import base as test_base
+
+from oslo_policy import policy
+
+TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ '..', 'tests/var'))
+ENFORCER = policy.Enforcer(cfg.CONF)
+
+
+class PolicyBaseTestCase(test_base.BaseTestCase):
+
+ def setUp(self):
+ super(PolicyBaseTestCase, self).setUp()
+ # NOTE(bnemec): Many of these tests use the same ENFORCER object, so
+ # I believe we need to serialize them.
+ self.useFixture(lockutils.LockFixture('policy-lock'))
+ self.CONF = self.useFixture(config.Config()).conf
+ self.CONF(args=['--config-dir', TEST_VAR_DIR])
+ self.enforcer = ENFORCER
+ self.addCleanup(self.enforcer.clear)
+
+
+class FakeCheck(policy.BaseCheck):
+ def __init__(self, result=None):
+ self.result = result
+
+ def __str__(self):
+ return str(self.result)
+
+ def __call__(self, target, creds, enforcer):
+ if self.result is not None:
+ return self.result
+ return (target, creds, enforcer)
diff --git a/oslo_policy/tests/test_checks.py b/oslo_policy/tests/test_checks.py
new file mode 100644
index 0000000..93818e4
--- /dev/null
+++ b/oslo_policy/tests/test_checks.py
@@ -0,0 +1,402 @@
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import mock
+from oslo_serialization import jsonutils
+from oslotest import base as test_base
+import six
+import six.moves.urllib.parse as urlparse
+import six.moves.urllib.request as urlrequest
+
+from oslo_policy import _checks
+from oslo_policy import policy
+from oslo_policy.tests import base
+
+
+ENFORCER = base.ENFORCER
+
+
+class CheckRegisterTestCase(test_base.BaseTestCase):
+ @mock.patch.object(_checks, 'registered_checks', {})
+ def test_register_check(self):
+ class TestCheck(_checks.Check):
+ pass
+
+ policy.register('spam', TestCheck)
+
+ self.assertEqual(_checks.registered_checks, dict(spam=TestCheck))
+
+ @mock.patch.object(_checks, 'registered_checks', {})
+ def test_register_check_decorator(self):
+ @policy.register('spam')
+ class TestCheck(_checks.Check):
+ pass
+
+ self.assertEqual(_checks.registered_checks, dict(spam=TestCheck))
+
+
+class RuleCheckTestCase(test_base.BaseTestCase):
+ @mock.patch.object(ENFORCER, 'rules', {})
+ def test_rule_missing(self):
+ check = _checks.RuleCheck('rule', 'spam')
+
+ self.assertEqual(check('target', 'creds', ENFORCER), False)
+
+ @mock.patch.object(ENFORCER, 'rules',
+ dict(spam=mock.Mock(return_value=False)))
+ def test_rule_false(self):
+ enforcer = ENFORCER
+
+ check = _checks.RuleCheck('rule', 'spam')
+
+ self.assertEqual(check('target', 'creds', enforcer), False)
+ enforcer.rules['spam'].assert_called_once_with('target', 'creds',
+ enforcer)
+
+ @mock.patch.object(ENFORCER, 'rules',
+ dict(spam=mock.Mock(return_value=True)))
+ def test_rule_true(self):
+ enforcer = ENFORCER
+ check = _checks.RuleCheck('rule', 'spam')
+
+ self.assertEqual(check('target', 'creds', enforcer), True)
+ enforcer.rules['spam'].assert_called_once_with('target', 'creds',
+ enforcer)
+
+
+class RoleCheckTestCase(base.PolicyBaseTestCase):
+ def test_accept(self):
+ check = _checks.RoleCheck('role', 'sPaM')
+
+ self.assertEqual(check('target', dict(roles=['SpAm']),
+ self.enforcer), True)
+
+ def test_reject(self):
+ check = _checks.RoleCheck('role', 'spam')
+
+ self.assertEqual(check('target', dict(roles=[]), self.enforcer), False)
+
+
+class HttpCheckTestCase(base.PolicyBaseTestCase):
+ def decode_post_data(self, post_data):
+ result = {}
+ for item in post_data.split('&'):
+ key, _sep, value = item.partition('=')
+ result[key] = jsonutils.loads(urlparse.unquote_plus(value))
+
+ return result
+
+ @mock.patch.object(urlrequest, 'urlopen',
+ return_value=six.StringIO('True'))
+ def test_accept(self, mock_urlopen):
+ check = _checks.HttpCheck('http', '//example.com/%(name)s')
+ self.assertEqual(check(dict(name='target', spam='spammer'),
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer),
+ True)
+ self.assertEqual(mock_urlopen.call_count, 1)
+
+ args = mock_urlopen.call_args[0]
+
+ self.assertEqual(args[0], 'http://example.com/target')
+ self.assertEqual(self.decode_post_data(args[1]), dict(
+ target=dict(name='target', spam='spammer'),
+ credentials=dict(user='user', roles=['a', 'b', 'c']),
+ ))
+
+ @mock.patch.object(urlrequest, 'urlopen',
+ return_value=six.StringIO('other'))
+ def test_reject(self, mock_urlopen):
+ check = _checks.HttpCheck('http', '//example.com/%(name)s')
+
+ self.assertEqual(check(dict(name='target', spam='spammer'),
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer),
+ False)
+ self.assertEqual(mock_urlopen.call_count, 1)
+
+ args = mock_urlopen.call_args[0]
+
+ self.assertEqual(args[0], 'http://example.com/target')
+ self.assertEqual(self.decode_post_data(args[1]), dict(
+ target=dict(name='target', spam='spammer'),
+ credentials=dict(user='user', roles=['a', 'b', 'c']),
+ ))
+
+ @mock.patch.object(urlrequest, 'urlopen',
+ return_value=six.StringIO('True'))
+ def test_http_with_objects_in_target(self, mock_urlopen):
+
+ check = _checks.HttpCheck('http', '//example.com/%(name)s')
+ target = {'a': object(),
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertEqual(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer),
+ True)
+
+ @mock.patch.object(urlrequest, 'urlopen',
+ return_value=six.StringIO('True'))
+ def test_http_with_strings_in_target(self, mock_urlopen):
+ check = _checks.HttpCheck('http', '//example.com/%(name)s')
+ target = {'a': 'some_string',
+ 'name': 'target',
+ 'b': 'test data'}
+ self.assertEqual(check(target,
+ dict(user='user', roles=['a', 'b', 'c']),
+ self.enforcer),
+ True)
+
+
+class GenericCheckTestCase(base.PolicyBaseTestCase):
+ def test_no_cred(self):
+ check = _checks.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False)
+
+ def test_cred_mismatch(self):
+ check = _checks.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'),
+ dict(name='ham'),
+ self.enforcer), False)
+
+ def test_accept(self):
+ check = _checks.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'),
+ dict(name='spam'),
+ self.enforcer), True)
+
+ def test_no_key_match_in_target(self):
+ check = _checks.GenericCheck('name', '%(name)s')
+
+ self.assertEqual(check(dict(name1='spam'),
+ dict(name='spam'),
+ self.enforcer), False)
+
+ def test_constant_string_mismatch(self):
+ check = _checks.GenericCheck("'spam'", '%(name)s')
+
+ self.assertEqual(check(dict(name='ham'),
+ {},
+ self.enforcer), False)
+
+ def test_constant_string_accept(self):
+ check = _checks.GenericCheck("'spam'", '%(name)s')
+
+ self.assertEqual(check(dict(name='spam'),
+ {},
+ self.enforcer), True)
+
+ def test_constant_literal_mismatch(self):
+ check = _checks.GenericCheck("True", '%(enabled)s')
+
+ self.assertEqual(check(dict(enabled=False),
+ {},
+ self.enforcer), False)
+
+ def test_constant_literal_accept(self):
+ check = _checks.GenericCheck("True", '%(enabled)s')
+
+ self.assertEqual(check(dict(enabled=True),
+ {},
+ self.enforcer), True)
+
+ def test_deep_credentials_dictionary_lookup(self):
+ check = _checks.GenericCheck("a.b.c.d", 'APPLES')
+
+ credentials = {'a': {'b': {'c': {'d': 'APPLES'}}}}
+
+ self.assertEqual(check({},
+ credentials,
+ self.enforcer), True)
+
+ def test_missing_credentials_dictionary_lookup(self):
+ credentials = {'a': 'APPLES', 'o': {'t': 'ORANGES'}}
+
+ # First a valid check - rest of case is expecting failures
+ # Should prove the basic credentials structure before we test
+ # for failure cases.
+ check = _checks.GenericCheck("o.t", 'ORANGES')
+ self.assertEqual(check({},
+ credentials,
+ self.enforcer), True)
+
+ # Case where final key is missing
+ check = _checks.GenericCheck("o.v", 'ORANGES')
+ self.assertEqual(check({},
+ credentials,
+ self.enforcer), False)
+
+ # Attempt to access key under a missing dictionary
+ check = _checks.GenericCheck("q.v", 'APPLES')
+ self.assertEqual(check({},
+ credentials,
+ self.enforcer), False)
+
+
+class FalseCheckTestCase(test_base.BaseTestCase):
+ def test_str(self):
+ check = _checks.FalseCheck()
+
+ self.assertEqual(str(check), '!')
+
+ def test_call(self):
+ check = _checks.FalseCheck()
+
+ self.assertEqual(check('target', 'creds', None), False)
+
+
+class TrueCheckTestCase(test_base.BaseTestCase):
+ def test_str(self):
+ check = _checks.TrueCheck()
+
+ self.assertEqual(str(check), '@')
+
+ def test_call(self):
+ check = _checks.TrueCheck()
+
+ self.assertEqual(check('target', 'creds', None), True)
+
+
+class CheckForTest(_checks.Check):
+ def __call__(self, target, creds, enforcer):
+ pass
+
+
+class CheckTestCase(test_base.BaseTestCase):
+ def test_init(self):
+ check = CheckForTest('kind', 'match')
+
+ self.assertEqual(check.kind, 'kind')
+ self.assertEqual(check.match, 'match')
+
+ def test_str(self):
+ check = CheckForTest('kind', 'match')
+
+ self.assertEqual(str(check), 'kind:match')
+
+
+class NotCheckTestCase(test_base.BaseTestCase):
+ def test_init(self):
+ check = _checks.NotCheck('rule')
+
+ self.assertEqual(check.rule, 'rule')
+
+ def test_str(self):
+ check = _checks.NotCheck('rule')
+
+ self.assertEqual(str(check), 'not rule')
+
+ def test_call_true(self):
+ rule = mock.Mock(return_value=True)
+ check = _checks.NotCheck(rule)
+
+ self.assertEqual(check('target', 'cred', None), False)
+ rule.assert_called_once_with('target', 'cred', None)
+
+ def test_call_false(self):
+ rule = mock.Mock(return_value=False)
+ check = _checks.NotCheck(rule)
+
+ self.assertEqual(check('target', 'cred', None), True)
+ rule.assert_called_once_with('target', 'cred', None)
+
+
+class AndCheckTestCase(test_base.BaseTestCase):
+ def test_init(self):
+ check = _checks.AndCheck(['rule1', 'rule2'])
+
+ self.assertEqual(check.rules, ['rule1', 'rule2'])
+
+ def test_add_check(self):
+ check = _checks.AndCheck(['rule1', 'rule2'])
+ check.add_check('rule3')
+
+ self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
+
+ def test_str(self):
+ check = _checks.AndCheck(['rule1', 'rule2'])
+
+ self.assertEqual(str(check), '(rule1 and rule2)')
+
+ def test_call_all_false(self):
+ rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
+ check = _checks.AndCheck(rules)
+
+ self.assertEqual(check('target', 'cred', None), False)
+ rules[0].assert_called_once_with('target', 'cred', None)
+ self.assertFalse(rules[1].called)
+
+ def test_call_first_true(self):
+ rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
+ check = _checks.AndCheck(rules)
+
+ self.assertFalse(check('target', 'cred', None))
+ rules[0].assert_called_once_with('target', 'cred', None)
+ rules[1].assert_called_once_with('target', 'cred', None)
+
+ def test_call_second_true(self):
+ rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
+ check = _checks.AndCheck(rules)
+
+ self.assertFalse(check('target', 'cred', None))
+ rules[0].assert_called_once_with('target', 'cred', None)
+ self.assertFalse(rules[1].called)
+
+
+class OrCheckTestCase(test_base.BaseTestCase):
+ def test_init(self):
+ check = _checks.OrCheck(['rule1', 'rule2'])
+
+ self.assertEqual(check.rules, ['rule1', 'rule2'])
+
+ def test_add_check(self):
+ check = _checks.OrCheck(['rule1', 'rule2'])
+ check.add_check('rule3')
+
+ self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
+
+ def test_str(self):
+ check = _checks.OrCheck(['rule1', 'rule2'])
+
+ self.assertEqual(str(check), '(rule1 or rule2)')
+
+ def test_call_all_false(self):
+ rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
+ check = _checks.OrCheck(rules)
+
+ self.assertEqual(check('target', 'cred', None), False)
+ rules[0].assert_called_once_with('target', 'cred', None)
+ rules[1].assert_called_once_with('target', 'cred', None)
+
+ def test_call_first_true(self):
+ rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
+ check = _checks.OrCheck(rules)
+
+ self.assertEqual(check('target', 'cred', None), True)
+ rules[0].assert_called_once_with('target', 'cred', None)
+ self.assertFalse(rules[1].called)
+
+ def test_call_second_true(self):
+ rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
+ check = _checks.OrCheck(rules)
+
+ self.assertEqual(check('target', 'cred', None), True)
+ rules[0].assert_called_once_with('target', 'cred', None)
+ rules[1].assert_called_once_with('target', 'cred', None)
diff --git a/oslo_policy/tests/test_parser.py b/oslo_policy/tests/test_parser.py
new file mode 100644
index 0000000..bae3cb7
--- /dev/null
+++ b/oslo_policy/tests/test_parser.py
@@ -0,0 +1,403 @@
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+from oslotest import base as test_base
+import six
+
+from oslo_policy import _checks
+from oslo_policy import _parser
+from oslo_policy.tests import base
+
+
+class ParseCheckTestCase(test_base.BaseTestCase):
+ def test_false(self):
+ result = _parser._parse_check('!')
+
+ self.assertTrue(isinstance(result, _checks.FalseCheck))
+
+ def test_true(self):
+ result = _parser._parse_check('@')
+
+ self.assertTrue(isinstance(result, _checks.TrueCheck))
+
+ def test_bad_rule(self):
+ result = _parser._parse_check('foobar')
+
+ self.assertTrue(isinstance(result, _checks.FalseCheck))
+
+ @mock.patch.object(_checks, 'registered_checks', {})
+ def test_no_handler(self):
+ result = _parser._parse_check('no:handler')
+
+ self.assertTrue(isinstance(result, _checks.FalseCheck))
+
+ @mock.patch.object(_checks, 'registered_checks', {
+ 'spam': mock.Mock(return_value="spam_check"),
+ None: mock.Mock(return_value="none_check"),
+ })
+ def test_check(self):
+ result = _parser._parse_check('spam:handler')
+
+ self.assertEqual(result, 'spam_check')
+ _checks.registered_checks['spam'].assert_called_once_with('spam',
+ 'handler')
+ self.assertFalse(_checks.registered_checks[None].called)
+
+ @mock.patch.object(_checks, 'registered_checks', {
+ None: mock.Mock(return_value="none_check"),
+ })
+ def test_check_default(self):
+ result = _parser._parse_check('spam:handler')
+
+ self.assertEqual(result, 'none_check')
+ _checks.registered_checks[None].assert_called_once_with('spam',
+ 'handler')
+
+
+class ParseListRuleTestCase(test_base.BaseTestCase):
+ def test_empty(self):
+ result = _parser._parse_list_rule([])
+
+ self.assertTrue(isinstance(result, _checks.TrueCheck))
+ self.assertEqual(str(result), '@')
+
+ @mock.patch.object(_parser, '_parse_check', base.FakeCheck)
+ def test_oneele_zeroele(self):
+ result = _parser._parse_list_rule([[]])
+
+ self.assertTrue(isinstance(result, _checks.FalseCheck))
+ self.assertEqual(str(result), '!')
+
+ @mock.patch.object(_parser, '_parse_check', base.FakeCheck)
+ def test_oneele_bare(self):
+ result = _parser._parse_list_rule(['rule'])
+
+ self.assertTrue(isinstance(result, base.FakeCheck))
+ self.assertEqual(result.result, 'rule')
+ self.assertEqual(str(result), 'rule')
+
+ @mock.patch.object(_parser, '_parse_check', base.FakeCheck)
+ def test_oneele_oneele(self):
+ result = _parser._parse_list_rule([['rule']])
+
+ self.assertTrue(isinstance(result, base.FakeCheck))
+ self.assertEqual(result.result, 'rule')
+ self.assertEqual(str(result), 'rule')
+
+ @mock.patch.object(_parser, '_parse_check', base.FakeCheck)
+ def test_oneele_multi(self):
+ result = _parser._parse_list_rule([['rule1', 'rule2']])
+
+ self.assertTrue(isinstance(result, _checks.AndCheck))
+ self.assertEqual(len(result.rules), 2)
+ for i, value in enumerate(['rule1', 'rule2']):
+ self.assertTrue(isinstance(result.rules[i], base.FakeCheck))
+ self.assertEqual(result.rules[i].result, value)
+ self.assertEqual(str(result), '(rule1 and rule2)')
+
+ @mock.patch.object(_parser, '_parse_check', base.FakeCheck)
+ def test_multi_oneele(self):
+ result = _parser._parse_list_rule([['rule1'], ['rule2']])
+
+ self.assertTrue(isinstance(result, _checks.OrCheck))
+ self.assertEqual(len(result.rules), 2)
+ for i, value in enumerate(['rule1', 'rule2']):
+ self.assertTrue(isinstance(result.rules[i], base.FakeCheck))
+ self.assertEqual(result.rules[i].result, value)
+ self.assertEqual(str(result), '(rule1 or rule2)')
+
+ @mock.patch.object(_parser, '_parse_check', base.FakeCheck)
+ def test_multi_multi(self):
+ result = _parser._parse_list_rule([['rule1', 'rule2'],
+ ['rule3', 'rule4']])
+
+ self.assertTrue(isinstance(result, _checks.OrCheck))
+ self.assertEqual(len(result.rules), 2)
+ for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]):
+ self.assertTrue(isinstance(result.rules[i], _checks.AndCheck))
+ self.assertEqual(len(result.rules[i].rules), 2)
+ for j, value in enumerate(values):
+ self.assertTrue(isinstance(result.rules[i].rules[j],
+ base.FakeCheck))
+ self.assertEqual(result.rules[i].rules[j].result, value)
+ self.assertEqual(str(result),
+ '((rule1 and rule2) or (rule3 and rule4))')
+
+
+class ParseTokenizeTestCase(test_base.BaseTestCase):
+ @mock.patch.object(_parser, '_parse_check', lambda x: x)
+ def test_tokenize(self):
+ exemplar = ("(( ( ((() And)) or ) (check:%(miss)s) not)) "
+ "'a-string' \"another-string\"")
+ expected = [
+ ('(', '('), ('(', '('), ('(', '('), ('(', '('), ('(', '('),
+ ('(', '('), (')', ')'), ('and', 'And'),
+ (')', ')'), (')', ')'), ('or', 'or'), (')', ')'), ('(', '('),
+ ('check', 'check:%(miss)s'), (')', ')'), ('not', 'not'),
+ (')', ')'), (')', ')'),
+ ('string', 'a-string'),
+ ('string', 'another-string'),
+ ]
+
+ result = list(_parser._parse_tokenize(exemplar))
+
+ self.assertEqual(result, expected)
+
+
+class ParseStateMetaTestCase(test_base.BaseTestCase):
+ def test_reducer(self):
+ @_parser.reducer('a', 'b', 'c')
+ @_parser.reducer('d', 'e', 'f')
+ def spam():
+ pass
+
+ self.assertTrue(hasattr(spam, 'reducers'))
+ self.assertEqual(spam.reducers, [['d', 'e', 'f'], ['a', 'b', 'c']])
+
+ def test_parse_state_meta(self):
+ @six.add_metaclass(_parser.ParseStateMeta)
+ class FakeState(object):
+
+ @_parser.reducer('a', 'b', 'c')
+ @_parser.reducer('d', 'e', 'f')
+ def reduce1(self):
+ pass
+
+ @_parser.reducer('g', 'h', 'i')
+ def reduce2(self):
+ pass
+
+ self.assertTrue(hasattr(FakeState, 'reducers'))
+ for reduction, reducer in FakeState.reducers:
+ if (reduction == ['a', 'b', 'c'] or
+ reduction == ['d', 'e', 'f']):
+ self.assertEqual(reducer, 'reduce1')
+ elif reduction == ['g', 'h', 'i']:
+ self.assertEqual(reducer, 'reduce2')
+ else:
+ self.fail("Unrecognized reducer discovered")
+
+
+class ParseStateTestCase(test_base.BaseTestCase):
+ def test_init(self):
+ state = _parser.ParseState()
+
+ self.assertEqual(state.tokens, [])
+ self.assertEqual(state.values, [])
+
+ @mock.patch.object(_parser.ParseState, 'reducers', [(['tok1'], 'meth')])
+ @mock.patch.object(_parser.ParseState, 'meth', create=True)
+ def test_reduce_none(self, mock_meth):
+ state = _parser.ParseState()
+ state.tokens = ['tok2']
+ state.values = ['val2']
+
+ state.reduce()
+
+ self.assertEqual(state.tokens, ['tok2'])
+ self.assertEqual(state.values, ['val2'])
+ self.assertFalse(mock_meth.called)
+
+ @mock.patch.object(_parser.ParseState, 'reducers',
+ [(['tok1', 'tok2'], 'meth')])
+ @mock.patch.object(_parser.ParseState, 'meth', create=True)
+ def test_reduce_short(self, mock_meth):
+ state = _parser.ParseState()
+ state.tokens = ['tok1']
+ state.values = ['val1']
+
+ state.reduce()
+
+ self.assertEqual(state.tokens, ['tok1'])
+ self.assertEqual(state.values, ['val1'])
+ self.assertFalse(mock_meth.called)
+
+ @mock.patch.object(_parser.ParseState, 'reducers',
+ [(['tok1', 'tok2'], 'meth')])
+ @mock.patch.object(_parser.ParseState, 'meth', create=True,
+ return_value=[('tok3', 'val3')])
+ def test_reduce_one(self, mock_meth):
+ state = _parser.ParseState()
+ state.tokens = ['tok1', 'tok2']
+ state.values = ['val1', 'val2']
+
+ state.reduce()
+
+ self.assertEqual(state.tokens, ['tok3'])
+ self.assertEqual(state.values, ['val3'])
+ mock_meth.assert_called_once_with('val1', 'val2')
+
+ @mock.patch.object(_parser.ParseState, 'reducers', [
+ (['tok1', 'tok4'], 'meth2'),
+ (['tok2', 'tok3'], 'meth1'),
+ ])
+ @mock.patch.object(_parser.ParseState, 'meth1', create=True,
+ return_value=[('tok4', 'val4')])
+ @mock.patch.object(_parser.ParseState, 'meth2', create=True,
+ return_value=[('tok5', 'val5')])
+ def test_reduce_two(self, mock_meth2, mock_meth1):
+ state = _parser.ParseState()
+ state.tokens = ['tok1', 'tok2', 'tok3']
+ state.values = ['val1', 'val2', 'val3']
+
+ state.reduce()
+
+ self.assertEqual(state.tokens, ['tok5'])
+ self.assertEqual(state.values, ['val5'])
+ mock_meth1.assert_called_once_with('val2', 'val3')
+ mock_meth2.assert_called_once_with('val1', 'val4')
+
+ @mock.patch.object(_parser.ParseState, 'reducers',
+ [(['tok1', 'tok2'], 'meth')])
+ @mock.patch.object(_parser.ParseState, 'meth', create=True,
+ return_value=[('tok3', 'val3'), ('tok4', 'val4')])
+ def test_reduce_multi(self, mock_meth):
+ state = _parser.ParseState()
+ state.tokens = ['tok1', 'tok2']
+ state.values = ['val1', 'val2']
+
+ state.reduce()
+
+ self.assertEqual(state.tokens, ['tok3', 'tok4'])
+ self.assertEqual(state.values, ['val3', 'val4'])
+ mock_meth.assert_called_once_with('val1', 'val2')
+
+ def test_shift(self):
+ state = _parser.ParseState()
+
+ with mock.patch.object(_parser.ParseState, 'reduce') as mock_reduce:
+ state.shift('token', 'value')
+
+ self.assertEqual(state.tokens, ['token'])
+ self.assertEqual(state.values, ['value'])
+ mock_reduce.assert_called_once_with()
+
+ def test_result_empty(self):
+ state = _parser.ParseState()
+
+ self.assertRaises(ValueError, lambda: state.result)
+
+ def test_result_unreduced(self):
+ state = _parser.ParseState()
+ state.tokens = ['tok1', 'tok2']
+ state.values = ['val1', 'val2']
+
+ self.assertRaises(ValueError, lambda: state.result)
+
+ def test_result(self):
+ state = _parser.ParseState()
+ state.tokens = ['token']
+ state.values = ['value']
+
+ self.assertEqual(state.result, 'value')
+
+ def test_wrap_check(self):
+ state = _parser.ParseState()
+
+ result = state._wrap_check('(', 'the_check', ')')
+
+ self.assertEqual(result, [('check', 'the_check')])
+
+ @mock.patch.object(_checks, 'AndCheck', lambda x: x)
+ def test_make_and_expr(self):
+ state = _parser.ParseState()
+
+ result = state._make_and_expr('check1', 'and', 'check2')
+
+ self.assertEqual(result, [('and_expr', ['check1', 'check2'])])
+
+ def test_extend_and_expr(self):
+ state = _parser.ParseState()
+ mock_expr = mock.Mock()
+ mock_expr.add_check.return_value = 'newcheck'
+
+ result = state._extend_and_expr(mock_expr, 'and', 'check')
+
+ self.assertEqual(result, [('and_expr', 'newcheck')])
+ mock_expr.add_check.assert_called_once_with('check')
+
+ @mock.patch.object(_checks, 'OrCheck', lambda x: x)
+ def test_make_or_expr(self):
+ state = _parser.ParseState()
+
+ result = state._make_or_expr('check1', 'or', 'check2')
+
+ self.assertEqual(result, [('or_expr', ['check1', 'check2'])])
+
+ def test_extend_or_expr(self):
+ state = _parser.ParseState()
+ mock_expr = mock.Mock()
+ mock_expr.add_check.return_value = 'newcheck'
+
+ result = state._extend_or_expr(mock_expr, 'or', 'check')
+
+ self.assertEqual(result, [('or_expr', 'newcheck')])
+ mock_expr.add_check.assert_called_once_with('check')
+
+ @mock.patch.object(_checks, 'NotCheck', lambda x: 'not %s' % x)
+ def test_make_not_expr(self):
+ state = _parser.ParseState()
+
+ result = state._make_not_expr('not', 'check')
+
+ self.assertEqual(result, [('check', 'not check')])
+
+
+class ParseTextRuleTestCase(test_base.BaseTestCase):
+ def test_empty(self):
+ result = _parser._parse_text_rule('')
+
+ self.assertTrue(isinstance(result, _checks.TrueCheck))
+
+ @mock.patch.object(_parser, '_parse_tokenize',
+ return_value=[('tok1', 'val1'), ('tok2', 'val2')])
+ @mock.patch.object(_parser.ParseState, 'shift')
+ @mock.patch.object(_parser.ParseState, 'result', 'result')
+ def test_shifts(self, mock_shift, mock_parse_tokenize):
+ result = _parser._parse_text_rule('test rule')
+
+ self.assertEqual(result, 'result')
+ mock_parse_tokenize.assert_called_once_with('test rule')
+ mock_shift.assert_has_calls(
+ [mock.call('tok1', 'val1'), mock.call('tok2', 'val2')])
+
+ @mock.patch.object(_parser, '_parse_tokenize', return_value=[])
+ def test_fail(self, mock_parse_tokenize):
+ result = _parser._parse_text_rule('test rule')
+
+ self.assertTrue(isinstance(result, _checks.FalseCheck))
+ mock_parse_tokenize.assert_called_once_with('test rule')
+
+
+class ParseRuleTestCase(test_base.BaseTestCase):
+ @mock.patch.object(_parser, '_parse_text_rule', return_value='text rule')
+ @mock.patch.object(_parser, '_parse_list_rule', return_value='list rule')
+ def test_parse_rule_string(self, mock_parse_list_rule,
+ mock_parse_text_rule):
+ result = _parser.parse_rule("a string")
+
+ self.assertEqual(result, 'text rule')
+ self.assertFalse(mock_parse_list_rule.called)
+ mock_parse_text_rule.assert_called_once_with('a string')
+
+ @mock.patch.object(_parser, '_parse_text_rule', return_value='text rule')
+ @mock.patch.object(_parser, '_parse_list_rule', return_value='list rule')
+ def test_parse_rule_list(self, mock_parse_list_rule, mock_parse_text_rule):
+ result = _parser.parse_rule([['a'], ['list']])
+
+ self.assertEqual(result, 'list rule')
+ self.assertFalse(mock_parse_text_rule.called)
+ mock_parse_list_rule.assert_called_once_with([['a'], ['list']])
diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py
index 145a957..aed785c 100644
--- a/oslo_policy/tests/test_policy.py
+++ b/oslo_policy/tests/test_policy.py
@@ -15,26 +15,16 @@
"""Test of Policy Engine"""
-import os
-
import mock
-from oslo_concurrency.fixture import lockutils
from oslo_config import cfg
-from oslo_config import fixture as config
from oslo_serialization import jsonutils
from oslotest import base as test_base
-import six
-import six.moves.urllib.parse as urlparse
-import six.moves.urllib.request as urlrequest
+from oslo_policy import _checks
+from oslo_policy import _parser
from oslo_policy.openstack.common import fileutils
from oslo_policy import policy
-
-
-TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
- '..', 'tests/var'))
-
-ENFORCER = policy.Enforcer(cfg.CONF)
+from oslo_policy.tests import base
class MyException(Exception):
@@ -79,7 +69,7 @@ class RulesTestCase(test_base.BaseTestCase):
self.assertEqual(rules['b'], 2)
self.assertEqual(rules['c'], 3)
- @mock.patch.object(policy, '_parse_rule', lambda x: x)
+ @mock.patch.object(_parser, 'parse_rule', lambda x: x)
def test_load_json(self):
exemplar = """{
"admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
@@ -108,26 +98,13 @@ class RulesTestCase(test_base.BaseTestCase):
"admin_or_owner": ""
}"""
rules = policy.Rules(dict(
- admin_or_owner=policy.TrueCheck(),
+ admin_or_owner=_checks.TrueCheck(),
))
self.assertEqual(str(rules), exemplar)
-class PolicyBaseTestCase(test_base.BaseTestCase):
-
- def setUp(self):
- super(PolicyBaseTestCase, self).setUp()
- # NOTE(bnemec): Many of these tests use the same ENFORCER object, so
- # I believe we need to serialize them.
- self.useFixture(lockutils.LockFixture('policy-lock'))
- self.CONF = self.useFixture(config.Config()).conf
- self.CONF(args=['--config-dir', TEST_VAR_DIR])
- self.enforcer = ENFORCER
- self.addCleanup(self.enforcer.clear)
-
-
-class EnforcerTest(PolicyBaseTestCase):
+class EnforcerTest(base.PolicyBaseTestCase):
def test_load_file(self):
self.CONF.set_override('policy_dirs', [], group='oslo_policy')
@@ -195,7 +172,7 @@ class EnforcerTest(PolicyBaseTestCase):
"cloudwatch:PutMetricData": ""
}"""
rules = policy.Rules.load_json(rules_json)
- default_rule = policy.TrueCheck()
+ default_rule = _checks.TrueCheck()
enforcer = policy.Enforcer(cfg.CONF, default_rule=default_rule)
enforcer.set_rules(rules)
action = "cloudwatch:PutMetricData"
@@ -204,9 +181,9 @@ class EnforcerTest(PolicyBaseTestCase):
def test_enforcer_force_reload_with_overwrite(self):
# Prepare in memory fake policies.
- self.enforcer.set_rules({'test': policy._parse_rule('role:test')},
+ self.enforcer.set_rules({'test': _parser.parse_rule('role:test')},
use_conf=True)
- self.enforcer.set_rules({'default': policy._parse_rule('role:fakeZ')},
+ self.enforcer.set_rules({'default': _parser.parse_rule('role:fakeZ')},
overwrite=False, # Keeps 'test' role.
use_conf=True)
@@ -232,9 +209,9 @@ class EnforcerTest(PolicyBaseTestCase):
def test_enforcer_force_reload_without_overwrite(self):
# Prepare in memory fake policies.
- self.enforcer.set_rules({'test': policy._parse_rule('role:test')},
+ self.enforcer.set_rules({'test': _parser.parse_rule('role:test')},
use_conf=True)
- self.enforcer.set_rules({'default': policy._parse_rule('role:fakeZ')},
+ self.enforcer.set_rules({'default': _parser.parse_rule('role:fakeZ')},
overwrite=False, # Keeps 'test' role.
use_conf=True)
@@ -346,23 +323,10 @@ class EnforcerTest(PolicyBaseTestCase):
self.assertEqual('bar_rule', enforcer.rules.default_rule)
-class FakeCheck(policy.BaseCheck):
- def __init__(self, result=None):
- self.result = result
-
- def __str__(self):
- return str(self.result)
-
- def __call__(self, target, creds, enforcer):
- if self.result is not None:
- return self.result
- return (target, creds, enforcer)
-
-
-class CheckFunctionTestCase(PolicyBaseTestCase):
+class CheckFunctionTestCase(base.PolicyBaseTestCase):
def test_check_explicit(self):
- rule = FakeCheck()
+ rule = base.FakeCheck()
result = self.enforcer.enforce(rule, "target", "creds")
self.assertEqual(result, ("target", "creds", self.enforcer))
@@ -376,13 +340,13 @@ class CheckFunctionTestCase(PolicyBaseTestCase):
self.assertEqual(result, False)
def test_check_with_rule(self):
- self.enforcer.set_rules(dict(default=FakeCheck()))
+ self.enforcer.set_rules(dict(default=base.FakeCheck()))
result = self.enforcer.enforce("default", "target", "creds")
self.assertEqual(result, ("target", "creds", self.enforcer))
def test_check_raises(self):
- self.enforcer.set_rules(dict(default=policy.FalseCheck()))
+ self.enforcer.set_rules(dict(default=_checks.FalseCheck()))
try:
self.enforcer.enforce('rule', 'target', 'creds',
@@ -393,755 +357,3 @@ class CheckFunctionTestCase(PolicyBaseTestCase):
self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2"))
else:
self.fail("enforcer.enforce() failed to raise requested exception")
-
-
-class FalseCheckTestCase(test_base.BaseTestCase):
- def test_str(self):
- check = policy.FalseCheck()
-
- self.assertEqual(str(check), '!')
-
- def test_call(self):
- check = policy.FalseCheck()
-
- self.assertEqual(check('target', 'creds', None), False)
-
-
-class TrueCheckTestCase(test_base.BaseTestCase):
- def test_str(self):
- check = policy.TrueCheck()
-
- self.assertEqual(str(check), '@')
-
- def test_call(self):
- check = policy.TrueCheck()
-
- self.assertEqual(check('target', 'creds', None), True)
-
-
-class CheckForTest(policy.Check):
- def __call__(self, target, creds, enforcer):
- pass
-
-
-class CheckTestCase(test_base.BaseTestCase):
- def test_init(self):
- check = CheckForTest('kind', 'match')
-
- self.assertEqual(check.kind, 'kind')
- self.assertEqual(check.match, 'match')
-
- def test_str(self):
- check = CheckForTest('kind', 'match')
-
- self.assertEqual(str(check), 'kind:match')
-
-
-class NotCheckTestCase(test_base.BaseTestCase):
- def test_init(self):
- check = policy.NotCheck('rule')
-
- self.assertEqual(check.rule, 'rule')
-
- def test_str(self):
- check = policy.NotCheck('rule')
-
- self.assertEqual(str(check), 'not rule')
-
- def test_call_true(self):
- rule = mock.Mock(return_value=True)
- check = policy.NotCheck(rule)
-
- self.assertEqual(check('target', 'cred', None), False)
- rule.assert_called_once_with('target', 'cred', None)
-
- def test_call_false(self):
- rule = mock.Mock(return_value=False)
- check = policy.NotCheck(rule)
-
- self.assertEqual(check('target', 'cred', None), True)
- rule.assert_called_once_with('target', 'cred', None)
-
-
-class AndCheckTestCase(test_base.BaseTestCase):
- def test_init(self):
- check = policy.AndCheck(['rule1', 'rule2'])
-
- self.assertEqual(check.rules, ['rule1', 'rule2'])
-
- def test_add_check(self):
- check = policy.AndCheck(['rule1', 'rule2'])
- check.add_check('rule3')
-
- self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
-
- def test_str(self):
- check = policy.AndCheck(['rule1', 'rule2'])
-
- self.assertEqual(str(check), '(rule1 and rule2)')
-
- def test_call_all_false(self):
- rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
- check = policy.AndCheck(rules)
-
- self.assertEqual(check('target', 'cred', None), False)
- rules[0].assert_called_once_with('target', 'cred', None)
- self.assertFalse(rules[1].called)
-
- def test_call_first_true(self):
- rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
- check = policy.AndCheck(rules)
-
- self.assertFalse(check('target', 'cred', None))
- rules[0].assert_called_once_with('target', 'cred', None)
- rules[1].assert_called_once_with('target', 'cred', None)
-
- def test_call_second_true(self):
- rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
- check = policy.AndCheck(rules)
-
- self.assertFalse(check('target', 'cred', None))
- rules[0].assert_called_once_with('target', 'cred', None)
- self.assertFalse(rules[1].called)
-
-
-class OrCheckTestCase(test_base.BaseTestCase):
- def test_init(self):
- check = policy.OrCheck(['rule1', 'rule2'])
-
- self.assertEqual(check.rules, ['rule1', 'rule2'])
-
- def test_add_check(self):
- check = policy.OrCheck(['rule1', 'rule2'])
- check.add_check('rule3')
-
- self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
-
- def test_str(self):
- check = policy.OrCheck(['rule1', 'rule2'])
-
- self.assertEqual(str(check), '(rule1 or rule2)')
-
- def test_call_all_false(self):
- rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
- check = policy.OrCheck(rules)
-
- self.assertEqual(check('target', 'cred', None), False)
- rules[0].assert_called_once_with('target', 'cred', None)
- rules[1].assert_called_once_with('target', 'cred', None)
-
- def test_call_first_true(self):
- rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
- check = policy.OrCheck(rules)
-
- self.assertEqual(check('target', 'cred', None), True)
- rules[0].assert_called_once_with('target', 'cred', None)
- self.assertFalse(rules[1].called)
-
- def test_call_second_true(self):
- rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
- check = policy.OrCheck(rules)
-
- self.assertEqual(check('target', 'cred', None), True)
- rules[0].assert_called_once_with('target', 'cred', None)
- rules[1].assert_called_once_with('target', 'cred', None)
-
-
-class ParseCheckTestCase(test_base.BaseTestCase):
- def test_false(self):
- result = policy._parse_check('!')
-
- self.assertTrue(isinstance(result, policy.FalseCheck))
-
- def test_true(self):
- result = policy._parse_check('@')
-
- self.assertTrue(isinstance(result, policy.TrueCheck))
-
- def test_bad_rule(self):
- result = policy._parse_check('foobar')
-
- self.assertTrue(isinstance(result, policy.FalseCheck))
-
- @mock.patch.object(policy, '_checks', {})
- def test_no_handler(self):
- result = policy._parse_check('no:handler')
-
- self.assertTrue(isinstance(result, policy.FalseCheck))
-
- @mock.patch.object(policy, '_checks', {
- 'spam': mock.Mock(return_value="spam_check"),
- None: mock.Mock(return_value="none_check"),
- })
- def test_check(self):
- result = policy._parse_check('spam:handler')
-
- self.assertEqual(result, 'spam_check')
- policy._checks['spam'].assert_called_once_with('spam', 'handler')
- self.assertFalse(policy._checks[None].called)
-
- @mock.patch.object(policy, '_checks', {
- None: mock.Mock(return_value="none_check"),
- })
- def test_check_default(self):
- result = policy._parse_check('spam:handler')
-
- self.assertEqual(result, 'none_check')
- policy._checks[None].assert_called_once_with('spam', 'handler')
-
-
-class ParseListRuleTestCase(test_base.BaseTestCase):
- def test_empty(self):
- result = policy._parse_list_rule([])
-
- self.assertTrue(isinstance(result, policy.TrueCheck))
- self.assertEqual(str(result), '@')
-
- @mock.patch.object(policy, '_parse_check', FakeCheck)
- def test_oneele_zeroele(self):
- result = policy._parse_list_rule([[]])
-
- self.assertTrue(isinstance(result, policy.FalseCheck))
- self.assertEqual(str(result), '!')
-
- @mock.patch.object(policy, '_parse_check', FakeCheck)
- def test_oneele_bare(self):
- result = policy._parse_list_rule(['rule'])
-
- self.assertTrue(isinstance(result, FakeCheck))
- self.assertEqual(result.result, 'rule')
- self.assertEqual(str(result), 'rule')
-
- @mock.patch.object(policy, '_parse_check', FakeCheck)
- def test_oneele_oneele(self):
- result = policy._parse_list_rule([['rule']])
-
- self.assertTrue(isinstance(result, FakeCheck))
- self.assertEqual(result.result, 'rule')
- self.assertEqual(str(result), 'rule')
-
- @mock.patch.object(policy, '_parse_check', FakeCheck)
- def test_oneele_multi(self):
- result = policy._parse_list_rule([['rule1', 'rule2']])
-
- self.assertTrue(isinstance(result, policy.AndCheck))
- self.assertEqual(len(result.rules), 2)
- for i, value in enumerate(['rule1', 'rule2']):
- self.assertTrue(isinstance(result.rules[i], FakeCheck))
- self.assertEqual(result.rules[i].result, value)
- self.assertEqual(str(result), '(rule1 and rule2)')
-
- @mock.patch.object(policy, '_parse_check', FakeCheck)
- def test_multi_oneele(self):
- result = policy._parse_list_rule([['rule1'], ['rule2']])
-
- self.assertTrue(isinstance(result, policy.OrCheck))
- self.assertEqual(len(result.rules), 2)
- for i, value in enumerate(['rule1', 'rule2']):
- self.assertTrue(isinstance(result.rules[i], FakeCheck))
- self.assertEqual(result.rules[i].result, value)
- self.assertEqual(str(result), '(rule1 or rule2)')
-
- @mock.patch.object(policy, '_parse_check', FakeCheck)
- def test_multi_multi(self):
- result = policy._parse_list_rule([['rule1', 'rule2'],
- ['rule3', 'rule4']])
-
- self.assertTrue(isinstance(result, policy.OrCheck))
- self.assertEqual(len(result.rules), 2)
- for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]):
- self.assertTrue(isinstance(result.rules[i], policy.AndCheck))
- self.assertEqual(len(result.rules[i].rules), 2)
- for j, value in enumerate(values):
- self.assertTrue(isinstance(result.rules[i].rules[j],
- FakeCheck))
- self.assertEqual(result.rules[i].rules[j].result, value)
- self.assertEqual(str(result),
- '((rule1 and rule2) or (rule3 and rule4))')
-
-
-class ParseTokenizeTestCase(test_base.BaseTestCase):
- @mock.patch.object(policy, '_parse_check', lambda x: x)
- def test_tokenize(self):
- exemplar = ("(( ( ((() And)) or ) (check:%(miss)s) not)) "
- "'a-string' \"another-string\"")
- expected = [
- ('(', '('), ('(', '('), ('(', '('), ('(', '('), ('(', '('),
- ('(', '('), (')', ')'), ('and', 'And'),
- (')', ')'), (')', ')'), ('or', 'or'), (')', ')'), ('(', '('),
- ('check', 'check:%(miss)s'), (')', ')'), ('not', 'not'),
- (')', ')'), (')', ')'),
- ('string', 'a-string'),
- ('string', 'another-string'),
- ]
-
- result = list(policy._parse_tokenize(exemplar))
-
- self.assertEqual(result, expected)
-
-
-class ParseStateMetaTestCase(test_base.BaseTestCase):
- def test_reducer(self):
- @policy.reducer('a', 'b', 'c')
- @policy.reducer('d', 'e', 'f')
- def spam():
- pass
-
- self.assertTrue(hasattr(spam, 'reducers'))
- self.assertEqual(spam.reducers, [['d', 'e', 'f'], ['a', 'b', 'c']])
-
- def test_parse_state_meta(self):
- @six.add_metaclass(policy._ParseStateMeta)
- class FakeState(object):
-
- @policy.reducer('a', 'b', 'c')
- @policy.reducer('d', 'e', 'f')
- def reduce1(self):
- pass
-
- @policy.reducer('g', 'h', 'i')
- def reduce2(self):
- pass
-
- self.assertTrue(hasattr(FakeState, 'reducers'))
- for reduction, reducer in FakeState.reducers:
- if (reduction == ['a', 'b', 'c'] or
- reduction == ['d', 'e', 'f']):
- self.assertEqual(reducer, 'reduce1')
- elif reduction == ['g', 'h', 'i']:
- self.assertEqual(reducer, 'reduce2')
- else:
- self.fail("Unrecognized reducer discovered")
-
-
-class ParseStateTestCase(test_base.BaseTestCase):
- def test_init(self):
- state = policy._ParseState()
-
- self.assertEqual(state.tokens, [])
- self.assertEqual(state.values, [])
-
- @mock.patch.object(policy._ParseState, 'reducers', [(['tok1'], 'meth')])
- @mock.patch.object(policy._ParseState, 'meth', create=True)
- def test_reduce_none(self, mock_meth):
- state = policy._ParseState()
- state.tokens = ['tok2']
- state.values = ['val2']
-
- state.reduce()
-
- self.assertEqual(state.tokens, ['tok2'])
- self.assertEqual(state.values, ['val2'])
- self.assertFalse(mock_meth.called)
-
- @mock.patch.object(policy._ParseState, 'reducers',
- [(['tok1', 'tok2'], 'meth')])
- @mock.patch.object(policy._ParseState, 'meth', create=True)
- def test_reduce_short(self, mock_meth):
- state = policy._ParseState()
- state.tokens = ['tok1']
- state.values = ['val1']
-
- state.reduce()
-
- self.assertEqual(state.tokens, ['tok1'])
- self.assertEqual(state.values, ['val1'])
- self.assertFalse(mock_meth.called)
-
- @mock.patch.object(policy._ParseState, 'reducers',
- [(['tok1', 'tok2'], 'meth')])
- @mock.patch.object(policy._ParseState, 'meth', create=True,
- return_value=[('tok3', 'val3')])
- def test_reduce_one(self, mock_meth):
- state = policy._ParseState()
- state.tokens = ['tok1', 'tok2']
- state.values = ['val1', 'val2']
-
- state.reduce()
-
- self.assertEqual(state.tokens, ['tok3'])
- self.assertEqual(state.values, ['val3'])
- mock_meth.assert_called_once_with('val1', 'val2')
-
- @mock.patch.object(policy._ParseState, 'reducers', [
- (['tok1', 'tok4'], 'meth2'),
- (['tok2', 'tok3'], 'meth1'),
- ])
- @mock.patch.object(policy._ParseState, 'meth1', create=True,
- return_value=[('tok4', 'val4')])
- @mock.patch.object(policy._ParseState, 'meth2', create=True,
- return_value=[('tok5', 'val5')])
- def test_reduce_two(self, mock_meth2, mock_meth1):
- state = policy._ParseState()
- state.tokens = ['tok1', 'tok2', 'tok3']
- state.values = ['val1', 'val2', 'val3']
-
- state.reduce()
-
- self.assertEqual(state.tokens, ['tok5'])
- self.assertEqual(state.values, ['val5'])
- mock_meth1.assert_called_once_with('val2', 'val3')
- mock_meth2.assert_called_once_with('val1', 'val4')
-
- @mock.patch.object(policy._ParseState, 'reducers',
- [(['tok1', 'tok2'], 'meth')])
- @mock.patch.object(policy._ParseState, 'meth', create=True,
- return_value=[('tok3', 'val3'), ('tok4', 'val4')])
- def test_reduce_multi(self, mock_meth):
- state = policy._ParseState()
- state.tokens = ['tok1', 'tok2']
- state.values = ['val1', 'val2']
-
- state.reduce()
-
- self.assertEqual(state.tokens, ['tok3', 'tok4'])
- self.assertEqual(state.values, ['val3', 'val4'])
- mock_meth.assert_called_once_with('val1', 'val2')
-
- def test_shift(self):
- state = policy._ParseState()
-
- with mock.patch.object(policy._ParseState, 'reduce') as mock_reduce:
- state.shift('token', 'value')
-
- self.assertEqual(state.tokens, ['token'])
- self.assertEqual(state.values, ['value'])
- mock_reduce.assert_called_once_with()
-
- def test_result_empty(self):
- state = policy._ParseState()
-
- self.assertRaises(ValueError, lambda: state.result)
-
- def test_result_unreduced(self):
- state = policy._ParseState()
- state.tokens = ['tok1', 'tok2']
- state.values = ['val1', 'val2']
-
- self.assertRaises(ValueError, lambda: state.result)
-
- def test_result(self):
- state = policy._ParseState()
- state.tokens = ['token']
- state.values = ['value']
-
- self.assertEqual(state.result, 'value')
-
- def test_wrap_check(self):
- state = policy._ParseState()
-
- result = state._wrap_check('(', 'the_check', ')')
-
- self.assertEqual(result, [('check', 'the_check')])
-
- @mock.patch.object(policy, 'AndCheck', lambda x: x)
- def test_make_and_expr(self):
- state = policy._ParseState()
-
- result = state._make_and_expr('check1', 'and', 'check2')
-
- self.assertEqual(result, [('and_expr', ['check1', 'check2'])])
-
- def test_extend_and_expr(self):
- state = policy._ParseState()
- mock_expr = mock.Mock()
- mock_expr.add_check.return_value = 'newcheck'
-
- result = state._extend_and_expr(mock_expr, 'and', 'check')
-
- self.assertEqual(result, [('and_expr', 'newcheck')])
- mock_expr.add_check.assert_called_once_with('check')
-
- @mock.patch.object(policy, 'OrCheck', lambda x: x)
- def test_make_or_expr(self):
- state = policy._ParseState()
-
- result = state._make_or_expr('check1', 'or', 'check2')
-
- self.assertEqual(result, [('or_expr', ['check1', 'check2'])])
-
- def test_extend_or_expr(self):
- state = policy._ParseState()
- mock_expr = mock.Mock()
- mock_expr.add_check.return_value = 'newcheck'
-
- result = state._extend_or_expr(mock_expr, 'or', 'check')
-
- self.assertEqual(result, [('or_expr', 'newcheck')])
- mock_expr.add_check.assert_called_once_with('check')
-
- @mock.patch.object(policy, 'NotCheck', lambda x: 'not %s' % x)
- def test_make_not_expr(self):
- state = policy._ParseState()
-
- result = state._make_not_expr('not', 'check')
-
- self.assertEqual(result, [('check', 'not check')])
-
-
-class ParseTextRuleTestCase(test_base.BaseTestCase):
- def test_empty(self):
- result = policy._parse_text_rule('')
-
- self.assertTrue(isinstance(result, policy.TrueCheck))
-
- @mock.patch.object(policy, '_parse_tokenize',
- return_value=[('tok1', 'val1'), ('tok2', 'val2')])
- @mock.patch.object(policy._ParseState, 'shift')
- @mock.patch.object(policy._ParseState, 'result', 'result')
- def test_shifts(self, mock_shift, mock_parse_tokenize):
- result = policy._parse_text_rule('test rule')
-
- self.assertEqual(result, 'result')
- mock_parse_tokenize.assert_called_once_with('test rule')
- mock_shift.assert_has_calls(
- [mock.call('tok1', 'val1'), mock.call('tok2', 'val2')])
-
- @mock.patch.object(policy, '_parse_tokenize', return_value=[])
- def test_fail(self, mock_parse_tokenize):
- result = policy._parse_text_rule('test rule')
-
- self.assertTrue(isinstance(result, policy.FalseCheck))
- mock_parse_tokenize.assert_called_once_with('test rule')
-
-
-class ParseRuleTestCase(test_base.BaseTestCase):
- @mock.patch.object(policy, '_parse_text_rule', return_value='text rule')
- @mock.patch.object(policy, '_parse_list_rule', return_value='list rule')
- def test_parse_rule_string(self, mock_parse_list_rule,
- mock_parse_text_rule):
- result = policy._parse_rule("a string")
-
- self.assertEqual(result, 'text rule')
- self.assertFalse(mock_parse_list_rule.called)
- mock_parse_text_rule.assert_called_once_with('a string')
-
- @mock.patch.object(policy, '_parse_text_rule', return_value='text rule')
- @mock.patch.object(policy, '_parse_list_rule', return_value='list rule')
- def test_parse_rule_list(self, mock_parse_list_rule, mock_parse_text_rule):
- result = policy._parse_rule([['a'], ['list']])
-
- self.assertEqual(result, 'list rule')
- self.assertFalse(mock_parse_text_rule.called)
- mock_parse_list_rule.assert_called_once_with([['a'], ['list']])
-
-
-class CheckRegisterTestCase(test_base.BaseTestCase):
- @mock.patch.object(policy, '_checks', {})
- def test_register_check(self):
- class TestCheck(policy.Check):
- pass
-
- policy.register('spam', TestCheck)
-
- self.assertEqual(policy._checks, dict(spam=TestCheck))
-
- @mock.patch.object(policy, '_checks', {})
- def test_register_check_decorator(self):
- @policy.register('spam')
- class TestCheck(policy.Check):
- pass
-
- self.assertEqual(policy._checks, dict(spam=TestCheck))
-
-
-class RuleCheckTestCase(test_base.BaseTestCase):
- @mock.patch.object(ENFORCER, 'rules', {})
- def test_rule_missing(self):
- check = policy.RuleCheck('rule', 'spam')
-
- self.assertEqual(check('target', 'creds', ENFORCER), False)
-
- @mock.patch.object(ENFORCER, 'rules',
- dict(spam=mock.Mock(return_value=False)))
- def test_rule_false(self):
- enforcer = ENFORCER
-
- check = policy.RuleCheck('rule', 'spam')
-
- self.assertEqual(check('target', 'creds', enforcer), False)
- enforcer.rules['spam'].assert_called_once_with('target', 'creds',
- enforcer)
-
- @mock.patch.object(ENFORCER, 'rules',
- dict(spam=mock.Mock(return_value=True)))
- def test_rule_true(self):
- enforcer = ENFORCER
- check = policy.RuleCheck('rule', 'spam')
-
- self.assertEqual(check('target', 'creds', enforcer), True)
- enforcer.rules['spam'].assert_called_once_with('target', 'creds',
- enforcer)
-
-
-class RoleCheckTestCase(PolicyBaseTestCase):
- def test_accept(self):
- check = policy.RoleCheck('role', 'sPaM')
-
- self.assertEqual(check('target', dict(roles=['SpAm']),
- self.enforcer), True)
-
- def test_reject(self):
- check = policy.RoleCheck('role', 'spam')
-
- self.assertEqual(check('target', dict(roles=[]), self.enforcer), False)
-
-
-class HttpCheckTestCase(PolicyBaseTestCase):
- def decode_post_data(self, post_data):
- result = {}
- for item in post_data.split('&'):
- key, _sep, value = item.partition('=')
- result[key] = jsonutils.loads(urlparse.unquote_plus(value))
-
- return result
-
- @mock.patch.object(urlrequest, 'urlopen',
- return_value=six.StringIO('True'))
- def test_accept(self, mock_urlopen):
- check = policy.HttpCheck('http', '//example.com/%(name)s')
- self.assertEqual(check(dict(name='target', spam='spammer'),
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer),
- True)
- self.assertEqual(mock_urlopen.call_count, 1)
-
- args = mock_urlopen.call_args[0]
-
- self.assertEqual(args[0], 'http://example.com/target')
- self.assertEqual(self.decode_post_data(args[1]), dict(
- target=dict(name='target', spam='spammer'),
- credentials=dict(user='user', roles=['a', 'b', 'c']),
- ))
-
- @mock.patch.object(urlrequest, 'urlopen',
- return_value=six.StringIO('other'))
- def test_reject(self, mock_urlopen):
- check = policy.HttpCheck('http', '//example.com/%(name)s')
-
- self.assertEqual(check(dict(name='target', spam='spammer'),
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer),
- False)
- self.assertEqual(mock_urlopen.call_count, 1)
-
- args = mock_urlopen.call_args[0]
-
- self.assertEqual(args[0], 'http://example.com/target')
- self.assertEqual(self.decode_post_data(args[1]), dict(
- target=dict(name='target', spam='spammer'),
- credentials=dict(user='user', roles=['a', 'b', 'c']),
- ))
-
- @mock.patch.object(urlrequest, 'urlopen',
- return_value=six.StringIO('True'))
- def test_http_with_objects_in_target(self, mock_urlopen):
-
- check = policy.HttpCheck('http', '//example.com/%(name)s')
- target = {'a': object(),
- 'name': 'target',
- 'b': 'test data'}
- self.assertEqual(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer),
- True)
-
- @mock.patch.object(urlrequest, 'urlopen',
- return_value=six.StringIO('True'))
- def test_http_with_strings_in_target(self, mock_urlopen):
- check = policy.HttpCheck('http', '//example.com/%(name)s')
- target = {'a': 'some_string',
- 'name': 'target',
- 'b': 'test data'}
- self.assertEqual(check(target,
- dict(user='user', roles=['a', 'b', 'c']),
- self.enforcer),
- True)
-
-
-class GenericCheckTestCase(PolicyBaseTestCase):
- def test_no_cred(self):
- check = policy.GenericCheck('name', '%(name)s')
-
- self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False)
-
- def test_cred_mismatch(self):
- check = policy.GenericCheck('name', '%(name)s')
-
- self.assertEqual(check(dict(name='spam'),
- dict(name='ham'),
- self.enforcer), False)
-
- def test_accept(self):
- check = policy.GenericCheck('name', '%(name)s')
-
- self.assertEqual(check(dict(name='spam'),
- dict(name='spam'),
- self.enforcer), True)
-
- def test_no_key_match_in_target(self):
- check = policy.GenericCheck('name', '%(name)s')
-
- self.assertEqual(check(dict(name1='spam'),
- dict(name='spam'),
- self.enforcer), False)
-
- def test_constant_string_mismatch(self):
- check = policy.GenericCheck("'spam'", '%(name)s')
-
- self.assertEqual(check(dict(name='ham'),
- {},
- self.enforcer), False)
-
- def test_constant_string_accept(self):
- check = policy.GenericCheck("'spam'", '%(name)s')
-
- self.assertEqual(check(dict(name='spam'),
- {},
- self.enforcer), True)
-
- def test_constant_literal_mismatch(self):
- check = policy.GenericCheck("True", '%(enabled)s')
-
- self.assertEqual(check(dict(enabled=False),
- {},
- self.enforcer), False)
-
- def test_constant_literal_accept(self):
- check = policy.GenericCheck("True", '%(enabled)s')
-
- self.assertEqual(check(dict(enabled=True),
- {},
- self.enforcer), True)
-
- def test_deep_credentials_dictionary_lookup(self):
- check = policy.GenericCheck("a.b.c.d", 'APPLES')
-
- credentials = {'a': {'b': {'c': {'d': 'APPLES'}}}}
-
- self.assertEqual(check({},
- credentials,
- self.enforcer), True)
-
- def test_missing_credentials_dictionary_lookup(self):
- credentials = {'a': 'APPLES', 'o': {'t': 'ORANGES'}}
-
- # First a valid check - rest of case is expecting failures
- # Should prove the basic credentials structure before we test
- # for failure cases.
- check = policy.GenericCheck("o.t", 'ORANGES')
- self.assertEqual(check({},
- credentials,
- self.enforcer), True)
-
- # Case where final key is missing
- check = policy.GenericCheck("o.v", 'ORANGES')
- self.assertEqual(check({},
- credentials,
- self.enforcer), False)
-
- # Attempt to access key under a missing dictionary
- check = policy.GenericCheck("q.v", 'APPLES')
- self.assertEqual(check({},
- credentials,
- self.enforcer), False)