summaryrefslogtreecommitdiff
path: root/ironic/api/controllers/v1/utils.py
diff options
context:
space:
mode:
authorJulia Kreger <juliaashleykreger@gmail.com>2021-02-17 21:01:47 -0800
committerJulia Kreger <juliaashleykreger@gmail.com>2021-03-04 09:47:36 -0800
commite870bd34d0ccacbaef7f4e4def2535eb28f822b9 (patch)
treeb92287d93bd5f4eb3f7e4295a59f63a8d1cc0eb5 /ironic/api/controllers/v1/utils.py
parente9dfe5ddaad7324d8d89fef0661f41f18542028f (diff)
downloadironic-e870bd34d0ccacbaef7f4e4def2535eb28f822b9.tar.gz
Volume targets/connectors Project Scoped RBAC
This patch adds project scoped access, as part of the work to delineate system and project scope access. Adds policies: * baremetal:volume:list_all * baremetal:volume:list * baremetal:volume:view_target_properties Change-Id: I898310b515195b7065a3b1c7998ef3f29f5e8747
Diffstat (limited to 'ironic/api/controllers/v1/utils.py')
-rw-r--r--ironic/api/controllers/v1/utils.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/ironic/api/controllers/v1/utils.py b/ironic/api/controllers/v1/utils.py
index b618b2f45..01483c47b 100644
--- a/ironic/api/controllers/v1/utils.py
+++ b/ironic/api/controllers/v1/utils.py
@@ -1787,6 +1787,110 @@ def check_port_list_policy(portgroup=False, parent_node=None,
return owner
+def check_volume_list_policy(parent_node=None):
+ """Check if the specified policy authorizes this request on a port.
+
+ :param parent_node: The UUID of a node, if any, to apply a policy
+ check to as well before applying other policy
+ check operations.
+
+ :raises: HTTPForbidden if the policy forbids access.
+ :return: owner that should be used for list query, if needed
+ """
+
+ cdict = api.request.context.to_policy_values()
+
+ # No node is associated with this request, yet.
+ rpc_node = None
+ conceal_linked_node = None
+
+ if parent_node:
+ try:
+ rpc_node = objects.Node.get_by_uuid(api.request.context,
+ parent_node)
+ conceal_linked_node = rpc_node.uuid
+ except exception.NotFound:
+ raise exception.NodeNotFound(node=parent_node)
+ if parent_node:
+ try:
+ check_owner_policy(
+ 'node', 'baremetal:node:get',
+ rpc_node.owner, rpc_node.lessee,
+ conceal_node=conceal_linked_node)
+ except exception.NotAuthorized:
+ if parent_node:
+ # This should likely never be hit, because
+ # the existence of a parent node should
+ # trigger the node not found exception to be
+ # explicitly raised.
+ raise exception.NodeNotFound(
+ node=parent_node)
+ raise
+
+ try:
+ policy.authorize('baremetal:volume:list_all',
+ cdict, api.request.context)
+ except exception.HTTPForbidden:
+ owner = cdict.get('project_id')
+ if not owner:
+ raise
+ policy.authorize('baremetal:volume:list',
+ cdict, api.request.context)
+ return owner
+
+
+def check_volume_policy_and_retrieve(policy_name, vol_ident, target=False):
+ """Check if the specified policy authorizes this request on a port.
+
+ :param: policy_name: Name of the policy to check.
+ :param: vol_ident: The name, uuid, or other valid ID value to find
+ a port or portgroup by.
+ :param: target: Boolean value to indicate if the check is for a volume
+ target or connector. Default value is False, implying
+ connector.
+
+ :raises: HTTPForbidden if the policy forbids access.
+ :raises: VolumeConnectorNotFound if the node is not found.
+ :raises: VolumeTargetNotFound if the node is not found.
+ :return: RPC port identified by port_ident associated node
+ """
+ context = api.request.context
+ cdict = context.to_policy_values()
+ owner = None
+ lessee = None
+ try:
+ if not target:
+ rpc_vol = objects.VolumeConnector.get(context, vol_ident)
+ else:
+ rpc_vol = objects.VolumeTarget.get(context, vol_ident)
+ except (exception.VolumeConnectorNotFound, exception.VolumeTargetNotFound):
+ # don't expose non-existence of port unless requester
+ # has generic access to policy
+ raise
+
+ target_dict = dict(cdict)
+ try:
+ rpc_node = objects.Node.get_by_id(context, rpc_vol.node_id)
+ owner = rpc_node['owner']
+ lessee = rpc_node['lessee']
+ except exception.NodeNotFound:
+ pass
+ target_dict = dict(cdict)
+ target_dict['node.owner'] = owner
+ target_dict['node.lessee'] = lessee
+ try:
+ policy.authorize('baremetal:node:get', target_dict, context)
+ except exception.NotAuthorized:
+ if not target:
+ raise exception.VolumeConnectorNotFound(connector=vol_ident)
+ else:
+ raise exception.VolumeTargetNotFound(target=vol_ident)
+
+ policy.authorize(policy_name, target_dict, context)
+
+ return rpc_vol, rpc_node
+
+
def allow_build_configdrive():
"""Check if building configdrive is allowed.