diff options
Diffstat (limited to 'neutron/tests/unit/db')
-rw-r--r-- | neutron/tests/unit/db/metering/test_metering_db.py | 42 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_agentschedulers_db.py | 63 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_db_base_plugin_v2.py | 567 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_dvr_mac_db.py | 6 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_ipam_backend_mixin.py | 6 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_ipam_pluggable_backend.py | 3 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_l3_db.py | 3 | ||||
-rw-r--r-- | neutron/tests/unit/db/test_ovn_revision_numbers_db.py | 2 |
8 files changed, 374 insertions, 318 deletions
diff --git a/neutron/tests/unit/db/metering/test_metering_db.py b/neutron/tests/unit/db/metering/test_metering_db.py index 35d7f733c1..1c2af72a4e 100644 --- a/neutron/tests/unit/db/metering/test_metering_db.py +++ b/neutron/tests/unit/db/metering/test_metering_db.py @@ -16,7 +16,6 @@ import contextlib from neutron_lib.api.definitions import metering as metering_apidef from neutron_lib import constants as n_consts -from neutron_lib import context from neutron_lib.db import constants as db_const from neutron_lib.plugins import constants from oslo_utils import uuidutils @@ -42,18 +41,12 @@ _fake_uuid = uuidutils.generate_uuid class MeteringPluginDbTestCaseMixin(object): def _create_metering_label(self, fmt, name, description, **kwargs): data = {'metering_label': {'name': name, - 'tenant_id': kwargs.get('tenant_id', - 'test-tenant'), 'shared': kwargs.get('shared', False), 'description': description}} - req = self.new_create_request('metering-labels', data, - fmt) - - if kwargs.get('set_context') and 'tenant_id' in kwargs: - # create a specific auth context for this request - req.environ['neutron.context'] = ( - context.Context('', kwargs['tenant_id'], - is_admin=kwargs.get('is_admin', True))) + req = self.new_create_request( + 'metering-labels', data, fmt, + tenant_id=kwargs.get('tenant_id', self._tenant_id), + as_admin=kwargs.get('is_admin', True)) return req.get_response(self.ext_api) @@ -71,7 +64,6 @@ class MeteringPluginDbTestCaseMixin(object): data = { 'metering_label_rule': { 'metering_label_id': metering_label_id, - 'tenant_id': kwargs.get('tenant_id', 'test-tenant'), 'direction': direction, 'excluded': excluded, } @@ -87,13 +79,10 @@ class MeteringPluginDbTestCaseMixin(object): data['metering_label_rule']['destination_ip_prefix'] =\ destination_ip_prefix - req = self.new_create_request('metering-label-rules', - data, fmt) - - if kwargs.get('set_context') and 'tenant_id' in kwargs: - # create a specific auth context for this request - req.environ['neutron.context'] = ( - context.Context('', kwargs['tenant_id'])) + req = self.new_create_request( + 'metering-label-rules', data, fmt, + tenant_id=kwargs.get('tenant_id', self._tenant_id), + as_admin=kwargs.get('is_admin', True)) return req.get_response(self.ext_api) @@ -203,7 +192,8 @@ class TestMetering(MeteringPluginDbTestCase): with self.metering_label(name, description) as metering_label: metering_label_id = metering_label['metering_label']['id'] - self._delete('metering-labels', metering_label_id, 204) + self._delete('metering-labels', metering_label_id, 204, + as_admin=True) def test_list_metering_label(self): name = 'my label' @@ -258,7 +248,7 @@ class TestMetering(MeteringPluginDbTestCase): remote_ip_prefix=remote_ip_prefix) as label_rule: rule_id = label_rule['metering_label_rule']['id'] self._update('metering-label-rules', rule_id, data, - webob.exc.HTTPNotImplemented.code) + webob.exc.HTTPNotImplemented.code, as_admin=True) def test_delete_metering_label_rule(self): name = 'my label' @@ -275,7 +265,8 @@ class TestMetering(MeteringPluginDbTestCase): metering_label_id, direction, excluded, remote_ip_prefix=remote_ip_prefix) as label_rule: rule_id = label_rule['metering_label_rule']['id'] - self._delete('metering-label-rules', rule_id, 204) + self._delete('metering-label-rules', rule_id, 204, + as_admin=True) def test_list_metering_label_rule(self): name = 'my label' @@ -297,7 +288,7 @@ class TestMetering(MeteringPluginDbTestCase): metering_label_rule = (v1, v2) self._test_list_resources('metering-label-rule', - metering_label_rule) + metering_label_rule, as_admin=True) def test_create_metering_label_rules(self): name = 'my label' @@ -319,7 +310,7 @@ class TestMetering(MeteringPluginDbTestCase): metering_label_rule = (v1, v2) self._test_list_resources('metering-label-rule', - metering_label_rule) + metering_label_rule, as_admin=True) def test_create_overlap_metering_label_rules(self): name = 'my label' @@ -365,4 +356,5 @@ class TestMetering(MeteringPluginDbTestCase): metering_label_rule = (v1, v2) self._test_list_resources('metering-label-rule', - metering_label_rule) + metering_label_rule, + as_admin=True) diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index d503aee1ec..df4edfaa9b 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -45,6 +45,7 @@ from neutron.db.models import agent as agent_model from neutron.extensions import l3agentscheduler from neutron.objects import agent as ag_obj from neutron.objects import l3agent as rb_obj +from neutron import policy from neutron.tests.common import helpers from neutron.tests.unit.api import test_extensions from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin @@ -78,18 +79,21 @@ class AgentSchedulerTestMixIn(object): def _path_req(self, path, method='GET', data=None, query_string=None, - admin_context=True): + admin_context=True, + req_tenant_id=None): content_type = 'application/%s' % self.fmt body = None if data is not None: # empty dict is valid body = wsgi.Serializer().serialize(data, content_type) + roles = ['member', 'reader'] + req_tenant_id = req_tenant_id or self._tenant_id if admin_context: - return testlib_api.create_request( - path, body, content_type, method, query_string=query_string) - else: - return testlib_api.create_request( - path, body, content_type, method, query_string=query_string, - context=context.Context('', 'tenant_id')) + roles.append('admin') + req = testlib_api.create_request( + path, body, content_type, method, query_string=query_string) + req.environ['neutron.context'] = context.Context( + '', req_tenant_id, roles=roles, is_admin=admin_context) + return req def _path_create_request(self, path, data, admin_context=True): return self._path_req(path, method='POST', data=data, @@ -218,7 +222,7 @@ class AgentSchedulerTestMixIn(object): new_agent = {} new_agent['agent'] = {} new_agent['agent']['admin_state_up'] = admin_state_up - self._update('agents', agent_id, new_agent) + self._update('agents', agent_id, new_agent, as_admin=True) def _get_agent_id(self, agent_type, host): agents = self._list_agents() @@ -269,6 +273,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, self.dhcp_notify_p = mock.patch( 'neutron.extensions.dhcpagentscheduler.notify') self.patched_dhcp_notify = self.dhcp_notify_p.start() + policy.init() class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): @@ -911,10 +916,12 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self.assertNotEqual(agent['host'], new_agent_host) def test_router_auto_schedule_with_invalid_router(self): - with self.router() as router: + project_id = uuidutils.generate_uuid() + with self.router(project_id=project_id) as router: l3_rpc_cb = l3_rpc.L3RpcCallback() self._register_agent_states() - self._delete('routers', router['router']['id']) + self._delete('routers', router['router']['id'], + tenant_id=project_id) # deleted router ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA, @@ -1106,19 +1113,22 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self.assertEqual(0, len(router_ids)) def test_router_without_l3_agents(self): + project_id = uuidutils.generate_uuid() with self.subnet() as s: self._set_net_external(s['subnet']['network_id']) - data = {'router': {'tenant_id': uuidutils.generate_uuid()}} + data = {'router': {'tenant_id': project_id}} data['router']['name'] = 'router1' data['router']['external_gateway_info'] = { 'network_id': s['subnet']['network_id']} - router_req = self.new_create_request('routers', data, self.fmt) + router_req = self.new_create_request( + 'routers', data, self.fmt, tenant_id=project_id) res = router_req.get_response(self.ext_api) router = self.deserialize(self.fmt, res) l3agents = ( self.l3plugin.get_l3_agents_hosting_routers( self.adminContext, [router['router']['id']])) - self._delete('routers', router['router']['id']) + self._delete( + 'routers', router['router']['id'], tenant_id=project_id) self.assertEqual(0, len(l3agents)) def test_dvr_router_scheduling_to_only_dvr_snat_agent(self): @@ -1217,26 +1227,30 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self.assertEqual(agent['id'], new_agent['id']) def test_router_sync_data(self): - with self.subnet() as s1,\ - self.subnet(cidr='10.0.2.0/24') as s2,\ - self.subnet(cidr='10.0.3.0/24') as s3: + project_id = uuidutils.generate_uuid() + with self.subnet(project_id=project_id) as s1,\ + self.subnet(project_id=project_id, cidr='10.0.2.0/24') as s2,\ + self.subnet(project_id=project_id, cidr='10.0.3.0/24') as s3: self._register_agent_states() self._set_net_external(s1['subnet']['network_id']) - data = {'router': {'tenant_id': uuidutils.generate_uuid()}} + data = {'router': {'tenant_id': project_id}} data['router']['name'] = 'router1' data['router']['external_gateway_info'] = { 'network_id': s1['subnet']['network_id']} - router_req = self.new_create_request('routers', data, self.fmt) + router_req = self.new_create_request( + 'routers', data, self.fmt, tenant_id=project_id) res = router_req.get_response(self.ext_api) router = self.deserialize(self.fmt, res) self._router_interface_action('add', router['router']['id'], s2['subnet']['id'], - None) + None, + tenant_id=project_id) self._router_interface_action('add', router['router']['id'], s3['subnet']['id'], - None) + None, + tenant_id=project_id) l3agents = self._list_l3_agents_hosting_router( router['router']['id']) self.assertEqual(1, len(l3agents['agents'])) @@ -1267,7 +1281,8 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self._router_interface_action('remove', router['router']['id'], s2['subnet']['id'], - None) + None, + tenant_id=project_id) l3agents = self._list_l3_agents_hosting_router( router['router']['id']) self.assertEqual(1, @@ -1275,8 +1290,10 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self._router_interface_action('remove', router['router']['id'], s3['subnet']['id'], - None) - self._delete('routers', router['router']['id']) + None, + tenant_id=project_id) + self._delete('routers', router['router']['id'], + tenant_id=project_id) def _test_router_add_to_l3_agent(self, admin_state_up=True): with self.router() as router1: diff --git a/neutron/tests/unit/db/test_db_base_plugin_v2.py b/neutron/tests/unit/db/test_db_base_plugin_v2.py index a25ccc4f0d..b66492c3ea 100644 --- a/neutron/tests/unit/db/test_db_base_plugin_v2.py +++ b/neutron/tests/unit/db/test_db_base_plugin_v2.py @@ -246,60 +246,117 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): query_string=params, context=context, headers=headers) + def _admin_req(self, method, resource, data=None, fmt=None, id=None, + params=None, action=None, subresource=None, sub_id=None, + ctx=None, headers=None, tenant_id=None): + tenant_id = tenant_id or self._tenant_id + req = self._req(method, resource, data, fmt, id, params, action, + subresource, sub_id, ctx, headers) + req.environ['neutron.context'] = context.Context( + '', tenant_id, is_admin=True, + roles=['admin', 'member', 'reader']) + return req + + def _member_req(self, method, resource, data=None, fmt=None, id=None, + params=None, action=None, subresource=None, sub_id=None, + ctx=None, headers=None, tenant_id=None): + tenant_id = tenant_id or self._tenant_id + req = self._req(method, resource, data, fmt, id, params, action, + subresource, sub_id, ctx, headers) + req.environ['neutron.context'] = context.Context( + '', tenant_id, roles=['member', 'reader']) + return req + + def _reader_req(self, method, resource, data=None, fmt=None, id=None, + params=None, action=None, subresource=None, sub_id=None, + ctx=None, headers=None, tenant_id=None): + tenant_id = tenant_id or self._tenant_id + req = self._req(method, resource, data, fmt, id, params, action, + subresource, sub_id, ctx, headers) + req.environ['neutron.context'] = context.Context( + '', tenant_id, roles=['reader']) + return req + def new_create_request(self, resource, data, fmt=None, id=None, - subresource=None, context=None): - return self._req('POST', resource, data, fmt, id=id, - subresource=subresource, context=context) + subresource=None, context=None, tenant_id=None, + as_admin=False): + tenant_id = tenant_id or self._tenant_id + if as_admin: + return self._admin_req( + 'POST', resource, data, fmt, id=id, + subresource=subresource, ctx=context, tenant_id=tenant_id) + return self._member_req('POST', resource, data, fmt, id=id, + subresource=subresource, ctx=context, + tenant_id=tenant_id) def new_list_request(self, resource, fmt=None, params=None, - subresource=None, parent_id=None): - return self._req( + subresource=None, parent_id=None, tenant_id=None, + as_admin=False): + tenant_id = tenant_id or self._tenant_id + if as_admin: + return self._admin_req( + 'GET', resource, None, fmt, params=params, id=parent_id, + subresource=subresource, tenant_id=tenant_id + ) + return self._reader_req( 'GET', resource, None, fmt, params=params, id=parent_id, - subresource=subresource + subresource=subresource, tenant_id=tenant_id ) def new_show_request(self, resource, id, fmt=None, - subresource=None, fields=None, sub_id=None): + subresource=None, fields=None, sub_id=None, + tenant_id=None, as_admin=False): + tenant_id = tenant_id or self._tenant_id if fields: params = "&".join(["fields=%s" % x for x in fields]) else: params = None - return self._req('GET', resource, None, fmt, id=id, - params=params, subresource=subresource, sub_id=sub_id) + if as_admin: + return self._admin_req('GET', resource, None, fmt, id=id, + params=params, subresource=subresource, + sub_id=sub_id, tenant_id=tenant_id) + return self._reader_req('GET', resource, None, fmt, id=id, + params=params, subresource=subresource, + sub_id=sub_id, tenant_id=tenant_id) def new_delete_request(self, resource, id, fmt=None, subresource=None, - sub_id=None, data=None, headers=None): - return self._req( - 'DELETE', - resource, - data, - fmt, - id=id, - subresource=subresource, - sub_id=sub_id, - headers=headers - ) + sub_id=None, data=None, headers=None, + tenant_id=None, as_admin=False): + tenant_id = tenant_id or self._tenant_id + if as_admin: + return self._admin_req('DELETE', resource, data, fmt, id=id, + subresource=subresource, sub_id=sub_id, + headers=headers, tenant_id=tenant_id) + return self._member_req('DELETE', resource, data, fmt, id=id, + subresource=subresource, sub_id=sub_id, + headers=headers, tenant_id=tenant_id) def new_update_request(self, resource, data, id, fmt=None, subresource=None, context=None, sub_id=None, - headers=None): - return self._req( + headers=None, as_admin=False, tenant_id=None): + tenant_id = tenant_id or self._tenant_id + if as_admin: + return self._admin_req( + 'PUT', resource, data, fmt, id=id, subresource=subresource, + sub_id=sub_id, ctx=context, headers=headers, + tenant_id=tenant_id + ) + return self._member_req( 'PUT', resource, data, fmt, id=id, subresource=subresource, - sub_id=sub_id, context=context, headers=headers + sub_id=sub_id, ctx=context, headers=headers, tenant_id=tenant_id ) def new_action_request(self, resource, data, id, action, fmt=None, - subresource=None, sub_id=None): - return self._req( - 'PUT', - resource, - data, - fmt, - id=id, - action=action, - subresource=subresource, - sub_id=sub_id - ) + subresource=None, sub_id=None, tenant_id=None, + as_admin=False): + tenant_id = tenant_id or self._tenant_id + if as_admin: + return self._admin_req('PUT', resource, data, fmt, id=id, + action=action, subresource=subresource, + sub_id=sub_id, tenant_id=tenant_id) + return self._member_req('PUT', resource, data, fmt, id=id, + action=action, subresource=subresource, + sub_id=sub_id, tenant_id=tenant_id) def deserialize(self, content_type, response): ctype = 'application/%s' % content_type @@ -328,23 +385,19 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): return random.choice(ip_list) return ip_list[0] - def _create_bulk_from_list(self, fmt, resource, objects, **kwargs): + def _create_bulk_from_list(self, fmt, resource, objects, tenant_id=None, + as_admin=False, **kwargs): """Creates a bulk request from a list of objects.""" collection = "%ss" % resource req_data = {collection: objects} - req = self.new_create_request(collection, req_data, fmt) - if ('set_context' in kwargs and - kwargs['set_context'] is True and - 'tenant_id' in kwargs): - # create a specific auth context for this request - req.environ['neutron.context'] = context.Context( - '', kwargs['tenant_id']) - elif 'context' in kwargs: - req.environ['neutron.context'] = kwargs['context'] + req = self.new_create_request(collection, req_data, fmt, + tenant_id=tenant_id, as_admin=as_admin) return req.get_response(self.api) - def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs): + def _create_bulk(self, fmt, number, resource, data, name='test', + tenant_id=None, as_admin=False, **kwargs): """Creates a bulk request for any kind of resource.""" + tenant_id = tenant_id or self._tenant_id objects = [] collection = "%ss" % resource for i in range(number): @@ -354,19 +407,13 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): obj[resource].update(kwargs['override'][i]) objects.append(obj) req_data = {collection: objects} - req = self.new_create_request(collection, req_data, fmt) - if ('set_context' in kwargs and - kwargs['set_context'] is True and - 'tenant_id' in kwargs): - # create a specific auth context for this request - req.environ['neutron.context'] = context.Context( - '', kwargs['tenant_id']) - elif 'context' in kwargs: - req.environ['neutron.context'] = kwargs['context'] + req = self.new_create_request(collection, req_data, fmt, + tenant_id=tenant_id, + as_admin=as_admin) return req.get_response(self.api) def _create_network(self, fmt, name, admin_state_up, - arg_list=None, set_context=False, tenant_id=None, + arg_list=None, tenant_id=None, as_admin=False, **kwargs): tenant_id = tenant_id or self._tenant_id data = {'network': {'name': name, @@ -378,11 +425,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): # Arg must be present if arg in kwargs: data['network'][arg] = kwargs[arg] - network_req = self.new_create_request('networks', data, fmt) - if set_context and tenant_id: - # create a specific auth context for this request - network_req.environ['neutron.context'] = context.Context( - '', tenant_id) + network_req = self.new_create_request('networks', data, fmt, + tenant_id=tenant_id, + as_admin=as_admin) return network_req.get_response(self.api) @@ -392,11 +437,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): 'tenant_id': self._tenant_id}} return self._create_bulk(fmt, number, 'network', base_data, **kwargs) - def _create_subnet(self, fmt, net_id, cidr, - expected_res_status=None, **kwargs): + def _create_subnet(self, fmt, net_id, cidr, expected_res_status=None, + tenant_id=None, as_admin=False, **kwargs): + tenant_id = tenant_id or self._tenant_id data = {'subnet': {'network_id': net_id, 'ip_version': constants.IP_VERSION_4, - 'tenant_id': self._tenant_id}} + 'tenant_id': tenant_id}} if cidr: data['subnet']['cidr'] = cidr for arg in ('ip_version', 'tenant_id', 'subnetpool_id', 'prefixlen', @@ -412,11 +458,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): kwargs['gateway_ip'] is not constants.ATTR_NOT_SPECIFIED): data['subnet']['gateway_ip'] = kwargs['gateway_ip'] - subnet_req = self.new_create_request('subnets', data, fmt) - if (kwargs.get('set_context') and 'tenant_id' in kwargs): - # create a specific auth context for this request - subnet_req.environ['neutron.context'] = context.Context( - '', kwargs['tenant_id']) + subnet_req = self.new_create_request('subnets', data, fmt, + tenant_id=tenant_id, + as_admin=as_admin) subnet_res = subnet_req.get_response(self.api) if expected_res_status: @@ -443,24 +487,25 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs) def _create_subnetpool(self, fmt, prefixes, - expected_res_status=None, admin=False, **kwargs): + expected_res_status=None, admin=False, + tenant_id=None, **kwargs): + tenant_id = tenant_id or self._tenant_id subnetpool = {'subnetpool': {'prefixes': prefixes}} for k, v in kwargs.items(): subnetpool['subnetpool'][k] = str(v) api = self._api_for_resource('subnetpools') subnetpools_req = self.new_create_request('subnetpools', - subnetpool, fmt) - if not admin: - neutron_context = context.Context('', kwargs['tenant_id']) - subnetpools_req.environ['neutron.context'] = neutron_context + subnetpool, fmt, + tenant_id=tenant_id, + as_admin=admin) subnetpool_res = subnetpools_req.get_response(api) if expected_res_status: self.assertEqual(expected_res_status, subnetpool_res.status_int) return subnetpool_res def _create_port(self, fmt, net_id, expected_res_status=None, - arg_list=None, set_context=False, is_admin=False, + arg_list=None, is_admin=False, tenant_id=None, **kwargs): tenant_id = tenant_id or self._tenant_id data = {'port': {'network_id': net_id, @@ -481,11 +526,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): 'device_id' not in kwargs): device_id = utils.get_dhcp_agent_device_id(net_id, kwargs['host']) data['port']['device_id'] = device_id - port_req = self.new_create_request('ports', data, fmt) - if set_context and tenant_id: - # create a specific auth context for this request - port_req.environ['neutron.context'] = context.Context( - '', tenant_id, is_admin=is_admin) + port_req = self.new_create_request('ports', data, fmt, + tenant_id=tenant_id, + as_admin=is_admin) port_res = port_req.get_response(self.api) if expected_res_status: @@ -499,28 +542,26 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): query_params.append("network_id=%s" % net_id) if kwargs.get('device_owner'): query_params.append("device_owner=%s" % kwargs.get('device_owner')) - port_req = self.new_list_request('ports', fmt, '&'.join(query_params)) - if ('set_context' in kwargs and - kwargs['set_context'] is True and - 'tenant_id' in kwargs): - # create a specific auth context for this request - port_req.environ['neutron.context'] = context.Context( - '', kwargs['tenant_id']) - + port_req = self.new_list_request('ports', fmt, '&'.join(query_params), + tenant_id=kwargs.get('tenant_id')) port_res = port_req.get_response(self.api) if expected_res_status: self.assertEqual(expected_res_status, port_res.status_int) return port_res def _create_port_bulk(self, fmt, number, net_id, name, - admin_state_up, **kwargs): + admin_state_up, tenant_id=None, as_admin=False, + **kwargs): base_data = {'port': {'network_id': net_id, - 'admin_state_up': admin_state_up, - 'tenant_id': self._tenant_id}} - return self._create_bulk(fmt, number, 'port', base_data, **kwargs) - - def _make_network(self, fmt, name, admin_state_up, **kwargs): - res = self._create_network(fmt, name, admin_state_up, **kwargs) + 'admin_state_up': admin_state_up}} + return self._create_bulk(fmt, number, 'port', base_data, + tenant_id=tenant_id, as_admin=as_admin, + **kwargs) + + def _make_network(self, fmt, name, admin_state_up, as_admin=False, + **kwargs): + res = self._create_network(fmt, name, admin_state_up, + as_admin=as_admin, **kwargs) # TODO(salvatore-orlando): do exception handling in this test module # in a uniform way (we do it differently for ports, subnets, and nets # Things can go wrong - raise HTTP exc with res code only @@ -533,7 +574,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): allocation_pools=None, ip_version=constants.IP_VERSION_4, enable_dhcp=True, dns_nameservers=None, host_routes=None, shared=None, ipv6_ra_mode=None, ipv6_address_mode=None, - tenant_id=None, set_context=False, segment_id=None): + tenant_id=None, segment_id=None, as_admin=False): res = self._create_subnet(fmt, net_id=network['network']['id'], cidr=cidr, @@ -550,7 +591,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): shared=shared, ipv6_ra_mode=ipv6_ra_mode, ipv6_address_mode=ipv6_address_mode, - set_context=set_context) + as_admin=as_admin) # Things can go wrong - raise HTTP exc with res code only # so it can be caught by unit tests if res.status_int >= webob.exc.HTTPClientError.code: @@ -572,11 +613,13 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): ipv6_ra_mode=ra_addr_mode, ipv6_address_mode=ra_addr_mode)) - def _make_subnetpool(self, fmt, prefixes, admin=False, **kwargs): + def _make_subnetpool(self, fmt, prefixes, admin=False, tenant_id=None, + **kwargs): res = self._create_subnetpool(fmt, prefixes, None, admin, + tenant_id=tenant_id, **kwargs) # Things can go wrong - raise HTTP exc with res code only # so it can be caught by unit tests @@ -584,8 +627,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) - def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs): - res = self._create_port(fmt, net_id, expected_res_status, **kwargs) + def _make_port(self, fmt, net_id, expected_res_status=None, + as_admin=False, **kwargs): + res = self._create_port(fmt, net_id, expected_res_status, + is_admin=as_admin, **kwargs) # Things can go wrong - raise HTTP exc with res code only # so it can be caught by unit tests if res.status_int >= webob.exc.HTTPClientError.code: @@ -596,7 +641,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): max_burst_kbps=None, dscp_mark=None, min_kbps=None, direction=constants.EGRESS_DIRECTION, expected_res_status=None, project_id=None, - set_context=False, is_admin=False): + is_admin=False): # Accepted rule types: "bandwidth_limit", "dscp_marking" and # "minimum_bandwidth" self.assertIn(rule_type, [qos_const.RULE_TYPE_BANDWIDTH_LIMIT, @@ -615,11 +660,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): data[type_req][qos_const.MIN_KBPS] = min_kbps data[type_req][qos_const.DIRECTION] = direction route = 'qos/policies/%s/%s' % (qos_policy_id, type_req + 's') - qos_rule_req = self.new_create_request(route, data, fmt) - if set_context and project_id: - # create a specific auth context for this request - qos_rule_req.environ['neutron.context'] = context.Context( - '', project_id, is_admin=is_admin) + qos_rule_req = self.new_create_request(route, data, fmt, + tenant_id=project_id, + as_admin=is_admin) qos_rule_res = qos_rule_req.get_response(self.api) if expected_res_status: @@ -628,16 +671,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): def _create_qos_policy(self, fmt, qos_policy_name=None, expected_res_status=None, project_id=None, - set_context=False, is_admin=False): + is_admin=False): project_id = project_id or self._tenant_id name = qos_policy_name or uuidutils.generate_uuid() data = {'policy': {'name': name, 'project_id': project_id}} - qos_req = self.new_create_request('policies', data, fmt) - if set_context and project_id: - # create a specific auth context for this request - qos_req.environ['neutron.context'] = context.Context( - '', project_id, is_admin=is_admin) + qos_req = self.new_create_request('policies', data, fmt, + tenant_id=project_id, + as_admin=is_admin) qos_policy_res = qos_req.get_response(self.api) if expected_res_status: @@ -653,54 +694,49 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): def _delete(self, collection, id, expected_code=webob.exc.HTTPNoContent.code, - neutron_context=None, headers=None, subresource=None, - sub_id=None): + headers=None, subresource=None, sub_id=None, + tenant_id=None, as_admin=False): req = self.new_delete_request(collection, id, headers=headers, - subresource=subresource, sub_id=sub_id) - if neutron_context: - # create a specific auth context for this request - req.environ['neutron.context'] = neutron_context + subresource=subresource, sub_id=sub_id, + tenant_id=tenant_id, as_admin=as_admin) + res = req.get_response(self._api_for_resource(collection)) self.assertEqual(expected_code, res.status_int) - def _show_response(self, resource, id, neutron_context=None): - req = self.new_show_request(resource, id) - if neutron_context: - # create a specific auth context for this request - req.environ['neutron.context'] = neutron_context - elif hasattr(self, 'tenant_id'): - req.environ['neutron.context'] = context.Context('', - self.tenant_id) + def _show_response(self, resource, id, tenant_id=None, as_admin=False): + req = self.new_show_request(resource, id, + tenant_id=tenant_id, + as_admin=as_admin) return req.get_response(self._api_for_resource(resource)) def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code, - neutron_context=None): - res = self._show_response(resource, id, - neutron_context=neutron_context) + tenant_id=None, as_admin=False): + res = self._show_response(resource, id, tenant_id=tenant_id, + as_admin=as_admin) self.assertEqual(expected_code, res.status_int) return self.deserialize(self.fmt, res) def _update(self, resource, id, new_data, - expected_code=webob.exc.HTTPOk.code, - neutron_context=None, headers=None): - req = self.new_update_request(resource, new_data, id, headers=headers) - if neutron_context: - # create a specific auth context for this request - req.environ['neutron.context'] = neutron_context + expected_code=webob.exc.HTTPOk.code, headers=None, + request_tenant_id=None, as_admin=False): + req = self.new_update_request( + resource, new_data, id, headers=headers, + tenant_id=request_tenant_id, as_admin=as_admin) res = req.get_response(self._api_for_resource(resource)) self.assertEqual(expected_code, res.status_int) return self.deserialize(self.fmt, res) - def _list(self, resource, fmt=None, neutron_context=None, + def _list(self, resource, fmt=None, query_params=None, expected_code=webob.exc.HTTPOk.code, - parent_id=None, subresource=None): + parent_id=None, subresource=None, + tenant_id=None, as_admin=False): fmt = fmt or self.fmt req = self.new_list_request(resource, fmt, query_params, subresource=subresource, - parent_id=parent_id) - if neutron_context: - req.environ['neutron.context'] = neutron_context + parent_id=parent_id, + tenant_id=tenant_id, + as_admin=as_admin) res = req.get_response(self._api_for_resource(resource)) self.assertEqual(expected_code, res.status_int) return self.deserialize(fmt, res) @@ -730,13 +766,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): self.assertEqual(items[0]['name'], 'test_0') self.assertEqual(items[1]['name'], 'test_1') - def _test_list_resources(self, resource, items, neutron_context=None, - query_params=None, - expected_code=webob.exc.HTTPOk.code): + def _test_list_resources(self, resource, items, query_params=None, + expected_code=webob.exc.HTTPOk.code, + tenant_id=None, as_admin=False): res = self._list('%ss' % resource, - neutron_context=neutron_context, query_params=query_params, - expected_code=expected_code) + expected_code=expected_code, + tenant_id=tenant_id, + as_admin=as_admin) if expected_code == webob.exc.HTTPOk.code: resource = resource.replace('-', '_') self.assertCountEqual([i['id'] for i in res['%ss' % resource]], @@ -771,7 +808,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): tenant_id=None, project_id=None, service_types=None, - set_context=False): + as_admin=False): if project_id: tenant_id = project_id cidr = netaddr.IPNetwork(cidr) if cidr else None @@ -780,7 +817,6 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): gateway_ip = netaddr.IPAddress(gateway_ip) with optional_ctx(network, self.network, - set_context=set_context, tenant_id=tenant_id) as network_to_use: subnet = self._make_subnet(fmt or self.fmt, network_to_use, @@ -797,7 +833,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): ipv6_ra_mode=ipv6_ra_mode, ipv6_address_mode=ipv6_address_mode, tenant_id=tenant_id, - set_context=set_context) + as_admin=as_admin) yield subnet @contextlib.contextmanager @@ -811,22 +847,22 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): yield subnetpool @contextlib.contextmanager - def port(self, subnet=None, fmt=None, set_context=False, project_id=None, + def port(self, subnet=None, fmt=None, project_id=None, is_admin=False, **kwargs): tenant_id = project_id if project_id else kwargs.pop( 'tenant_id', None) with optional_ctx( subnet, self.subnet, - set_context=set_context, tenant_id=tenant_id) as subnet_to_use: + tenant_id=tenant_id) as subnet_to_use: net_id = subnet_to_use['subnet']['network_id'] port = self._make_port( - fmt or self.fmt, net_id, - set_context=set_context, tenant_id=tenant_id, - **kwargs) + fmt or self.fmt, net_id, tenant_id=tenant_id, + as_admin=is_admin, **kwargs) yield port def _test_list_with_sort(self, resource, - items, sorts, resources=None, query_params=''): + items, sorts, resources=None, query_params='', + tenant_id=None, as_admin=False): query_str = query_params for key, direction in sorts: query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key, @@ -834,7 +870,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): if not resources: resources = '%ss' % resource req = self.new_list_request(resources, - params=query_str) + params=query_str, + tenant_id=tenant_id, + as_admin=as_admin) api = self._api_for_resource(resources) res = self.deserialize(self.fmt, req.get_response(api)) resource = resource.replace('-', '_') @@ -846,13 +884,17 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): limit, expected_page_num, resources=None, query_params='', - verify_key='id'): + verify_key='id', + tenant_id=None, + as_admin=False): if not resources: resources = '%ss' % resource query_str = query_params + '&' if query_params else '' query_str = query_str + ("limit=%s&sort_key=%s&" "sort_dir=%s") % (limit, sort[0], sort[1]) - req = self.new_list_request(resources, params=query_str) + req = self.new_list_request(resources, params=query_str, + tenant_id=tenant_id, as_admin=as_admin) + neutron_ctx = req.environ['neutron.context'] items_res = [] page_num = 0 api = self._api_for_resource(resources) @@ -871,6 +913,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): content_type = 'application/%s' % self.fmt req = testlib_api.create_request(link['href'], '', content_type) + req.environ['neutron.context'] = neutron_ctx self.assertEqual(len(res[resources]), limit) self.assertEqual(expected_page_num, page_num) @@ -880,7 +923,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): def _test_list_with_pagination_reverse(self, resource, items, sort, limit, expected_page_num, resources=None, - query_params=''): + query_params='', + tenant_id=None, + as_admin=False): if not resources: resources = '%ss' % resource resource = resource.replace('-', '_') @@ -891,7 +936,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): "sort_key=%s&sort_dir=%s&" "marker=%s") % (limit, sort[0], sort[1], marker) - req = self.new_list_request(resources, params=query_str) + req = self.new_list_request(resources, params=query_str, + tenant_id=tenant_id, as_admin=as_admin) + neutron_ctx = req.environ['neutron.context'] item_res = [items[-1][resource]] page_num = 0 resources = resources.replace('-', '_') @@ -909,6 +956,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): content_type = 'application/%s' % self.fmt req = testlib_api.create_request(link['href'], '', content_type) + req.environ['neutron.context'] = neutron_ctx self.assertEqual(len(res[resources]), limit) self.assertEqual(expected_page_num, page_num) @@ -1001,10 +1049,9 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase): self._create_network(self.fmt, 'some_net', True, - tenant_id=tenant_id, - set_context=True) - req = self.new_list_request('networks', params="fields=name") - req.environ['neutron.context'] = context.Context('', tenant_id) + tenant_id=tenant_id) + req = self.new_list_request( + 'networks', params="fields=name", tenant_id=tenant_id) res = req.get_response(self.api) self._check_list_with_fields(res, 'name') @@ -1020,10 +1067,9 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase): self._create_network(self.fmt, 'some_net', True, - tenant_id=tenant_id, - set_context=True) - req = self.new_list_request('networks', params="fields=tenant_id") - req.environ['neutron.context'] = context.Context('', tenant_id) + tenant_id=tenant_id) + req = self.new_list_request( + 'networks', params="fields=tenant_id", tenant_id=tenant_id) res = req.get_response(self.api) self._check_list_with_fields(res, 'tenant_id') @@ -1086,7 +1132,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase): def test_create_port_json(self): keys = [('admin_state_up', True), ('status', self.port_create_status)] - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network=network) as subnet: with self.port(name='myname', subnet=subnet) as port: for k, v in keys: @@ -1108,7 +1154,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase): device_id='fake_device', device_owner='fake_owner', fixed_ips=[], - set_context=False) + is_admin=True) def test_create_port_bad_tenant(self): with self.network() as network: @@ -1118,17 +1164,15 @@ class TestPortsV2(NeutronDbPluginV2TestCase): tenant_id='bad_tenant_id', device_id='fake_device', device_owner='fake_owner', - fixed_ips=[], - set_context=True) + fixed_ips=[]) def test_create_port_public_network(self): keys = [('admin_state_up', True), ('status', self.port_create_status)] - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: port_res = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='another_tenant', - set_context=True) + tenant_id='another_tenant') port = self.deserialize(self.fmt, port_res) for k, v in keys: self.assertEqual(port['port'][k], v) @@ -1147,11 +1191,10 @@ class TestPortsV2(NeutronDbPluginV2TestCase): webob.exc.HTTPClientError.code, tenant_id='tenant_id', fixed_ips=[], - set_context=False, **kwargs) def test_create_port_public_network_with_ip(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: ip_net = netaddr.IPNetwork('10.0.0.0/24') with self.subnet(network=network, cidr=str(ip_net)): keys = [('admin_state_up', True), @@ -1159,8 +1202,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase): port_res = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='another_tenant', - set_context=True) + tenant_id='another_tenant') port = self.deserialize(self.fmt, port_res) for k, v in keys: self.assertEqual(port['port'][k], v) @@ -1170,7 +1212,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase): self._delete('ports', port['port']['id']) def test_create_port_anticipating_allocation(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network=network, cidr='10.0.0.0/24') as subnet: fixed_ips = [{'subnet_id': subnet['subnet']['id']}, {'subnet_id': subnet['subnet']['id'], @@ -1181,14 +1223,13 @@ class TestPortsV2(NeutronDbPluginV2TestCase): def test_create_port_public_network_with_invalid_ip_no_subnet_id(self, expected_error='InvalidIpForNetwork'): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network=network, cidr='10.0.0.0/24'): ips = [{'ip_address': '1.1.1.1'}] res = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPBadRequest.code, - fixed_ips=ips, - set_context=True) + fixed_ips=ips) data = self.deserialize(self.fmt, res) msg = str(lib_exc.InvalidIpForNetwork(ip_address='1.1.1.1')) self.assertEqual(expected_error, data['NeutronError']['type']) @@ -1196,15 +1237,14 @@ class TestPortsV2(NeutronDbPluginV2TestCase): def test_create_port_public_network_with_invalid_ip_and_subnet_id(self, expected_error='InvalidIpForSubnet'): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network=network, cidr='10.0.0.0/24') as subnet: ips = [{'subnet_id': subnet['subnet']['id'], 'ip_address': '1.1.1.1'}] res = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPBadRequest.code, - fixed_ips=ips, - set_context=True) + fixed_ips=ips) data = self.deserialize(self.fmt, res) msg = str(lib_exc.InvalidIpForSubnet(ip_address='1.1.1.1')) self.assertEqual(expected_error, data['NeutronError']['type']) @@ -1342,29 +1382,29 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s self._test_list_ports_filtered_by_fixed_ip(limit=500) def test_list_ports_public_network(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network) as subnet: with self.port(subnet, tenant_id='tenant_1') as port1,\ self.port(subnet, tenant_id='tenant_2') as port2: # Admin request - must return both ports - self._test_list_resources('port', [port1, port2]) + self._test_list_resources( + 'port', [port1, port2], as_admin=True) # Tenant_1 request - must return single port - n_context = context.Context('', 'tenant_1') self._test_list_resources('port', [port1], - neutron_context=n_context) + tenant_id='tenant_1') # Tenant_2 request - must return single port - n_context = context.Context('', 'tenant_2') self._test_list_resources('port', [port2], - neutron_context=n_context) + tenant_id='tenant_2') def test_list_ports_for_network_owner(self): with self.network(tenant_id='tenant_1') as network: - with self.subnet(network) as subnet: - with self.port(subnet, tenant_id='tenant_1') as port1,\ - self.port(subnet, tenant_id='tenant_2') as port2: + with self.subnet(network, tenant_id='tenant_1') as subnet: + with self.port(subnet, project_id='tenant_1') as port1,\ + self.port(subnet, project_id='tenant_2', + is_admin=True) as port2: # network owner request, should return all ports port_res = self._list_ports( - 'json', set_context=True, tenant_id='tenant_1') + 'json', tenant_id='tenant_1') port_list = self.deserialize('json', port_res)['ports'] port_ids = [p['id'] for p in port_list] self.assertEqual(2, len(port_list)) @@ -1373,7 +1413,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s # another tenant request, only return ports belong to it port_res = self._list_ports( - 'json', set_context=True, tenant_id='tenant_2') + 'json', tenant_id='tenant_2') port_list = self.deserialize('json', port_res)['ports'] port_ids = [p['id'] for p in port_list] self.assertEqual(1, len(port_list)) @@ -1467,12 +1507,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s expected_code=webob.exc.HTTPNotFound.code) def test_delete_port_public_network(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: port_res = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='another_tenant', - set_context=True) + tenant_id='another_tenant') port = self.deserialize(self.fmt, port_res) self._delete('ports', port['port']['id']) @@ -1482,15 +1521,15 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_delete_port_by_network_owner(self): with self.network(tenant_id='tenant_1') as network: with self.subnet(network) as subnet: - with self.port(subnet, tenant_id='tenant_2') as port: + with self.port(subnet, tenant_id='tenant_2', + is_admin=True) as port: self._delete( - 'ports', port['port']['id'], - neutron_context=context.Context('', 'tenant_1')) + 'ports', port['port']['id'], tenant_id='tenant_1') self._show('ports', port['port']['id'], expected_code=webob.exc.HTTPNotFound.code) def test_update_port_with_stale_subnet(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: port = self._make_port(self.fmt, network['network']['id']) subnet = self._make_subnet(self.fmt, network, '10.0.0.1', '10.0.0.0/24') @@ -1528,7 +1567,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s data = {'port': {'mac_address': new_mac}} if updated_fixed_ips: data['port']['fixed_ips'] = updated_fixed_ips - req = self.new_update_request('ports', data, port['id']) + req = self.new_update_request( + 'ports', data, port['id'], as_admin=True) return req.get_response(self.api), new_mac def _verify_ips_after_mac_change(self, orig_port, new_port): @@ -1553,6 +1593,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s host_arg = host_arg or {} arg_list = arg_list or [] with self.port(device_owner=device_owner, subnet=subnet, + is_admin=True, arg_list=arg_list, **host_arg) as port: self.assertIn('mac_address', port['port']) res, new_mac = self.update_port_mac( @@ -1634,7 +1675,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s new_mac = port2['port']['mac_address'] data = {'port': {'mac_address': new_mac}} req = self.new_update_request('ports', data, - port['port']['id']) + port['port']['id'], + as_admin=True) res = req.get_response(self.api) self.assertEqual(webob.exc.HTTPConflict.code, res.status_int) @@ -1647,16 +1689,14 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_update_port_not_admin(self): res = self._create_network(self.fmt, 'net1', True, - tenant_id='not_admin', - set_context=True) + tenant_id='not_admin') net1 = self.deserialize(self.fmt, res) res = self._create_port(self.fmt, net1['network']['id'], - tenant_id='not_admin', set_context=True) + tenant_id='not_admin') port = self.deserialize(self.fmt, res) data = {'port': {'admin_state_up': False}} - neutron_context = context.Context('', 'not_admin') port = self._update('ports', port['port']['id'], data, - neutron_context=neutron_context) + request_tenant_id='not_admin') self.assertFalse(port['port']['admin_state_up']) def test_update_device_id_unchanged(self): @@ -2746,7 +2786,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): name = 'public_net' keys = [('subnets', []), ('name', name), ('admin_state_up', True), ('status', self.net_create_status), ('shared', True)] - with self.network(name=name, shared=True) as net: + with self.network(name=name, shared=True, as_admin=True) as net: for k, v in keys: self.assertEqual(net['network'][k], v) @@ -2756,8 +2796,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): webob.exc.HTTPClientError) as ctx_manager: with self.network(name=name, shared=True, - tenant_id="another_tenant", - set_context=True): + tenant_id="another_tenant"): pass self.assertEqual(webob.exc.HTTPForbidden.code, ctx_manager.exception.code) @@ -2773,12 +2812,12 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): res['network']['name']) def test_update_shared_network_noadmin_returns_403(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: data = {'network': {'name': 'a_brand_new_name'}} req = self.new_update_request('networks', data, - network['network']['id']) - req.environ['neutron.context'] = context.Context('', 'somebody') + network['network']['id'], + tenant_id='other-tenant') res = req.get_response(self.api) self.assertEqual(403, res.status_int) @@ -2787,7 +2826,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): data = {'network': {'shared': True}} req = self.new_update_request('networks', data, - network['network']['id']) + network['network']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertTrue(res['network']['shared']) @@ -2808,7 +2848,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): data = {'network': {'shared': True}} req = self.new_update_request('networks', data, - network['network']['id']) + network['network']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertTrue(res['network']['shared']) # must query db to see whether subnet's shared attribute @@ -2819,39 +2860,38 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): self.assertTrue(subnet_db['shared']) def test_update_network_set_not_shared_single_tenant(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: res1 = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id=network['network']['tenant_id'], - set_context=True) + tenant_id=network['network']['tenant_id']) data = {'network': {'shared': False}} req = self.new_update_request('networks', data, - network['network']['id']) + network['network']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertFalse(res['network']['shared']) port1 = self.deserialize(self.fmt, res1) self._delete('ports', port1['port']['id']) - def test_update_network_set_not_shared_other_tenant_returns_409(self): - with self.network(shared=True) as network: + def test_update_network_set_not_shared_other_tenant_returns_403(self): + with self.network(shared=True, as_admin=True) as network: res1 = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='somebody_else', - set_context=True) + tenant_id='somebody_else') data = {'network': {'shared': False}} req = self.new_update_request('networks', data, network['network']['id']) - self.assertEqual(webob.exc.HTTPConflict.code, + self.assertEqual(webob.exc.HTTPForbidden.code, req.get_response(self.api).status_int) port1 = self.deserialize(self.fmt, res1) self._delete('ports', port1['port']['id']) def test_update_network_set_not_shared_other_tenant_access_via_rbac(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: ctx = context.get_admin_context() with db_api.CONTEXT_WRITER.using(ctx): network_obj.NetworkRBAC( @@ -2867,33 +2907,32 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): res1 = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='somebody_else', - set_context=True) + tenant_id='somebody_else') data = {'network': {'shared': False}} req = self.new_update_request('networks', data, - network['network']['id']) + network['network']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertFalse(res['network']['shared']) port1 = self.deserialize(self.fmt, res1) self._delete('ports', port1['port']['id']) def test_update_network_set_not_shared_multi_tenants_returns_409(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: res1 = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='somebody_else', - set_context=True) + tenant_id='somebody_else') res2 = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id=network['network']['tenant_id'], - set_context=True) + tenant_id=network['network']['tenant_id']) data = {'network': {'shared': False}} req = self.new_update_request('networks', data, - network['network']['id']) + network['network']['id'], + as_admin=True) self.assertEqual(webob.exc.HTTPConflict.code, req.get_response(self.api).status_int) port1 = self.deserialize(self.fmt, res1) @@ -2902,22 +2941,21 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): self._delete('ports', port2['port']['id']) def test_update_network_set_not_shared_multi_tenants2_returns_409(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: res1 = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, - tenant_id='somebody_else', - set_context=True) + tenant_id='somebody_else') self._create_subnet(self.fmt, network['network']['id'], '10.0.0.0/24', webob.exc.HTTPCreated.code, - tenant_id=network['network']['tenant_id'], - set_context=True) + tenant_id=network['network']['tenant_id']) data = {'network': {'shared': False}} req = self.new_update_request('networks', data, - network['network']['id']) + network['network']['id'], + as_admin=True) self.assertEqual(webob.exc.HTTPConflict.code, req.get_response(self.api).status_int) @@ -2967,7 +3005,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): {'network': {'name': 'n2', 'tenant_id': 't1'}}] - res = self._create_bulk_from_list(self.fmt, 'network', networks) + res = self._create_bulk_from_list(self.fmt, 'network', networks, + as_admin=True) self.assertEqual(webob.exc.HTTPCreated.code, res.status_int) def test_create_networks_bulk_tenants_and_quotas_fail(self): @@ -2987,7 +3026,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): {'network': {'name': 'n2', 'tenant_id': 't1'}}] - res = self._create_bulk_from_list(self.fmt, 'network', networks) + res = self._create_bulk_from_list(self.fmt, 'network', networks, + as_admin=True) self.assertEqual(webob.exc.HTTPConflict.code, res.status_int) def test_create_networks_bulk_emulated(self): @@ -3136,9 +3176,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): 'neutron.api.v2.base.Controller._get_pagination_helper', new=_fake_get_pagination_helper) helper_patcher.start() - with self.network(name='net1', shared=True) as net1,\ + with self.network(name='net1', shared=True, as_admin=True) as net1,\ self.network(name='net2', shared=False) as net2,\ - self.network(name='net3', shared=True) as net3: + self.network(name='net3', shared=True, as_admin=True) as net3: self._test_list_with_pagination('network', (net1, net2, net3), ('name', 'asc'), 2, 2, @@ -3215,14 +3255,13 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): tenant_id='tenant1') as net1,\ self.network(shared=True, name='net2', + as_admin=True, tenant_id='another_tenant') as net2,\ self.network(shared=False, name='net3', tenant_id='another_tenant'): - ctx = context.Context(user_id='non_admin', - tenant_id='tenant1', - is_admin=False) - self._test_list_resources('network', (net1, net2), ctx) + self._test_list_resources('network', (net1, net2), + tenant_id='tenant1') def test_show_network(self): with self.network(name='net1') as net: @@ -3760,8 +3799,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): ip_version=constants.IP_VERSION_4, tenant_id='bad_tenant_id', gateway_ip='10.0.2.1', - device_owner='fake_owner', - set_context=True) + device_owner='fake_owner') def test_create_subnet_as_admin(self): with self.network() as network: @@ -3773,7 +3811,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): tenant_id='bad_tenant_id', gateway_ip='10.0.2.1', device_owner='fake_owner', - set_context=False) + as_admin=True) def test_create_subnet_nonzero_cidr(self): # Pass None as gateway_ip to prevent ip auto allocation for gw @@ -4464,7 +4502,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): 'tenant_id': 'tenant_id', 'device_id': 'fake_device', 'device_owner': constants.DEVICE_OWNER_ROUTER_GW} - res = self._create_port(self.fmt, net_id=net_id, **kwargs) + res = self._create_port(self.fmt, net_id=net_id, + is_admin=True, **kwargs) self.assertEqual(webob.exc.HTTPCreated.code, res.status_int) def test_create_subnet_ipv6_first_ip_owned_by_non_router(self): @@ -4480,7 +4519,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): 'tenant_id': 'tenant_id', 'device_id': 'fake_device', 'device_owner': 'fake_owner'} - res = self._create_port(self.fmt, net_id=net_id, **kwargs) + res = self._create_port(self.fmt, net_id=net_id, + is_admin=True, **kwargs) self.assertEqual(webob.exc.HTTPClientError.code, res.status_int) @@ -4804,7 +4844,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): ra_addr_mode=constants.DHCPV6_STATELESS) def test_update_subnet_shared_returns_400(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network=network) as subnet: data = {'subnet': {'shared': True}} req = self.new_update_request('subnets', data, @@ -5294,7 +5334,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): with self.subnet(network=network, gateway_ip='10.0.0.1', cidr='10.0.0.0/24', - tenant_id=project_id),\ + tenant_id=project_id, + as_admin=True),\ self.subnet(network=network, gateway_ip='10.0.1.1', cidr='10.0.1.0/24'),\ @@ -5351,7 +5392,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): self._test_list_resources('subnet', subnets) def test_list_subnets_shared(self): - with self.network(shared=True) as network: + with self.network(shared=True, as_admin=True) as network: with self.subnet(network=network, cidr='10.0.0.0/24') as subnet: with self.subnet(cidr='10.0.1.0/24') as priv_subnet: # normal user should see only 1 subnet @@ -6117,8 +6158,7 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase): min_prefixlen='24', shared=True) admin_res = self._list('subnetpools') - mortal_res = self._list('subnetpools', - neutron_context=context.Context('', 'not-the-owner')) + mortal_res = self._list('subnetpools', tenant_id='not-the-owner') self.assertEqual(1, len(admin_res['subnetpools'])) self.assertEqual(1, len(mortal_res['subnetpools'])) @@ -6130,8 +6170,7 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase): min_prefixlen='24', shared=False) admin_res = self._list('subnetpools') - mortal_res = self._list('subnetpools', - neutron_context=context.Context('', 'not-the-owner')) + mortal_res = self._list('subnetpools', tenant_id='not-the-owner') self.assertEqual(1, len(admin_res['subnetpools'])) self.assertEqual(0, len(mortal_res['subnetpools'])) @@ -7197,10 +7236,10 @@ class DbOperationBoundMixin(object): def get_api_kwargs(self): context_ = self._get_context() - return {'set_context': True, 'tenant_id': context_.project_id} + return {'tenant_id': context_.project_id} def _list_and_record_queries(self, resource, query_params=None): - kwargs = {'neutron_context': self._get_context()} + kwargs = {} if query_params: kwargs['query_params'] = query_params # list once before tracking to flush out any quota recalculations. diff --git a/neutron/tests/unit/db/test_dvr_mac_db.py b/neutron/tests/unit/db/test_dvr_mac_db.py index 80d650a7d8..6f87672712 100644 --- a/neutron/tests/unit/db/test_dvr_mac_db.py +++ b/neutron/tests/unit/db/test_dvr_mac_db.py @@ -188,22 +188,28 @@ class DvrDbMixinTestCase(test_plugin.Ml2PluginV2TestCase): arg_list = (portbindings.HOST_ID,) with self.subnet() as subnet,\ self.port(subnet=subnet, + is_admin=True, device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX, arg_list=arg_list, **host_arg) as compute_port,\ self.port(subnet=subnet, device_owner=constants.DEVICE_OWNER_DHCP, + is_admin=True, arg_list=arg_list, **host_arg) as dhcp_port,\ self.port(subnet=subnet, device_owner=constants.DEVICE_OWNER_LOADBALANCER, + is_admin=True, arg_list=arg_list, **host_arg) as lb_port,\ self.port(device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX, + is_admin=True, arg_list=arg_list, **host_arg),\ self.port(subnet=subnet, device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX, + is_admin=True, arg_list=arg_list, **{portbindings.HOST_ID: 'other'}),\ self.port(subnet=subnet, device_owner=constants.DEVICE_OWNER_NETWORK_PREFIX, + is_admin=True, arg_list=arg_list, **host_arg): expected_ids = [port['port']['id'] for port in [compute_port, dhcp_port, lb_port]] diff --git a/neutron/tests/unit/db/test_ipam_backend_mixin.py b/neutron/tests/unit/db/test_ipam_backend_mixin.py index e81a908ec2..fa2872a317 100644 --- a/neutron/tests/unit/db/test_ipam_backend_mixin.py +++ b/neutron/tests/unit/db/test_ipam_backend_mixin.py @@ -373,7 +373,8 @@ class TestPortUpdateIpam(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): net_id=network['network']['id'], tenant_id=network['network']['tenant_id'], arg_list=(portbindings.HOST_ID,), - **{portbindings.HOST_ID: 'fakehost'}) + **{portbindings.HOST_ID: 'fakehost'}, + is_admin=True) port = self.deserialize(self.fmt, response) # Create the subnet and try to update the port to get an IP @@ -381,7 +382,8 @@ class TestPortUpdateIpam(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): data = {'port': { 'fixed_ips': [{'subnet_id': subnet['subnet']['id']}]}} port_id = port['port']['id'] - port_req = self.new_update_request('ports', data, port_id) + port_req = self.new_update_request('ports', data, port_id, + as_admin=True) response = port_req.get_response(self.api) res = self.deserialize(self.fmt, response) diff --git a/neutron/tests/unit/db/test_ipam_pluggable_backend.py b/neutron/tests/unit/db/test_ipam_pluggable_backend.py index b77ea16458..958c0abbc6 100644 --- a/neutron/tests/unit/db/test_ipam_pluggable_backend.py +++ b/neutron/tests/unit/db/test_ipam_pluggable_backend.py @@ -71,7 +71,6 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase): plugin = 'neutron.tests.unit.db.test_ipam_backend_mixin.TestPlugin' super(TestDbBasePluginIpam, self).setUp(plugin=plugin) cfg.CONF.set_override("ipam_driver", 'internal') - self.tenant_id = uuidutils.generate_uuid() self.subnet_id = uuidutils.generate_uuid() self.admin_context = ncontext.get_admin_context() @@ -89,7 +88,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase): 'device_owner': constants.DEVICE_OWNER_COMPUTE_PREFIX + 'None' }, 'subnet_request': ipam_req.SpecificSubnetRequest( - self.tenant_id, + self._tenant_id, self.subnet_id, '10.0.0.0/24', '10.0.0.1', diff --git a/neutron/tests/unit/db/test_l3_db.py b/neutron/tests/unit/db/test_l3_db.py index 9b65b60f09..43d4ec4a49 100644 --- a/neutron/tests/unit/db/test_l3_db.py +++ b/neutron/tests/unit/db/test_l3_db.py @@ -928,7 +928,8 @@ class L3TestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): with db_api.CONTEXT_WRITER.using(self.ctx): res = self._create_network( self.fmt, name, True, - arg_list=(extnet_apidef.EXTERNAL,), **kwargs) + arg_list=(extnet_apidef.EXTERNAL,), + as_admin=True, **kwargs) if res.status_int >= webob.exc.HTTPClientError.code: raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(self.fmt, res) diff --git a/neutron/tests/unit/db/test_ovn_revision_numbers_db.py b/neutron/tests/unit/db/test_ovn_revision_numbers_db.py index 62dfc9caa1..f375d3602d 100644 --- a/neutron/tests/unit/db/test_ovn_revision_numbers_db.py +++ b/neutron/tests/unit/db/test_ovn_revision_numbers_db.py @@ -237,7 +237,7 @@ class TestRevisionNumberMaintenance(test_securitygroup.SecurityGroupsTestCase, '10.0.0.0/24')['subnet'] self._set_net_external(self.net['id']) info = {'network_id': self.net['id']} - router = self._make_router(self.fmt, None, + router = self._make_router(self.fmt, self._tenant_id, external_gateway_info=info)['router'] fip = self._make_floatingip(self.fmt, self.net['id'])['floatingip'] port = self._make_port(self.fmt, self.net['id'])['port'] |