summaryrefslogtreecommitdiff
path: root/neutron/tests/unit/_test_extension_portbindings.py
diff options
context:
space:
mode:
Diffstat (limited to 'neutron/tests/unit/_test_extension_portbindings.py')
-rw-r--r--neutron/tests/unit/_test_extension_portbindings.py107
1 files changed, 40 insertions, 67 deletions
diff --git a/neutron/tests/unit/_test_extension_portbindings.py b/neutron/tests/unit/_test_extension_portbindings.py
index ea18b96e10..763e28b6d8 100644
--- a/neutron/tests/unit/_test_extension_portbindings.py
+++ b/neutron/tests/unit/_test_extension_portbindings.py
@@ -55,24 +55,16 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.assertNotIn(portbindings.VIF_TYPE, port)
self.assertNotIn(portbindings.VIF_DETAILS, port)
- def _get_non_admin_context(self):
- return context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False)
-
def test_port_vif_details(self):
- with self.port(name='name') as port:
+ with self.port(is_admin=True, name='name') as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbindings(port['port'])
# Check a response of get_port
- ctx = context.get_admin_context()
- port = self._show('ports', port_id, neutron_context=ctx)['port']
+ port = self._show('ports', port_id, as_admin=True)['port']
self._check_response_portbindings(port)
# By default user is admin - now test non admin user
- ctx = self._get_non_admin_context()
- non_admin_port = self._show(
- 'ports', port_id, neutron_context=ctx)['port']
+ non_admin_port = self._show('ports', port_id)['port']
self._check_response_no_portbindings(non_admin_port)
def test_ports_vif_details(self):
@@ -83,9 +75,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.assertEqual(len(ports), 2)
for port in ports:
self._check_response_portbindings(port)
- # By default user is admin - now test non admin user
- ctx = self._get_non_admin_context()
- ports = self._list('ports', neutron_context=ctx)['ports']
+ ports = self._list('ports')['ports']
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self._check_response_no_portbindings(non_admin_port)
@@ -97,11 +87,12 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _test_create_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
- with self.port(arg_list=(portbindings.PROFILE,),
+ with self.port(is_admin=True,
+ arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
self._check_port_binding_profile(port['port'], profile)
- port = self._show('ports', port_id)
+ port = self._show('ports', port_id, as_admin=True)
self._check_port_binding_profile(port['port'], profile)
def test_create_port_binding_profile_none(self):
@@ -112,14 +103,13 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _test_update_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
- with self.port() as port:
+ with self.port(is_admin=True) as port:
self._check_port_binding_profile(port['port'])
port_id = port['port']['id']
- ctx = context.get_admin_context()
port = self._update('ports', port_id, {'port': profile_arg},
- neutron_context=ctx)['port']
+ as_admin=True)['port']
self._check_port_binding_profile(port, profile)
- port = self._show('ports', port_id)['port']
+ port = self._show('ports', port_id, as_admin=True)['port']
self._check_port_binding_profile(port, profile)
def test_update_port_binding_profile_none(self):
@@ -131,18 +121,16 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def test_port_create_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {dummy_plugin.RESOURCE_NAME:
dummy_plugin.RESOURCE_NAME}}
- with self.network(set_context=True, tenant_id='test') as net1:
+ with self.network() as net1:
with self.subnet(network=net1) as subnet1:
# succeed without binding:profile
- with self.port(subnet=subnet1,
- set_context=True, tenant_id='test'):
+ with self.port(subnet=subnet1):
pass
# fail with binding:profile
try:
with self.port(subnet=subnet1,
expected_res_status=403,
arg_list=(portbindings.PROFILE,),
- set_context=True, tenant_id='test',
**profile_arg):
pass
except exc.HTTPClientError:
@@ -156,11 +144,9 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
with self.port(subnet=subnet1) as port:
# By default user is admin - now test non admin user
port_id = port['port']['id']
- ctx = self._get_non_admin_context()
port = self._update('ports', port_id,
{'port': profile_arg},
- expected_code=exc.HTTPForbidden.code,
- neutron_context=ctx)
+ expected_code=exc.HTTPForbidden.code)
class PortBindingsHostTestCaseMixin(object):
@@ -192,74 +178,70 @@ class PortBindingsHostTestCaseMixin(object):
def test_port_vif_host(self):
host_arg = {portbindings.HOST_ID: self.hostname}
- with self.port(name='name', arg_list=(portbindings.HOST_ID,),
+ with self.port(name='name', is_admin=True,
+ arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
port_id = port['port']['id']
# Check a response of create_port
self._check_response_portbindings_host(port['port'])
# Check a response of get_port
- ctx = context.get_admin_context()
- port = self._show('ports', port_id, neutron_context=ctx)['port']
+ port = self._show('ports', port_id, as_admin=True)['port']
self._check_response_portbindings_host(port)
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False)
- non_admin_port = self._show(
- 'ports', port_id, neutron_context=ctx)['port']
+ non_admin_port = self._show('ports', port_id)['port']
self._check_response_no_portbindings_host(non_admin_port)
def test_ports_vif_host(self):
host_arg = {portbindings.HOST_ID: self.hostname}
with self.port(name='name1',
+ is_admin=True,
arg_list=(portbindings.HOST_ID,),
**host_arg), self.port(name='name2'):
- ctx = context.get_admin_context()
- ports = self._list('ports', neutron_context=ctx)['ports']
+ ports = self._list('ports', as_admin=True)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_host(port)
else:
self.assertFalse(port[portbindings.HOST_ID])
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False)
- ports = self._list('ports', neutron_context=ctx)['ports']
+ ports = self._list('ports')['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_no_portbindings_host(non_admin_port)
def test_ports_vif_host_update(self):
host_arg = {portbindings.HOST_ID: self.hostname}
- with self.port(name='name1', arg_list=(portbindings.HOST_ID,),
+ with self.port(name='name1', is_admin=True,
+ arg_list=(portbindings.HOST_ID,),
**host_arg) as port1, self.port(name='name2') as port2:
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}}
- req = self.new_update_request('ports', data, port1['port']['id'])
+ req = self.new_update_request('ports', data, port1['port']['id'],
+ as_admin=True)
req.get_response(self.api)
- req = self.new_update_request('ports', data, port2['port']['id'])
- ctx = context.get_admin_context()
+ req = self.new_update_request('ports', data, port2['port']['id'],
+ as_admin=True)
req.get_response(self.api)
- ports = self._list('ports', neutron_context=ctx)['ports']
+ ports = self._list('ports', as_admin=True)['ports']
self.assertEqual(2, len(ports))
for port in ports:
self.assertEqual('testhosttemp', port[portbindings.HOST_ID])
def test_ports_vif_non_host_update(self):
host_arg = {portbindings.HOST_ID: self.hostname}
- with self.port(name='name', arg_list=(portbindings.HOST_ID,),
+ with self.port(name='name', is_admin=True,
+ arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
data = {'port': {'admin_state_up': False}}
- req = self.new_update_request('ports', data, port['port']['id'])
+ req = self.new_update_request('ports', data, port['port']['id'],
+ as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(port['port'][portbindings.HOST_ID],
res['port'][portbindings.HOST_ID])
def test_ports_vif_non_host_update_when_host_null(self):
- with self.port() as port:
+ with self.port(is_admin=True) as port:
data = {'port': {'admin_state_up': False}}
- req = self.new_update_request('ports', data, port['port']['id'])
+ req = self.new_update_request('ports', data, port['port']['id'],
+ as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(port['port'][portbindings.HOST_ID],
res['port'][portbindings.HOST_ID])
@@ -267,10 +249,12 @@ class PortBindingsHostTestCaseMixin(object):
def test_ports_vif_host_list(self):
host_arg = {portbindings.HOST_ID: self.hostname}
with self.port(name='name1',
+ is_admin=True,
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1,\
self.port(name='name2'),\
self.port(name='name3',
+ is_admin=True,
arg_list=(portbindings.HOST_ID,),
**host_arg) as port3:
self._test_list_resources(
@@ -308,23 +292,16 @@ class PortBindingsVnicTestCaseMixin(object):
# Check a response of create_port
self._check_response_portbindings_vnic_type(port['port'])
# Check a response of get_port
- ctx = context.get_admin_context()
- port = self._show('ports', port_id, neutron_context=ctx)['port']
+ port = self._show('ports', port_id, as_admin=True)['port']
self._check_response_portbindings_vnic_type(port)
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False)
- non_admin_port = self._show(
- 'ports', port_id, neutron_context=ctx)['port']
+ non_admin_port = self._show('ports', port_id)['port']
self._check_response_portbindings_vnic_type(non_admin_port)
def test_ports_vnic_type(self):
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with self.port(name='name1', arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg), self.port(name='name2'):
- ctx = context.get_admin_context()
- ports = self._list('ports', neutron_context=ctx)['ports']
+ ports = self._list('ports', as_admin=True)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
@@ -332,11 +309,7 @@ class PortBindingsVnicTestCaseMixin(object):
else:
self.assertEqual(portbindings.VNIC_NORMAL,
port[portbindings.VNIC_TYPE])
- # By default user is admin - now test non admin user
- ctx = context.Context(user_id=None,
- tenant_id=self._tenant_id,
- is_admin=False)
- ports = self._list('ports', neutron_context=ctx)['ports']
+ ports = self._list('ports')['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_portbindings_vnic_type(non_admin_port)