summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_policy/shell.py38
-rw-r--r--oslo_policy/tests/test_shell.py60
-rw-r--r--releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml6
3 files changed, 99 insertions, 5 deletions
diff --git a/oslo_policy/shell.py b/oslo_policy/shell.py
index 222ab6a..fe143fb 100644
--- a/oslo_policy/shell.py
+++ b/oslo_policy/shell.py
@@ -14,6 +14,7 @@
# limitations under the License.
import argparse
+import collections
import sys
from oslo_serialization import jsonutils
@@ -33,7 +34,25 @@ def _try_rule(key, rule, target, access_data, o):
print("exception: %s" % rule)
-def tool(policy_file, access_file, apply_rule, is_admin=False):
+def flatten(d, parent_key=''):
+ """Flatten a nested dictionary
+
+ Converts a dictionary with nested values to a single level flat
+ dictionary, with dotted notation for each key.
+
+ """
+ items = []
+ for k, v in d.items():
+ new_key = parent_key + '.' + k if parent_key else k
+ if isinstance(v, collections.MutableMapping):
+ items.extend(flatten(v, new_key).items())
+ else:
+ items.append((new_key, v))
+ return dict(items)
+
+
+def tool(policy_file, access_file, apply_rule, is_admin=False,
+ target_file=None):
access = access_file.read()
access_data = jsonutils.loads(access)['token']
access_data['roles'] = [role['name'] for role in access_data['roles']]
@@ -47,16 +66,20 @@ def tool(policy_file, access_file, apply_rule, is_admin=False):
o = Object()
o.rules = rules
- target = {"project_id": access_data['project_id']}
+ if target_file:
+ target = target_file.read()
+ target_data = flatten(jsonutils.loads(target))
+ else:
+ target_data = {"project_id": access_data['project_id']}
if apply_rule:
key = apply_rule
rule = rules[apply_rule]
- _try_rule(key, rule, target, access_data, o)
+ _try_rule(key, rule, target_data, access_data, o)
return
for key, rule in rules.items():
if ":" in key:
- _try_rule(key, rule, target, access_data, o)
+ _try_rule(key, rule, target_data, access_data, o)
def main():
@@ -73,6 +96,11 @@ def main():
help='path to a file containing OpenStack Identity API' +
' access info in JSON format')
parser.add_argument(
+ '--target',
+ type=argparse.FileType('rb', 0),
+ help='path to a file containing custom target info in' +
+ ' JSON format. This will be used to evaluate the policy with.')
+ parser.add_argument(
'--rule',
help='rule to test')
@@ -85,7 +113,7 @@ def main():
is_admin = args.is_admin.lower() == "true"
except Exception:
is_admin = False
- tool(args.policy, args.access, args.rule, is_admin)
+ tool(args.policy, args.access, args.rule, is_admin, args.target)
if __name__ == "__main__":
diff --git a/oslo_policy/tests/test_shell.py b/oslo_policy/tests/test_shell.py
index 70fa407..97c64c8 100644
--- a/oslo_policy/tests/test_shell.py
+++ b/oslo_policy/tests/test_shell.py
@@ -62,6 +62,39 @@ class CheckerTestCase(base.PolicyBaseTestCase):
'''
self.assertEqual(expected, stdout.getvalue())
+ @mock.patch("oslo_policy._checks.TrueCheck.__call__")
+ def test_pass_rule_parameters_with_custom_target(self, call_mock):
+ apply_rule = None
+ is_admin = False
+ access_data = token_fixture.SCOPED_TOKEN_FIXTURE["token"]
+ access_data['roles'] = [
+ role['name'] for role in access_data['roles']]
+ access_data['project_id'] = access_data['project']['id']
+ access_data['is_admin'] = is_admin
+
+ sample_target = {
+ "project_id": access_data["project"]["id"],
+ "domain_id": access_data["project"]["domain"]["id"]
+ }
+ self.create_config_file(
+ "target.json",
+ jsonutils.dumps(sample_target))
+
+ policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
+ access_file = open(self.get_config_file_fullname('access.json'), 'r')
+ target_file = open(self.get_config_file_fullname('target.json'), 'r')
+ stdout = self._capture_stdout()
+
+ shell.tool(policy_file, access_file, apply_rule, is_admin,
+ target_file)
+ call_mock.assert_called_once_with(
+ sample_target, access_data, mock.ANY,
+ current_rule="sampleservice:sample_rule")
+
+ expected = '''passed: sampleservice:sample_rule
+'''
+ self.assertEqual(expected, stdout.getvalue())
+
def test_all_nonadmin(self):
policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
@@ -75,3 +108,30 @@ class CheckerTestCase(base.PolicyBaseTestCase):
expected = '''passed: sampleservice:sample_rule
'''
self.assertEqual(expected, stdout.getvalue())
+
+ def test_flatten_from_dict(self):
+ target = {
+ "target": {
+ "secret": {
+ "project_id": "1234"
+ }
+ }
+ }
+ result = shell.flatten(target)
+ self.assertEqual(result, {"target.secret.project_id": "1234"})
+
+ def test_flatten_from_file(self):
+ target = {
+ "target": {
+ "secret": {
+ "project_id": "1234"
+ }
+ }
+ }
+ self.create_config_file(
+ "target.json",
+ jsonutils.dumps(target))
+ target_file = open(self.get_config_file_fullname('target.json'), 'r')
+ target_from_file = target_file.read()
+ result = shell.flatten(jsonutils.loads(target_from_file))
+ self.assertEqual(result, {"target.secret.project_id": "1234"})
diff --git a/releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml b/releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml
new file mode 100644
index 0000000..a844f3f
--- /dev/null
+++ b/releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml
@@ -0,0 +1,6 @@
+---
+features:
+ - |
+ oslopolicy-checker was added the ability to accept a file containing a hash
+ that represents the target. This makes it possible to check policies that
+ have non-conventional targets such as barbican.