summaryrefslogtreecommitdiff
path: root/neutron/tests/unit/extensions/test_securitygroup.py
diff options
context:
space:
mode:
Diffstat (limited to 'neutron/tests/unit/extensions/test_securitygroup.py')
-rw-r--r--neutron/tests/unit/extensions/test_securitygroup.py88
1 files changed, 41 insertions, 47 deletions
diff --git a/neutron/tests/unit/extensions/test_securitygroup.py b/neutron/tests/unit/extensions/test_securitygroup.py
index d0ca88b81a..bbf752e4cb 100644
--- a/neutron/tests/unit/extensions/test_securitygroup.py
+++ b/neutron/tests/unit/extensions/test_securitygroup.py
@@ -92,41 +92,39 @@ class SecurityGroupTestExtensionManager(object):
class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
- def _build_security_group(self, name, description, **kwargs):
+ def _build_security_group(self, name, description):
data = {
'security_group': {
'name': name,
- 'tenant_id': kwargs.get(
- 'tenant_id', test_db_base_plugin_v2.TEST_TENANT_ID),
'description': description}}
return data
- def _create_security_group_response(self, fmt, data, **kwargs):
- security_group_req = self.new_create_request('security-groups', data,
- fmt)
- if (kwargs.get('set_context') and 'tenant_id' in kwargs):
- # create a specific auth context for this request
- security_group_req.environ['neutron.context'] = (
- context.Context('', kwargs['tenant_id']))
+ def _create_security_group_response(self, fmt, data, tenant_id=None,
+ as_admin=False, **kwargs):
+ security_group_req = self.new_create_request(
+ 'security-groups', data, fmt, tenant_id=tenant_id,
+ as_admin=as_admin)
return security_group_req.get_response(self.ext_api)
- def _create_security_group(self, fmt, name, description, **kwargs):
- data = self._build_security_group(name, description, **kwargs)
- return self._create_security_group_response(fmt, data, **kwargs)
+ def _create_security_group(self, fmt, name, description, tenant_id=None,
+ as_admin=False, **kwargs):
+ data = self._build_security_group(name, description)
+ return self._create_security_group_response(
+ fmt, data, tenant_id=tenant_id, as_admin=as_admin, **kwargs)
def _build_security_group_rule(
self, security_group_id, direction, proto,
port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None,
remote_address_group_id=None,
- tenant_id=test_db_base_plugin_v2.TEST_TENANT_ID,
- ethertype=const.IPv4):
+ tenant_id=None,
+ ethertype=const.IPv4,
+ as_admin=False):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
'protocol': proto,
- 'ethertype': ethertype,
- 'tenant_id': tenant_id}}
+ 'ethertype': ethertype}}
if port_range_min:
data['security_group_rule']['port_range_min'] = port_range_min
@@ -145,19 +143,13 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
return data
- def _create_security_group_rule(self, fmt, rules, **kwargs):
+ def _create_security_group_rule(self, fmt, rules, tenant_id=None,
+ as_admin=False, **kwargs):
security_group_rule_req = self.new_create_request(
- 'security-group-rules', rules, fmt)
-
- if (kwargs.get('set_context') and 'tenant_id' in kwargs):
- # create a specific auth context for this request
- security_group_rule_req.environ['neutron.context'] = (
- context.Context('', kwargs['tenant_id']))
- elif kwargs.get('admin_context'):
- security_group_rule_req.environ['neutron.context'] = (
- context.Context(user_id='admin', tenant_id='admin-tenant',
- is_admin=True))
+ 'security-group-rules', rules, fmt, tenant_id=tenant_id,
+ as_admin=as_admin)
+
return security_group_rule_req.get_response(self.ext_api)
def _make_security_group(self, fmt, name, description, **kwargs):
@@ -166,8 +158,10 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
- def _make_security_group_rule(self, fmt, rules, **kwargs):
- res = self._create_security_group_rule(self.fmt, rules)
+ def _make_security_group_rule(self, fmt, rules, tenant_id=None,
+ as_admin=False, **kwargs):
+ res = self._create_security_group_rule(
+ self.fmt, rules, tenant_id=tenant_id, as_admin=as_admin)
if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@@ -819,9 +813,10 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP,
port_range_min=22, port_range_max=22,
remote_ip_prefix="10.0.2.0/24",
- ethertype=const.IPv4,
- tenant_id='admin-tenant')
- self._make_security_group_rule(self.fmt, rule, admin_context=True)
+ ethertype=const.IPv4)
+ self._make_security_group_rule(self.fmt, rule,
+ tenant_id='admin-tenant',
+ as_admin=True)
# Now, let's make sure all the rules are there, with their odd
# tenant_id behavior.
@@ -878,23 +873,20 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
res = self.new_list_request('security-groups')
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
self._delete('security-groups', sg['security_groups'][0]['id'],
- webob.exc.HTTPNoContent.code)
+ webob.exc.HTTPNoContent.code, as_admin=True)
def test_delete_default_security_group_nonadmin(self):
with self.network():
res = self.new_list_request('security-groups')
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
- neutron_context = context.Context(
- '', test_db_base_plugin_v2.TEST_TENANT_ID)
self._delete('security-groups', sg['security_groups'][0]['id'],
webob.exc.HTTPConflict.code,
- neutron_context=neutron_context)
+ tenant_id=test_db_base_plugin_v2.TEST_TENANT_ID)
def test_security_group_list_creates_default_security_group(self):
- neutron_context = context.Context(
- '', test_db_base_plugin_v2.TEST_TENANT_ID)
sg = self._list('security-groups',
- neutron_context=neutron_context).get('security_groups')
+ tenant_id=test_db_base_plugin_v2.TEST_TENANT_ID).get(
+ 'security_groups')
self.assertEqual(1, len(sg))
def test_security_group_port_create_creates_default_security_group(self):
@@ -2112,13 +2104,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group() as sg:
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', const.PROTO_NUM_TCP)
- rule['security_group_rule'].update({'id': specified_id,
- 'port_range_min': None,
- 'port_range_max': None,
- 'remote_ip_prefix': None,
- 'remote_group_id': None,
- 'remote_address_group_id':
- None})
+ rule['security_group_rule'].update({
+ 'id': specified_id,
+ 'port_range_min': None,
+ 'port_range_max': None,
+ 'remote_ip_prefix': None,
+ 'remote_group_id': None,
+ 'tenant_id': test_db_base_plugin_v2.TEST_TENANT_ID,
+ 'remote_address_group_id':
+ None})
result = self.plugin.create_security_group_rule(
neutron_context, rule)
self.assertEqual(specified_id, result['id'])