diff options
Diffstat (limited to 'neutron/tests/unit/extensions/test_securitygroup.py')
-rw-r--r-- | neutron/tests/unit/extensions/test_securitygroup.py | 88 |
1 files changed, 41 insertions, 47 deletions
diff --git a/neutron/tests/unit/extensions/test_securitygroup.py b/neutron/tests/unit/extensions/test_securitygroup.py index d0ca88b81a..bbf752e4cb 100644 --- a/neutron/tests/unit/extensions/test_securitygroup.py +++ b/neutron/tests/unit/extensions/test_securitygroup.py @@ -92,41 +92,39 @@ class SecurityGroupTestExtensionManager(object): class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): - def _build_security_group(self, name, description, **kwargs): + def _build_security_group(self, name, description): data = { 'security_group': { 'name': name, - 'tenant_id': kwargs.get( - 'tenant_id', test_db_base_plugin_v2.TEST_TENANT_ID), 'description': description}} return data - def _create_security_group_response(self, fmt, data, **kwargs): - security_group_req = self.new_create_request('security-groups', data, - fmt) - if (kwargs.get('set_context') and 'tenant_id' in kwargs): - # create a specific auth context for this request - security_group_req.environ['neutron.context'] = ( - context.Context('', kwargs['tenant_id'])) + def _create_security_group_response(self, fmt, data, tenant_id=None, + as_admin=False, **kwargs): + security_group_req = self.new_create_request( + 'security-groups', data, fmt, tenant_id=tenant_id, + as_admin=as_admin) return security_group_req.get_response(self.ext_api) - def _create_security_group(self, fmt, name, description, **kwargs): - data = self._build_security_group(name, description, **kwargs) - return self._create_security_group_response(fmt, data, **kwargs) + def _create_security_group(self, fmt, name, description, tenant_id=None, + as_admin=False, **kwargs): + data = self._build_security_group(name, description) + return self._create_security_group_response( + fmt, data, tenant_id=tenant_id, as_admin=as_admin, **kwargs) def _build_security_group_rule( self, security_group_id, direction, proto, port_range_min=None, port_range_max=None, remote_ip_prefix=None, remote_group_id=None, remote_address_group_id=None, - tenant_id=test_db_base_plugin_v2.TEST_TENANT_ID, - ethertype=const.IPv4): + tenant_id=None, + ethertype=const.IPv4, + as_admin=False): data = {'security_group_rule': {'security_group_id': security_group_id, 'direction': direction, 'protocol': proto, - 'ethertype': ethertype, - 'tenant_id': tenant_id}} + 'ethertype': ethertype}} if port_range_min: data['security_group_rule']['port_range_min'] = port_range_min @@ -145,19 +143,13 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): return data - def _create_security_group_rule(self, fmt, rules, **kwargs): + def _create_security_group_rule(self, fmt, rules, tenant_id=None, + as_admin=False, **kwargs): security_group_rule_req = self.new_create_request( - 'security-group-rules', rules, fmt) - - if (kwargs.get('set_context') and 'tenant_id' in kwargs): - # create a specific auth context for this request - security_group_rule_req.environ['neutron.context'] = ( - context.Context('', kwargs['tenant_id'])) - elif kwargs.get('admin_context'): - security_group_rule_req.environ['neutron.context'] = ( - context.Context(user_id='admin', tenant_id='admin-tenant', - is_admin=True)) + 'security-group-rules', rules, fmt, tenant_id=tenant_id, + as_admin=as_admin) + return security_group_rule_req.get_response(self.ext_api) def _make_security_group(self, fmt, name, description, **kwargs): @@ -166,8 +158,10 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) - def _make_security_group_rule(self, fmt, rules, **kwargs): - res = self._create_security_group_rule(self.fmt, rules) + def _make_security_group_rule(self, fmt, rules, tenant_id=None, + as_admin=False, **kwargs): + res = self._create_security_group_rule( + self.fmt, rules, tenant_id=tenant_id, as_admin=as_admin) if res.status_int >= webob.exc.HTTPBadRequest.code: raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) @@ -819,9 +813,10 @@ class TestSecurityGroups(SecurityGroupDBTestCase): sg['security_group']['id'], "ingress", const.PROTO_NAME_TCP, port_range_min=22, port_range_max=22, remote_ip_prefix="10.0.2.0/24", - ethertype=const.IPv4, - tenant_id='admin-tenant') - self._make_security_group_rule(self.fmt, rule, admin_context=True) + ethertype=const.IPv4) + self._make_security_group_rule(self.fmt, rule, + tenant_id='admin-tenant', + as_admin=True) # Now, let's make sure all the rules are there, with their odd # tenant_id behavior. @@ -878,23 +873,20 @@ class TestSecurityGroups(SecurityGroupDBTestCase): res = self.new_list_request('security-groups') sg = self.deserialize(self.fmt, res.get_response(self.ext_api)) self._delete('security-groups', sg['security_groups'][0]['id'], - webob.exc.HTTPNoContent.code) + webob.exc.HTTPNoContent.code, as_admin=True) def test_delete_default_security_group_nonadmin(self): with self.network(): res = self.new_list_request('security-groups') sg = self.deserialize(self.fmt, res.get_response(self.ext_api)) - neutron_context = context.Context( - '', test_db_base_plugin_v2.TEST_TENANT_ID) self._delete('security-groups', sg['security_groups'][0]['id'], webob.exc.HTTPConflict.code, - neutron_context=neutron_context) + tenant_id=test_db_base_plugin_v2.TEST_TENANT_ID) def test_security_group_list_creates_default_security_group(self): - neutron_context = context.Context( - '', test_db_base_plugin_v2.TEST_TENANT_ID) sg = self._list('security-groups', - neutron_context=neutron_context).get('security_groups') + tenant_id=test_db_base_plugin_v2.TEST_TENANT_ID).get( + 'security_groups') self.assertEqual(1, len(sg)) def test_security_group_port_create_creates_default_security_group(self): @@ -2112,13 +2104,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase): with self.security_group() as sg: rule = self._build_security_group_rule( sg['security_group']['id'], 'ingress', const.PROTO_NUM_TCP) - rule['security_group_rule'].update({'id': specified_id, - 'port_range_min': None, - 'port_range_max': None, - 'remote_ip_prefix': None, - 'remote_group_id': None, - 'remote_address_group_id': - None}) + rule['security_group_rule'].update({ + 'id': specified_id, + 'port_range_min': None, + 'port_range_max': None, + 'remote_ip_prefix': None, + 'remote_group_id': None, + 'tenant_id': test_db_base_plugin_v2.TEST_TENANT_ID, + 'remote_address_group_id': + None}) result = self.plugin.create_security_group_rule( neutron_context, rule) self.assertEqual(specified_id, result['id']) |