summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-03-09 20:25:38 +0000
committerGerrit Code Review <review@openstack.org>2021-03-09 20:25:38 +0000
commit1010805bfe4d8e91aeb6b6bf302803de793634f2 (patch)
tree2763f05b2647f917a7daf62c0fec54385668c535
parenteecb2f057ead37bf73beb0a734ca5784037db44b (diff)
parent31414b9f61ac4c923ede30a3c7e32662e9c14e1a (diff)
downloadglance-1010805bfe4d8e91aeb6b6bf302803de793634f2.tar.gz
Merge "Implement project personas for image actions"
-rw-r--r--glance/api/middleware/cache.py10
-rw-r--r--glance/api/policy.py138
-rw-r--r--glance/api/v2/image_data.py3
-rw-r--r--glance/policies/base.py48
-rw-r--r--glance/policies/image.py333
-rw-r--r--glance/tests/unit/test_policy.py299
-rw-r--r--glance/tests/unit/v2/test_image_data_resource.py10
7 files changed, 687 insertions, 154 deletions
diff --git a/glance/api/middleware/cache.py b/glance/api/middleware/cache.py
index 0e54aaaf2..a71e46922 100644
--- a/glance/api/middleware/cache.py
+++ b/glance/api/middleware/cache.py
@@ -99,6 +99,16 @@ class CacheFilter(wsgi.Middleware):
"""Authorize an action against our policies"""
if target is None:
target = {}
+ if isinstance(target, policy.ImageTarget):
+ # The _get_v2_image_metadata() method returns an instance of
+ # ImageTarget(), which isn't mutable and we need to make sure we
+ # update the project_id attribute of the target to match the owner.
+ # If the target isn't of dict() type already, we should explicitly
+ # handle that here. We take a similar approach in the policy layer
+ # (glance.api.policy) to make sure we're build accurate image
+ # targets.
+ target = dict(target)
+ target['project_id'] = target.get('owner', None)
try:
self.policy.enforce(req.context, action, target)
except exception.Forbidden as e:
diff --git a/glance/api/policy.py b/glance/api/policy.py
index 62ba6c246..6c494fd90 100644
--- a/glance/api/policy.py
+++ b/glance/api/policy.py
@@ -127,20 +127,73 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
def get(self, image_id):
image = super(ImageRepoProxy, self).get(image_id)
- self.policy.enforce(self.context, 'get_image',
- dict(ImageTarget(image)))
+ target = self._build_image_target(image)
+ try:
+ self.policy.enforce(self.context, 'get_image', target)
+ except exception.Forbidden:
+ # NOTE (abhishekk): Returning 404 Not Found as the
+ # image is outside of this user's project
+ msg = _("No image found with ID %s") % image_id
+ raise exception.NotFound(msg)
return image
+ def _build_image_target(self, image):
+ """Build a policy enforcement target for an image.
+
+ :param image: An instance of `glance.domain.Image`
+ :returns: a dictionary representing the image for policy enforcement
+
+ """
+ target = dict(ImageTarget(image))
+ target['project_id'] = image.owner
+
+ # We do this so that members of the image can pass the policy for
+ # getting an image shared with their project. An alternative would be
+ # to update the image owner, or project_id, to match the member ID,
+ # tricking the policy enforcer into thinking image members are actually
+ # image owners. But, that feels less clear without understanding the
+ # code that makes that assumption, especially for operators reading
+ # check strings. Using this approach forces the check_str to be more
+ # descriptive.
+ members = self.image_repo.db_api.image_member_find(
+ self.context, image_id=image.image_id)
+ # FIXME(lbragstad): Remove this if statement if/when oslo.policy
+ # supports lists of target attributes via substitution, allowing us to
+ # do something like:
+ #
+ # target['member_ids'] = set(m['member'] for m in members)
+ for member in members:
+ if member['member'] == self.context.project_id:
+ target['member_id'] = member['member']
+ break
+ return target
+
def list(self, *args, **kwargs):
- self.policy.enforce(self.context, 'get_images', {})
+ # FIXME(lbragstad): This is a hack to get policy to pass because we
+ # don't have a reasonable target to use for all images. We set the
+ # target project_id to the context project_id, which effectively
+ # ensures the context project_id matches itself in policy enforcement.
+ #
+ # A more accurate and cleaner way to implement this, and filtering,
+ # would be to query all images from the database, build a target for
+ # each image, and then iterate over each image and call policy
+ # enforcement. If the user passes policy enforcement, append the image
+ # to the list of filtered images. If not, the image should be removed
+ # from the list of images returned to the user.
+ target = {'project_id': self.context.project_id}
+ self.policy.enforce(self.context, 'get_images', target)
return super(ImageRepoProxy, self).list(*args, **kwargs)
def save(self, image, from_state=None):
- self.policy.enforce(self.context, 'modify_image', dict(image.target))
+ target = dict(image.target)
+ target['project_id'] = target.get('owner', None)
+ self.policy.enforce(self.context, 'modify_image', target)
return super(ImageRepoProxy, self).save(image, from_state=from_state)
def add(self, image):
- self.policy.enforce(self.context, 'add_image', dict(image.target))
+ target = dict(image.target)
+ target['project_id'] = target.get('owner', None)
+ self.policy.enforce(self.context, 'add_image', target)
return super(ImageRepoProxy, self).add(image)
@@ -166,8 +219,9 @@ class ImageProxy(glance.domain.proxy.Image):
@visibility.setter
def visibility(self, value):
- _enforce_image_visibility(self.policy, self.context, value,
- self.target)
+ target = dict(self.target)
+ target['project_id'] = target.get('owner', None)
+ _enforce_image_visibility(self.policy, self.context, value, target)
self.image.visibility = value
@property
@@ -188,25 +242,31 @@ class ImageProxy(glance.domain.proxy.Image):
self.image.locations = new_locations
def delete(self):
- self.policy.enforce(self.context, 'delete_image', dict(self.target))
+ target = dict(self.target)
+ target['project_id'] = target.get('owner', None)
+ self.policy.enforce(self.context, 'delete_image', target)
return self.image.delete()
def deactivate(self):
LOG.debug('Attempting deactivate')
- target = ImageTarget(self.image)
+ target = dict(ImageTarget(self.image))
+ target['project_id'] = target.get('owner', None)
self.policy.enforce(self.context, 'deactivate', target=target)
LOG.debug('Deactivate allowed, continue')
self.image.deactivate()
def reactivate(self):
LOG.debug('Attempting reactivate')
- target = ImageTarget(self.image)
+ target = dict(ImageTarget(self.image))
+ target['project_id'] = target.get('owner', None)
self.policy.enforce(self.context, 'reactivate', target=target)
LOG.debug('Reactivate allowed, continue')
self.image.reactivate()
def get_data(self, *args, **kwargs):
- self.policy.enforce(self.context, 'download_image', self.target)
+ target = dict(ImageTarget(self.image))
+ target['project_id'] = target.get('owner', None)
+ self.policy.enforce(self.context, 'download_image', target)
return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
@@ -234,8 +294,14 @@ class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
proxy_kwargs=proxy_kwargs)
def new_image(self, **kwargs):
+ # If we reversed the order of this method and did the policy
+ # enforcement on the way out instead of before we build the image
+ # target reference, we could use the actual image as a target instead
+ # of building a faux target with one attribute.
+ target = {}
+ target['project_id'] = kwargs.get('owner', None)
_enforce_image_visibility(self.policy, self.context,
- kwargs.get('visibility'), {})
+ kwargs.get('visibility'), target)
return super(ImageFactoryProxy, self).new_image(**kwargs)
@@ -258,23 +324,46 @@ class ImageMemberRepoProxy(glance.domain.proxy.Repo):
self.policy = policy
def add(self, member):
- self.policy.enforce(self.context, 'add_member', self.target)
+ target = dict(self.target)
+ target['project_id'] = self.context.project_id
+ self.policy.enforce(self.context, 'add_member', target)
self.member_repo.add(member)
def get(self, member_id):
- self.policy.enforce(self.context, 'get_member', self.target)
+ # NOTE(lbragstad): We set the project_id of the target to be the
+ # project_id of the context object, which is effectively a no-op
+ # because we're checking the context.project_id matches the
+ # context.project_id. This is a bandaid to allow project-members and
+ # project-readers to view shared images owned by their project, or to
+ # view images shared with them by another project. Glance's database
+ # layer filters the images by ownership and membership, based on the
+ # context object and administrative checks. If or when that code is
+ # pulled into a higher level and if we have a list of members for an
+ # image, we can write a more accurate target.
+ target = dict(self.target)
+ # We can't set the project_id as the image project_id because that
+ # wouldn't allow image members to pass the policy check. We need the
+ # list of image members to build an accurate target.
+ target['project_id'] = self.context.project_id
+ self.policy.enforce(self.context, 'get_member', target)
return self.member_repo.get(member_id)
def save(self, member, from_state=None):
- self.policy.enforce(self.context, 'modify_member', self.target)
+ target = dict(self.target)
+ target['project_id'] = self.context.project_id
+ self.policy.enforce(self.context, 'modify_member', target)
self.member_repo.save(member, from_state=from_state)
def list(self, *args, **kwargs):
- self.policy.enforce(self.context, 'get_members', self.target)
+ target = dict(self.target)
+ target['project_id'] = self.context.project_id
+ self.policy.enforce(self.context, 'get_members', target)
return self.member_repo.list(*args, **kwargs)
def remove(self, member):
- self.policy.enforce(self.context, 'delete_member', self.target)
+ target = dict(self.target)
+ target['project_id'] = self.context.project_id
+ self.policy.enforce(self.context, 'delete_member', target)
self.member_repo.remove(member)
@@ -297,7 +386,11 @@ class ImageLocationsProxy(object):
def _get_checker(action, func_name):
def _checker(self, *args, **kwargs):
- self.policy.enforce(self.context, action, {})
+ target = {}
+ if self.context.project_id:
+ target['project_id'] = self.context.project_id
+
+ self.policy.enforce(self.context, action, target)
method = getattr(self.locations, func_name)
return method(*args, **kwargs)
return _checker
@@ -410,6 +503,15 @@ class ImageTarget(abc.Mapping):
self.target = target
self._target_keys = [k for k in dir(ImageProxy)
if not k.startswith('__')
+ # NOTE(lbragstad): The locations attributes is an
+ # instance of ImageLocationsProxy, which isn't
+ # serialized into anything oslo.policy can use. If
+ # we need to use locations in policies, we need to
+ # modify how we represent those location objects
+ # before we call enforcement with target
+ # information. Omitting for not until that is a
+ # necessity.
+ if not k == 'locations'
if not callable(getattr(ImageProxy, k))]
def __getitem__(self, key):
diff --git a/glance/api/v2/image_data.py b/glance/api/v2/image_data.py
index 8c25b134a..a3235e9a6 100644
--- a/glance/api/v2/image_data.py
+++ b/glance/api/v2/image_data.py
@@ -138,8 +138,9 @@ class ImageDataController(object):
refresher = None
cxt = req.context
try:
- self.policy.enforce(cxt, 'upload_image', {})
image = image_repo.get(image_id)
+ target = {'project_id': image.owner}
+ self.policy.enforce(cxt, 'upload_image', target)
image.status = 'saving'
try:
# create a trust if backend is registry
diff --git a/glance/policies/base.py b/glance/policies/base.py
index b12fd9c74..046e4557c 100644
--- a/glance/policies/base.py
+++ b/glance/policies/base.py
@@ -14,13 +14,55 @@ from oslo_policy import policy
# Generic check string for checking if a user is authorized on a particular
# project, specifically with the member role.
-PROJECT_MEMBER = 'role:member and project_id:%(project_id)s'
-
+PROJECT_MEMBER = 'role:member and (project_id:%(project_id)s)'
# Generic check string for checking if a user is authorized on a particular
# project but with read-only access. For example, this persona would be able to
# list private images owned by a project but cannot make any writeable changes
# to those images.
-PROJECT_READER = 'role:reader and project_id:%(project_id)s'
+PROJECT_READER = 'role:reader and (project_id:%(project_id)s)'
+
+# Make sure the member_id of the supplied target matches the project_id from
+# the context object, which is derived from keystone tokens.
+IMAGE_MEMBER_CHECK = 'project_id:%(member_id)s'
+# Check if the visibility of the image supplied in the target matches
+# "community"
+COMMUNITY_VISIBILITY_CHECK = '"community":%(visibility)s'
+# Check if the visibility of the image supplied in the target matches "public"
+PUBLIC_VISIBILITY_CHECK = '"public":%(visibility)s'
+
+PROJECT_MEMBER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC = (
+ f'role:member and (project_id:%(project_id)s or {IMAGE_MEMBER_CHECK} '
+ f'or {COMMUNITY_VISIBILITY_CHECK} or {PUBLIC_VISIBILITY_CHECK})'
+)
+PROJECT_READER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC = (
+ f'role:reader and (project_id:%(project_id)s or {IMAGE_MEMBER_CHECK} '
+ f'or {COMMUNITY_VISIBILITY_CHECK} or {PUBLIC_VISIBILITY_CHECK})'
+)
+
+# FIXME(lbragstad): These are composite check strings that represents glance's
+# authorization code, some of which is implemented in the authorization wrapper
+# and some is in the database driver.
+#
+# These check strings do not support tenancy with the `admin` role. This means
+# anyone with the `admin` role on any project can execute a policy, which is
+# typical in OpenStack services. Eventually, these check strings will be
+# superceded by check strings that implement scope checking and system-scope
+# for applicable APIs (e.g., making an image public). But, we have a lot of
+# cleanup to do in different parts of glance to sweep all the authorization
+# code into a single layer before we can safely consume system-scope and
+# implement scope checking. This refactoring also needs significant API testing
+# to ensure we don't leave doors open to unintended users, or expose
+# authoritative regressions. In the mean time, we can use the following check
+# strings to offer formal support for project membership and a read-only
+# variant consistent with other OpenStack services.
+ADMIN_OR_PROJECT_MEMBER = f'role:admin or ({PROJECT_MEMBER})'
+ADMIN_OR_PROJECT_READER = f'role:admin or ({PROJECT_READER})'
+ADMIN_OR_PROJECT_READER_GET_IMAGE = (
+ f'role:admin or ({PROJECT_READER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC})'
+)
+ADMIN_OR_PROJECT_MEMBER_DOWNLOAD_IMAGE = (
+ f'role:admin or ({PROJECT_MEMBER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC})'
+)
rules = [
diff --git a/glance/policies/image.py b/glance/policies/image.py
index 19ba9843f..2b24ff57c 100644
--- a/glance/policies/image.py
+++ b/glance/policies/image.py
@@ -9,38 +9,327 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
+from oslo_log import versionutils
from oslo_policy import policy
+from glance.policies import base
+
+
+DEPRECATED_REASON = """
+The image API now supports and default roles.
+"""
+
image_policies = [
- policy.RuleDefault(name="add_image", check_str="rule:default"),
- policy.RuleDefault(name="delete_image", check_str="rule:default"),
- policy.RuleDefault(name="get_image", check_str="rule:default"),
- policy.RuleDefault(name="get_images", check_str="rule:default"),
- policy.RuleDefault(name="modify_image", check_str="rule:default"),
- policy.RuleDefault(name="publicize_image", check_str="role:admin"),
- policy.RuleDefault(name="communitize_image", check_str="rule:default"),
+ policy.DocumentedRuleDefault(
+ name="add_image",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Create new image',
+ operations=[
+ {'path': '/v2/images',
+ 'method': 'POST'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="add_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="delete_image",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Deletes the image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'DELETE'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="delete_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="get_image",
+ check_str=base.ADMIN_OR_PROJECT_READER_GET_IMAGE,
+ scope_types=['system', 'project'],
+ description='Get specified image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'GET'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="get_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="get_images",
+ check_str=base.ADMIN_OR_PROJECT_READER,
+ scope_types=['system', 'project'],
+ description='Get all available images',
+ operations=[
+ {'path': '/v2/images',
+ 'method': 'GET'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="get_images", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="modify_image",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Updates given image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'PATCH'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="modify_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="publicize_image",
+ check_str='role:admin',
+ scope_types=['system', 'project'],
+ description='Publicize given image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'PATCH'}
+ ]
+ ),
+ policy.DocumentedRuleDefault(
+ name="communitize_image",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Communitize given image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'PATCH'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="communitize_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
- policy.RuleDefault(name="download_image", check_str="rule:default"),
- policy.RuleDefault(name="upload_image", check_str="rule:default"),
+ policy.DocumentedRuleDefault(
+ name="download_image",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER_DOWNLOAD_IMAGE,
+ scope_types=['system', 'project'],
+ description='Downloads given image',
+ operations=[
+ {'path': '/v2/images/{image_id}/file',
+ 'method': 'GET'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="download_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="upload_image",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Uploads data to specified image',
+ operations=[
+ {'path': '/v2/images/{image_id}/file',
+ 'method': 'PUT'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="upload_image", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
- policy.RuleDefault(name="delete_image_location", check_str="rule:default"),
- policy.RuleDefault(name="get_image_location", check_str="rule:default"),
- policy.RuleDefault(name="set_image_location", check_str="rule:default"),
+ policy.DocumentedRuleDefault(
+ name="delete_image_location",
+ check_str='role:admin',
+ scope_types=['system', 'project'],
+ description='Deletes the location of given image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'PATCH'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="delete_image_location", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="get_image_location",
+ check_str=base.ADMIN_OR_PROJECT_READER,
+ scope_types=['system', 'project'],
+ description='Reads the location of the image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'GET'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="get_image_location", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="set_image_location",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Sets location URI to given image',
+ operations=[
+ {'path': '/v2/images/{image_id}',
+ 'method': 'PATCH'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="set_image_location", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
- policy.RuleDefault(name="add_member", check_str="rule:default"),
- policy.RuleDefault(name="delete_member", check_str="rule:default"),
- policy.RuleDefault(name="get_member", check_str="rule:default"),
- policy.RuleDefault(name="get_members", check_str="rule:default"),
- policy.RuleDefault(name="modify_member", check_str="rule:default"),
+ policy.DocumentedRuleDefault(
+ name="add_member",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Create image member',
+ operations=[
+ {'path': '/v2/images/{image_id}/members',
+ 'method': 'POST'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="add_member", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="delete_member",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Delete image member',
+ operations=[
+ {'path': '/v2/images/{image_id}/members/{member_id}',
+ 'method': 'DELETE'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="delete_member", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="get_member",
+ check_str=base.ADMIN_OR_PROJECT_READER,
+ scope_types=['system', 'project'],
+ description='Show image member details',
+ operations=[
+ {'path': '/v2/images/{image_id}/members/{member_id}',
+ 'method': 'GET'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="get_member", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="get_members",
+ check_str=base.ADMIN_OR_PROJECT_READER,
+ scope_types=['system', 'project'],
+ description='List image members',
+ operations=[
+ {'path': '/v2/images/{image_id}/members',
+ 'method': 'GET'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="get_members", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="modify_member",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Update image member',
+ operations=[
+ {'path': '/v2/images/{image_id}/members/{member_id}',
+ 'method': 'PUT'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="modify_member", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
- policy.RuleDefault(name="manage_image_cache", check_str="role:admin"),
+ policy.RuleDefault(
+ name="manage_image_cache",
+ check_str='role:admin',
+ # NOTE(lbragstad): Remove 'project' from the list below when glance
+ # fully supports system-scope and this policy is updated to reflect
+ # that in the check string.
+ scope_types=['system', 'project'],
+ description='Manage image cache'
+ ),
- policy.RuleDefault(name="deactivate", check_str="rule:default"),
- policy.RuleDefault(name="reactivate", check_str="rule:default"),
+ policy.DocumentedRuleDefault(
+ name="deactivate",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Deactivate image',
+ operations=[
+ {'path': '/v2/images/{image_id}/actions/deactivate',
+ 'method': 'POST'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="deactivate", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
+ policy.DocumentedRuleDefault(
+ name="reactivate",
+ check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ scope_types=['system', 'project'],
+ description='Reactivate image',
+ operations=[
+ {'path': '/v2/images/{image_id}/actions/reactivate',
+ 'method': 'POST'}
+ ],
+ deprecated_rule=policy.DeprecatedRule(
+ name="reactivate", check_str="rule:default"
+ ),
+ deprecated_reason=DEPRECATED_REASON,
+ deprecated_since=versionutils.deprecated.WALLABY
+ ),
- policy.RuleDefault(name="copy_image", check_str="role:admin"),
+ policy.DocumentedRuleDefault(
+ name="copy_image",
+ check_str='role:admin',
+ # Eventually, we need to make sure we update the check string here to
+ # be scope-aware, but for now this is restricted to system-admins and
+ # project-admins. That might change in the future if we decide to push
+ # this functionality down to project-members.
+ scope_types=['system', 'project'],
+ description='Copy existing image to other stores',
+ operations=[
+ {'path': '/v2/images/{image_id}/import',
+ 'method': 'POST'}
+ ]
+ ),
]
diff --git a/glance/tests/unit/test_policy.py b/glance/tests/unit/test_policy.py
index 903dbb0be..9e77e0645 100644
--- a/glance/tests/unit/test_policy.py
+++ b/glance/tests/unit/test_policy.py
@@ -42,8 +42,18 @@ class IterableMock(mock.Mock, abc.Iterable):
class ImageRepoStub(object):
+ def __init__(self):
+ self.db_api = mock.Mock()
+ self.db_api.image_member_find.return_value = [
+ {'member': 'foo'}
+ ]
+
def get(self, *args, **kwargs):
- return 'image_from_get'
+ context = mock.Mock()
+ policy = mock.Mock()
+ return glance.api.policy.ImageProxy(
+ ImageStub(image_id=UUID1), context, policy
+ )
def save(self, *args, **kwargs):
return 'image_from_save'
@@ -400,6 +410,7 @@ class TestPolicyEnforcerNoFile(base.IsolatedUnitTest):
def test_policy_file_specified_but_not_found(self):
"""Missing defined policy file should result in a default ruleset"""
self.config(policy_file='gobble.gobble', group='oslo_policy')
+ self.config(enforce_new_defaults=True, group='oslo_policy')
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[])
@@ -412,6 +423,8 @@ class TestPolicyEnforcerNoFile(base.IsolatedUnitTest):
def test_policy_file_default_not_found(self):
"""Missing default policy file should result in a default ruleset"""
+ self.config(enforce_new_defaults=True, group='oslo_policy')
+
def fake_find_file(self, name):
return None
@@ -436,173 +449,213 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.image_factory_stub = ImageFactoryStub()
self.policy = mock.Mock()
self.policy.enforce = mock.Mock()
+ self.context = mock.Mock()
super(TestImagePolicy, self).setUp()
def test_publicize_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'public')
self.assertEqual('private', image.visibility)
- self.policy.enforce.assert_called_once_with({}, "publicize_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "publicize_image", expected_target)
def test_publicize_image_allowed(self):
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
image.visibility = 'public'
self.assertEqual('public', image.visibility)
- self.policy.enforce.assert_called_once_with({}, "publicize_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "publicize_image", expected_target)
def test_communitize_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'community')
self.assertEqual('private', image.visibility)
- self.policy.enforce.assert_called_once_with({}, "communitize_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "communitize_image", expected_target)
def test_communitize_image_allowed(self):
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
image.visibility = 'community'
self.assertEqual('community', image.visibility)
- self.policy.enforce.assert_called_once_with({}, "communitize_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "communitize_image", expected_target)
def test_delete_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden, image.delete)
self.assertEqual('active', image.status)
- self.policy.enforce.assert_called_once_with({}, "delete_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "delete_image", expected_target)
def test_delete_image_allowed(self):
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
- args = dict(image.target)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
image.delete()
self.assertEqual('deleted', image.status)
- self.policy.enforce.assert_called_once_with({}, "delete_image", args)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "delete_image", expected_target)
def test_get_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_target = IterableMock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
- self.assertRaises(exception.Forbidden, image_repo.get, UUID1)
- self.policy.enforce.assert_called_once_with({}, "get_image",
- dict(image_target))
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
+ self.assertRaises(exception.NotFound, image_repo.get, UUID1)
+ expected_target = {'project_id': 'tenant1'}
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_image", expected_target)
def test_get_image_allowed(self):
- image_target = IterableMock()
- with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
- target.return_value = image_target
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
- output = image_repo.get(UUID1)
- self.assertIsInstance(output, glance.api.policy.ImageProxy)
- self.assertEqual('image_from_get', output.image)
- self.policy.enforce.assert_called_once_with({}, "get_image",
- dict(image_target))
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
+ image = image_repo.get(UUID1)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
+ self.assertIsInstance(image, glance.api.policy.ImageProxy)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_image", expected_target)
def test_get_images_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
self.assertRaises(exception.Forbidden, image_repo.list)
- self.policy.enforce.assert_called_once_with({}, "get_images", {})
+ expected_target = {'project_id': self.context.project_id}
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_images", expected_target)
def test_get_images_allowed(self):
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
+ expected_target = {'project_id': self.context.project_id}
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
images = image_repo.list()
for i, image in enumerate(images):
self.assertIsInstance(image, glance.api.policy.ImageProxy)
self.assertEqual('image_from_list_%d' % i, image.image)
- self.policy.enforce.assert_called_once_with({}, "get_images", {})
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_images", expected_target)
def test_modify_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden, image_repo.save, image)
- self.policy.enforce.assert_called_once_with({}, "modify_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "modify_image", expected_target)
def test_modify_image_allowed(self):
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
image_repo.save(image)
- self.policy.enforce.assert_called_once_with({}, "modify_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "modify_image", expected_target)
def test_add_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden, image_repo.add, image)
- self.policy.enforce.assert_called_once_with({}, "add_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "add_image", expected_target)
def test_add_image_allowed(self):
- image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
- {}, self.policy)
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ image_repo = glance.api.policy.ImageRepoProxy(
+ self.image_repo_stub, self.context, self.policy)
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
image_repo.add(image)
- self.policy.enforce.assert_called_once_with({}, "add_image",
- image.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "add_image", expected_target)
def test_new_image_visibility_public_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_factory = glance.api.policy.ImageFactoryProxy(
- self.image_factory_stub, {}, self.policy)
+ self.image_factory_stub, self.context, self.policy)
self.assertRaises(exception.Forbidden, image_factory.new_image,
- visibility='public')
- self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
+ visibility='public', owner='tenant1')
+ expected_target = {'project_id': 'tenant1'}
+ self.policy.enforce.assert_called_once_with(
+ self.context, "publicize_image", expected_target)
def test_new_image_visibility_public_allowed(self):
image_factory = glance.api.policy.ImageFactoryProxy(
- self.image_factory_stub, {}, self.policy)
- image_factory.new_image(visibility='public')
- self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
+ self.image_factory_stub, self.context, self.policy)
+ image_factory.new_image(visibility='public', owner='tenant1')
+ expected_target = {'project_id': 'tenant1'}
+ self.policy.enforce.assert_called_once_with(
+ self.context, "publicize_image", expected_target)
def test_new_image_visibility_community_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_factory = glance.api.policy.ImageFactoryProxy(
- self.image_factory_stub, {}, self.policy)
+ self.image_factory_stub, self.context, self.policy)
self.assertRaises(exception.Forbidden, image_factory.new_image,
- visibility='community')
- self.policy.enforce.assert_called_once_with({},
- "communitize_image",
- {})
+ visibility='community', owner='tenant1')
+ expected_target = {'project_id': 'tenant1'}
+ self.policy.enforce.assert_called_once_with(
+ self.context, "communitize_image", expected_target)
def test_new_image_visibility_community_allowed(self):
image_factory = glance.api.policy.ImageFactoryProxy(
- self.image_factory_stub, {}, self.policy)
- image_factory.new_image(visibility='community')
- self.policy.enforce.assert_called_once_with({},
+ self.image_factory_stub, self.context, self.policy)
+ image_factory.new_image(visibility='community', owner='tenant1')
+ expected_target = {'project_id': 'tenant1'}
+ self.policy.enforce.assert_called_once_with(self.context,
"communitize_image",
- {})
+ expected_target)
def test_image_get_data_policy_enforced_with_target(self):
extra_properties = {
'test_key': 'test_4321'
}
image_stub = ImageStub(UUID1, extra_properties=extra_properties)
- with mock.patch('glance.api.policy.ImageTarget'):
- image = glance.api.policy.ImageProxy(image_stub, {}, self.policy)
- target = image.target
+ image = glance.api.policy.ImageProxy(
+ image_stub, self.context, self.policy)
+ expected_target = dict(image.target)
+ expected_target['project_id'] = image.owner
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, image.get_data)
- self.policy.enforce.assert_called_once_with({}, "download_image",
- target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "download_image", expected_target)
class TestMemberPolicy(test_utils.BaseTestCase):
@@ -611,74 +664,77 @@ class TestMemberPolicy(test_utils.BaseTestCase):
self.policy = mock.Mock()
self.policy.enforce = mock.Mock()
self.image_stub = ImageStub(UUID1)
- image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
+ self.context = mock.Mock()
+ image = glance.api.policy.ImageProxy(
+ self.image_stub, self.context, self.policy)
self.member_repo = glance.api.policy.ImageMemberRepoProxy(
- MemberRepoStub(), image, {}, self.policy)
- self.target = self.member_repo.target
+ MemberRepoStub(), image, self.context, self.policy)
+ self.target = dict(self.member_repo.target)
+ self.target['project_id'] = self.context.project_id
super(TestMemberPolicy, self).setUp()
def test_add_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.add, '')
- self.policy.enforce.assert_called_once_with({}, "add_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "add_member", self.target)
def test_add_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.add(image_member)
self.assertEqual('member_repo_add', image_member.output)
- self.policy.enforce.assert_called_once_with({}, "add_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "add_member", self.target)
def test_get_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.get, '')
- self.policy.enforce.assert_called_once_with({}, "get_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_member", self.target)
def test_get_member_allowed(self):
output = self.member_repo.get('')
self.assertEqual('member_repo_get', output)
- self.policy.enforce.assert_called_once_with({}, "get_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_member", self.target)
def test_modify_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.save, '')
- self.policy.enforce.assert_called_once_with({}, "modify_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "modify_member", self.target)
def test_modify_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.save(image_member)
self.assertEqual('member_repo_save', image_member.output)
- self.policy.enforce.assert_called_once_with({}, "modify_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "modify_member", self.target)
def test_get_members_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.list, '')
- self.policy.enforce.assert_called_once_with({}, "get_members",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_members", self.target)
def test_get_members_allowed(self):
output = self.member_repo.list('')
self.assertEqual('member_repo_list', output)
- self.policy.enforce.assert_called_once_with({}, "get_members",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "get_members", self.target)
def test_delete_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.remove, '')
- self.policy.enforce.assert_called_once_with({}, "delete_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "delete_member", self.target)
def test_delete_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.remove(image_member)
self.assertEqual('member_repo_remove', image_member.output)
- self.policy.enforce.assert_called_once_with({}, "delete_member",
- self.target)
+ self.policy.enforce.assert_called_once_with(
+ self.context, "delete_member", self.target)
class TestTaskPolicy(test_utils.BaseTestCase):
@@ -1006,9 +1062,38 @@ class TestContextPolicyEnforcer(base.IsolatedUnitTest):
class TestDefaultPolicyCheckStrings(base.IsolatedUnitTest):
def test_project_member_check_string(self):
- self.assertEqual('role:member and project_id:%(project_id)s',
- base_policy.PROJECT_MEMBER)
+ expected = 'role:member and (project_id:%(project_id)s)'
+ self.assertEqual(expected, base_policy.PROJECT_MEMBER)
+
+ def test_project_member_download_image_check_string(self):
+ expected = (
+ 'role:member and (project_id:%(project_id)s or '
+ 'project_id:%(member_id)s or "community":%(visibility)s or '
+ '"public":%(visibility)s)'
+ )
+ self.assertEqual(
+ expected,
+ base_policy.PROJECT_MEMBER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC
+ )
def test_project_reader_check_string(self):
- self.assertEqual('role:reader and project_id:%(project_id)s',
- base_policy.PROJECT_READER)
+ expected = 'role:reader and (project_id:%(project_id)s)'
+ self.assertEqual(expected, base_policy.PROJECT_READER)
+
+ def test_project_reader_get_image_check_string(self):
+ expected = (
+ 'role:reader and (project_id:%(project_id)s or '
+ 'project_id:%(member_id)s or "community":%(visibility)s or '
+ '"public":%(visibility)s)'
+ )
+ self.assertEqual(
+ expected,
+ base_policy.PROJECT_READER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC
+ )
+
+
+class TestImageTarget(base.IsolatedUnitTest):
+ def test_image_target_ignores_locations(self):
+ image = ImageStub()
+ target = glance.api.policy.ImageTarget(image)
+ self.assertNotIn('locations', list(target))
diff --git a/glance/tests/unit/v2/test_image_data_resource.py b/glance/tests/unit/v2/test_image_data_resource.py
index 2c36bfb8a..341178730 100644
--- a/glance/tests/unit/v2/test_image_data_resource.py
+++ b/glance/tests/unit/v2/test_image_data_resource.py
@@ -48,7 +48,7 @@ class FakeImage(object):
def __init__(self, image_id=None, data=None, checksum=None, size=0,
virtual_size=0, locations=None, container_format='bear',
- disk_format='rawr', status=None):
+ disk_format='rawr', status=None, owner=None):
self.image_id = image_id
self.data = data
self.checksum = checksum
@@ -57,6 +57,7 @@ class FakeImage(object):
self.locations = locations
self.container_format = container_format
self.disk_format = disk_format
+ self.owner = owner
self._status = status
self.extra_properties = {}
@@ -180,7 +181,7 @@ class TestImagesController(base.StoreClearingUnitTest):
def test_upload(self):
request = unit_test_utils.get_fake_request()
- image = FakeImage('abcd')
+ image = FakeImage('abcd', owner='tenant1')
self.image_repo.result = image
self.controller.upload(request, unit_test_utils.UUID2, 'YYYY', 4)
self.assertEqual('YYYY', image.data)
@@ -214,12 +215,15 @@ class TestImagesController(base.StoreClearingUnitTest):
@mock.patch.object(glance.api.policy.Enforcer, 'enforce')
def test_upload_image_forbidden(self, mock_enforce):
request = unit_test_utils.get_fake_request()
+ image = FakeImage('abcd', owner='tenant1')
+ self.image_repo.result = image
mock_enforce.side_effect = exception.Forbidden
self.assertRaises(webob.exc.HTTPForbidden, self.controller.upload,
request, unit_test_utils.UUID2, 'YYYY', 4)
+ expected_target = {'project_id': 'tenant1'}
mock_enforce.assert_called_once_with(request.context,
"upload_image",
- {})
+ expected_target)
def test_upload_invalid(self):
request = unit_test_utils.get_fake_request()