diff options
Diffstat (limited to 'neutron/tests/unit/_test_extension_portbindings.py')
-rw-r--r-- | neutron/tests/unit/_test_extension_portbindings.py | 107 |
1 files changed, 40 insertions, 67 deletions
diff --git a/neutron/tests/unit/_test_extension_portbindings.py b/neutron/tests/unit/_test_extension_portbindings.py index ea18b96e10..763e28b6d8 100644 --- a/neutron/tests/unit/_test_extension_portbindings.py +++ b/neutron/tests/unit/_test_extension_portbindings.py @@ -55,24 +55,16 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): self.assertNotIn(portbindings.VIF_TYPE, port) self.assertNotIn(portbindings.VIF_DETAILS, port) - def _get_non_admin_context(self): - return context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - def test_port_vif_details(self): - with self.port(name='name') as port: + with self.port(is_admin=True, name='name') as port: port_id = port['port']['id'] # Check a response of create_port self._check_response_portbindings(port['port']) # Check a response of get_port - ctx = context.get_admin_context() - port = self._show('ports', port_id, neutron_context=ctx)['port'] + port = self._show('ports', port_id, as_admin=True)['port'] self._check_response_portbindings(port) # By default user is admin - now test non admin user - ctx = self._get_non_admin_context() - non_admin_port = self._show( - 'ports', port_id, neutron_context=ctx)['port'] + non_admin_port = self._show('ports', port_id)['port'] self._check_response_no_portbindings(non_admin_port) def test_ports_vif_details(self): @@ -83,9 +75,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): self.assertEqual(len(ports), 2) for port in ports: self._check_response_portbindings(port) - # By default user is admin - now test non admin user - ctx = self._get_non_admin_context() - ports = self._list('ports', neutron_context=ctx)['ports'] + ports = self._list('ports')['ports'] self.assertEqual(len(ports), 2) for non_admin_port in ports: self._check_response_no_portbindings(non_admin_port) @@ -97,11 +87,12 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): def _test_create_port_binding_profile(self, profile): profile_arg = {portbindings.PROFILE: profile} - with self.port(arg_list=(portbindings.PROFILE,), + with self.port(is_admin=True, + arg_list=(portbindings.PROFILE,), **profile_arg) as port: port_id = port['port']['id'] self._check_port_binding_profile(port['port'], profile) - port = self._show('ports', port_id) + port = self._show('ports', port_id, as_admin=True) self._check_port_binding_profile(port['port'], profile) def test_create_port_binding_profile_none(self): @@ -112,14 +103,13 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): def _test_update_port_binding_profile(self, profile): profile_arg = {portbindings.PROFILE: profile} - with self.port() as port: + with self.port(is_admin=True) as port: self._check_port_binding_profile(port['port']) port_id = port['port']['id'] - ctx = context.get_admin_context() port = self._update('ports', port_id, {'port': profile_arg}, - neutron_context=ctx)['port'] + as_admin=True)['port'] self._check_port_binding_profile(port, profile) - port = self._show('ports', port_id)['port'] + port = self._show('ports', port_id, as_admin=True)['port'] self._check_port_binding_profile(port, profile) def test_update_port_binding_profile_none(self): @@ -131,18 +121,16 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): def test_port_create_portinfo_non_admin(self): profile_arg = {portbindings.PROFILE: {dummy_plugin.RESOURCE_NAME: dummy_plugin.RESOURCE_NAME}} - with self.network(set_context=True, tenant_id='test') as net1: + with self.network() as net1: with self.subnet(network=net1) as subnet1: # succeed without binding:profile - with self.port(subnet=subnet1, - set_context=True, tenant_id='test'): + with self.port(subnet=subnet1): pass # fail with binding:profile try: with self.port(subnet=subnet1, expected_res_status=403, arg_list=(portbindings.PROFILE,), - set_context=True, tenant_id='test', **profile_arg): pass except exc.HTTPClientError: @@ -156,11 +144,9 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase): with self.port(subnet=subnet1) as port: # By default user is admin - now test non admin user port_id = port['port']['id'] - ctx = self._get_non_admin_context() port = self._update('ports', port_id, {'port': profile_arg}, - expected_code=exc.HTTPForbidden.code, - neutron_context=ctx) + expected_code=exc.HTTPForbidden.code) class PortBindingsHostTestCaseMixin(object): @@ -192,74 +178,70 @@ class PortBindingsHostTestCaseMixin(object): def test_port_vif_host(self): host_arg = {portbindings.HOST_ID: self.hostname} - with self.port(name='name', arg_list=(portbindings.HOST_ID,), + with self.port(name='name', is_admin=True, + arg_list=(portbindings.HOST_ID,), **host_arg) as port: port_id = port['port']['id'] # Check a response of create_port self._check_response_portbindings_host(port['port']) # Check a response of get_port - ctx = context.get_admin_context() - port = self._show('ports', port_id, neutron_context=ctx)['port'] + port = self._show('ports', port_id, as_admin=True)['port'] self._check_response_portbindings_host(port) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - non_admin_port = self._show( - 'ports', port_id, neutron_context=ctx)['port'] + non_admin_port = self._show('ports', port_id)['port'] self._check_response_no_portbindings_host(non_admin_port) def test_ports_vif_host(self): host_arg = {portbindings.HOST_ID: self.hostname} with self.port(name='name1', + is_admin=True, arg_list=(portbindings.HOST_ID,), **host_arg), self.port(name='name2'): - ctx = context.get_admin_context() - ports = self._list('ports', neutron_context=ctx)['ports'] + ports = self._list('ports', as_admin=True)['ports'] self.assertEqual(2, len(ports)) for port in ports: if port['name'] == 'name1': self._check_response_portbindings_host(port) else: self.assertFalse(port[portbindings.HOST_ID]) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - ports = self._list('ports', neutron_context=ctx)['ports'] + ports = self._list('ports')['ports'] self.assertEqual(2, len(ports)) for non_admin_port in ports: self._check_response_no_portbindings_host(non_admin_port) def test_ports_vif_host_update(self): host_arg = {portbindings.HOST_ID: self.hostname} - with self.port(name='name1', arg_list=(portbindings.HOST_ID,), + with self.port(name='name1', is_admin=True, + arg_list=(portbindings.HOST_ID,), **host_arg) as port1, self.port(name='name2') as port2: data = {'port': {portbindings.HOST_ID: 'testhosttemp'}} - req = self.new_update_request('ports', data, port1['port']['id']) + req = self.new_update_request('ports', data, port1['port']['id'], + as_admin=True) req.get_response(self.api) - req = self.new_update_request('ports', data, port2['port']['id']) - ctx = context.get_admin_context() + req = self.new_update_request('ports', data, port2['port']['id'], + as_admin=True) req.get_response(self.api) - ports = self._list('ports', neutron_context=ctx)['ports'] + ports = self._list('ports', as_admin=True)['ports'] self.assertEqual(2, len(ports)) for port in ports: self.assertEqual('testhosttemp', port[portbindings.HOST_ID]) def test_ports_vif_non_host_update(self): host_arg = {portbindings.HOST_ID: self.hostname} - with self.port(name='name', arg_list=(portbindings.HOST_ID,), + with self.port(name='name', is_admin=True, + arg_list=(portbindings.HOST_ID,), **host_arg) as port: data = {'port': {'admin_state_up': False}} - req = self.new_update_request('ports', data, port['port']['id']) + req = self.new_update_request('ports', data, port['port']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertEqual(port['port'][portbindings.HOST_ID], res['port'][portbindings.HOST_ID]) def test_ports_vif_non_host_update_when_host_null(self): - with self.port() as port: + with self.port(is_admin=True) as port: data = {'port': {'admin_state_up': False}} - req = self.new_update_request('ports', data, port['port']['id']) + req = self.new_update_request('ports', data, port['port']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertEqual(port['port'][portbindings.HOST_ID], res['port'][portbindings.HOST_ID]) @@ -267,10 +249,12 @@ class PortBindingsHostTestCaseMixin(object): def test_ports_vif_host_list(self): host_arg = {portbindings.HOST_ID: self.hostname} with self.port(name='name1', + is_admin=True, arg_list=(portbindings.HOST_ID,), **host_arg) as port1,\ self.port(name='name2'),\ self.port(name='name3', + is_admin=True, arg_list=(portbindings.HOST_ID,), **host_arg) as port3: self._test_list_resources( @@ -308,23 +292,16 @@ class PortBindingsVnicTestCaseMixin(object): # Check a response of create_port self._check_response_portbindings_vnic_type(port['port']) # Check a response of get_port - ctx = context.get_admin_context() - port = self._show('ports', port_id, neutron_context=ctx)['port'] + port = self._show('ports', port_id, as_admin=True)['port'] self._check_response_portbindings_vnic_type(port) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - non_admin_port = self._show( - 'ports', port_id, neutron_context=ctx)['port'] + non_admin_port = self._show('ports', port_id)['port'] self._check_response_portbindings_vnic_type(non_admin_port) def test_ports_vnic_type(self): vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type} with self.port(name='name1', arg_list=(portbindings.VNIC_TYPE,), **vnic_arg), self.port(name='name2'): - ctx = context.get_admin_context() - ports = self._list('ports', neutron_context=ctx)['ports'] + ports = self._list('ports', as_admin=True)['ports'] self.assertEqual(2, len(ports)) for port in ports: if port['name'] == 'name1': @@ -332,11 +309,7 @@ class PortBindingsVnicTestCaseMixin(object): else: self.assertEqual(portbindings.VNIC_NORMAL, port[portbindings.VNIC_TYPE]) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - ports = self._list('ports', neutron_context=ctx)['ports'] + ports = self._list('ports')['ports'] self.assertEqual(2, len(ports)) for non_admin_port in ports: self._check_response_portbindings_vnic_type(non_admin_port) |