summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-04-06 13:37:14 +0000
committerGerrit Code Review <review@openstack.org>2015-04-06 13:37:14 +0000
commitaa943f930d2d6e13c652917bac36c51abcaf4696 (patch)
tree191fdc6de40e90df857f72d1933300d0c43f8ec4
parent29effc67fc78186206061b06a0e8f9795e8189b0 (diff)
parent980e54713776584f2b810d136a369ce5a73b3a7f (diff)
downloadneutron-aa943f930d2d6e13c652917bac36c51abcaf4696.tar.gz
Merge "Prepare for unit test reorg"
-rw-r--r--neutron/tests/unit/agent/l3/test_l3_router.py226
-rw-r--r--neutron/tests/unit/agent/l3/test_router_info.py205
-rw-r--r--neutron/tests/unit/agent/linux/test_process_monitor.py95
-rw-r--r--neutron/tests/unit/test_api_v2.py3
-rw-r--r--neutron/tests/unit/test_db_plugin.py60
-rw-r--r--neutron/tests/unit/test_db_plugin_level.py82
-rw-r--r--neutron/tests/unit/test_extension_extended_attribute.py154
-rw-r--r--neutron/tests/unit/test_extensions.py126
-rw-r--r--neutron/tests/unit/test_linux_external_process.py77
9 files changed, 469 insertions, 559 deletions
diff --git a/neutron/tests/unit/agent/l3/test_l3_router.py b/neutron/tests/unit/agent/l3/test_l3_router.py
deleted file mode 100644
index 27d5b8030c..0000000000
--- a/neutron/tests/unit/agent/l3/test_l3_router.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# Copyright (c) 2015 Openstack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-from neutron.agent.l3 import router_info
-from neutron.agent.linux import ip_lib
-from neutron.common import constants as l3_constants
-from neutron.common import exceptions as n_exc
-from neutron.openstack.common import uuidutils
-from neutron.tests import base
-
-_uuid = uuidutils.generate_uuid
-
-
-class BasicRouterTestCaseFramework(base.BaseTestCase):
- def _create_router(self, router=None, **kwargs):
- if not router:
- router = mock.MagicMock()
- self.agent_conf = mock.Mock()
- # NOTE The use_namespaces config will soon be deprecated
- self.agent_conf.use_namespaces = True
- self.router_id = _uuid()
- return router_info.RouterInfo(self.router_id,
- router,
- self.agent_conf,
- mock.sentinel.interface_driver,
- **kwargs)
-
-
-class TestBasicRouterOperations(BasicRouterTestCaseFramework):
-
- def test_get_floating_ips(self):
- router = mock.MagicMock()
- router.get.return_value = [mock.sentinel.floating_ip]
- ri = self._create_router(router)
-
- fips = ri.get_floating_ips()
-
- self.assertEqual([mock.sentinel.floating_ip], fips)
-
- def test_process_floating_ip_nat_rules(self):
- ri = self._create_router()
- fips = [{'fixed_ip_address': mock.sentinel.ip,
- 'floating_ip_address': mock.sentinel.fip}]
- ri.get_floating_ips = mock.Mock(return_value=fips)
- ri.iptables_manager = mock.MagicMock()
- ipv4_nat = ri.iptables_manager.ipv4['nat']
- ri.floating_forward_rules = mock.Mock(
- return_value=[(mock.sentinel.chain, mock.sentinel.rule)])
-
- ri.process_floating_ip_nat_rules()
-
- # Be sure that the rules are cleared first and apply is called last
- self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
- ipv4_nat.mock_calls[0])
- self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
-
- # Be sure that add_rule is called somewhere in the middle
- ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain,
- mock.sentinel.rule,
- tag='floating_ip')
-
- def test_process_floating_ip_nat_rules_removed(self):
- ri = self._create_router()
- ri.get_floating_ips = mock.Mock(return_value=[])
- ri.iptables_manager = mock.MagicMock()
- ipv4_nat = ri.iptables_manager.ipv4['nat']
-
- ri.process_floating_ip_nat_rules()
-
- # Be sure that the rules are cleared first and apply is called last
- self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
- ipv4_nat.mock_calls[0])
- self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
-
- # Be sure that add_rule is called somewhere in the middle
- self.assertFalse(ipv4_nat.add_rule.called)
-
- def _test_add_fip_addr_to_device_error(self, device):
- ri = self._create_router()
- ip = '15.1.2.3'
-
- result = ri._add_fip_addr_to_device(
- {'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
-
- device.addr.add.assert_called_with(ip + '/32')
- return result
-
- def test__add_fip_addr_to_device(self):
- result = self._test_add_fip_addr_to_device_error(mock.Mock())
- self.assertTrue(result)
-
- def test__add_fip_addr_to_device_error(self):
- device = mock.Mock()
- device.addr.add.side_effect = RuntimeError
- result = self._test_add_fip_addr_to_device_error(device)
- self.assertFalse(result)
-
- def test_process_snat_dnat_for_fip(self):
- ri = self._create_router()
- ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception)
-
- self.assertRaises(n_exc.FloatingIpSetupException,
- ri.process_snat_dnat_for_fip)
-
- ri.process_floating_ip_nat_rules.assert_called_once_with()
-
- def test_put_fips_in_error_state(self):
- ri = self._create_router()
- ri.router = mock.Mock()
- ri.router.get.return_value = [{'id': mock.sentinel.id1},
- {'id': mock.sentinel.id2}]
-
- statuses = ri.put_fips_in_error_state()
-
- expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
- mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
- self.assertNotEqual(expected, statuses)
-
- def test_configure_fip_addresses(self):
- ri = self._create_router()
- ri.process_floating_ip_addresses = mock.Mock(
- side_effect=Exception)
-
- self.assertRaises(n_exc.FloatingIpSetupException,
- ri.configure_fip_addresses,
- mock.sentinel.interface_name)
-
- ri.process_floating_ip_addresses.assert_called_once_with(
- mock.sentinel.interface_name)
-
- def test_get_router_cidrs_returns_cidrs(self):
- ri = self._create_router()
- addresses = ['15.1.2.2/24', '15.1.2.3/32']
- device = mock.MagicMock()
- device.addr.list.return_value = [{'cidr': addresses[0]},
- {'cidr': addresses[1]}]
- self.assertEqual(set(addresses), ri.get_router_cidrs(device))
-
-
-@mock.patch.object(ip_lib, 'IPDevice')
-class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework):
-
- def test_process_floating_ip_addresses_remap(self, IPDevice):
- fip_id = _uuid()
- fip = {
- 'id': fip_id, 'port_id': _uuid(),
- 'floating_ip_address': '15.1.2.3',
- 'fixed_ip_address': '192.168.0.2'
- }
-
- IPDevice.return_value = device = mock.Mock()
- device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
- ri = self._create_router()
- ri.get_floating_ips = mock.Mock(return_value=[fip])
-
- fip_statuses = ri.process_floating_ip_addresses(
- mock.sentinel.interface_name)
- self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
- fip_statuses)
-
- self.assertFalse(device.addr.add.called)
- self.assertFalse(device.addr.delete.called)
-
- def test_process_router_with_disabled_floating_ip(self, IPDevice):
- fip_id = _uuid()
- fip = {
- 'id': fip_id, 'port_id': _uuid(),
- 'floating_ip_address': '15.1.2.3',
- 'fixed_ip_address': '192.168.0.2'
- }
-
- ri = self._create_router()
- ri.floating_ips = [fip]
- ri.get_floating_ips = mock.Mock(return_value=[])
-
- fip_statuses = ri.process_floating_ip_addresses(
- mock.sentinel.interface_name)
-
- self.assertIsNone(fip_statuses.get(fip_id))
-
- def test_process_router_floating_ip_with_device_add_error(self, IPDevice):
- IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError)
- device.addr.list.return_value = []
- fip_id = _uuid()
- fip = {
- 'id': fip_id, 'port_id': _uuid(),
- 'floating_ip_address': '15.1.2.3',
- 'fixed_ip_address': '192.168.0.2'
- }
- ri = self._create_router()
- ri.add_floating_ip = mock.Mock(
- return_value=l3_constants.FLOATINGIP_STATUS_ERROR)
- ri.get_floating_ips = mock.Mock(return_value=[fip])
-
- fip_statuses = ri.process_floating_ip_addresses(
- mock.sentinel.interface_name)
-
- self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR},
- fip_statuses)
-
- # TODO(mrsmith): refactor for DVR cases
- def test_process_floating_ip_addresses_remove(self, IPDevice):
- IPDevice.return_value = device = mock.Mock()
- device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
-
- ri = self._create_router()
- ri.remove_floating_ip = mock.Mock()
- ri.router.get = mock.Mock(return_value=[])
-
- fip_statuses = ri.process_floating_ip_addresses(
- mock.sentinel.interface_name)
- self.assertEqual({}, fip_statuses)
- ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32')
diff --git a/neutron/tests/unit/agent/l3/test_router_info.py b/neutron/tests/unit/agent/l3/test_router_info.py
index acb764d373..04aa55748c 100644
--- a/neutron/tests/unit/agent/l3/test_router_info.py
+++ b/neutron/tests/unit/agent/l3/test_router_info.py
@@ -14,6 +14,9 @@ import mock
from neutron.agent.common import config as agent_config
from neutron.agent.l3 import router_info
+from neutron.agent.linux import ip_lib
+from neutron.common import constants as l3_constants
+from neutron.common import exceptions as n_exc
from neutron.openstack.common import uuidutils
from neutron.tests import base
@@ -104,3 +107,205 @@ class TestRouterInfo(base.BaseTestCase):
expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24',
'via', '10.100.10.30']]
self._check_agent_method_called(expected)
+
+
+class BasicRouterTestCaseFramework(base.BaseTestCase):
+ def _create_router(self, router=None, **kwargs):
+ if not router:
+ router = mock.MagicMock()
+ self.agent_conf = mock.Mock()
+ # NOTE The use_namespaces config will soon be deprecated
+ self.agent_conf.use_namespaces = True
+ self.router_id = _uuid()
+ return router_info.RouterInfo(self.router_id,
+ router,
+ self.agent_conf,
+ mock.sentinel.interface_driver,
+ **kwargs)
+
+
+class TestBasicRouterOperations(BasicRouterTestCaseFramework):
+
+ def test_get_floating_ips(self):
+ router = mock.MagicMock()
+ router.get.return_value = [mock.sentinel.floating_ip]
+ ri = self._create_router(router)
+
+ fips = ri.get_floating_ips()
+
+ self.assertEqual([mock.sentinel.floating_ip], fips)
+
+ def test_process_floating_ip_nat_rules(self):
+ ri = self._create_router()
+ fips = [{'fixed_ip_address': mock.sentinel.ip,
+ 'floating_ip_address': mock.sentinel.fip}]
+ ri.get_floating_ips = mock.Mock(return_value=fips)
+ ri.iptables_manager = mock.MagicMock()
+ ipv4_nat = ri.iptables_manager.ipv4['nat']
+ ri.floating_forward_rules = mock.Mock(
+ return_value=[(mock.sentinel.chain, mock.sentinel.rule)])
+
+ ri.process_floating_ip_nat_rules()
+
+ # Be sure that the rules are cleared first and apply is called last
+ self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
+ ipv4_nat.mock_calls[0])
+ self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
+
+ # Be sure that add_rule is called somewhere in the middle
+ ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain,
+ mock.sentinel.rule,
+ tag='floating_ip')
+
+ def test_process_floating_ip_nat_rules_removed(self):
+ ri = self._create_router()
+ ri.get_floating_ips = mock.Mock(return_value=[])
+ ri.iptables_manager = mock.MagicMock()
+ ipv4_nat = ri.iptables_manager.ipv4['nat']
+
+ ri.process_floating_ip_nat_rules()
+
+ # Be sure that the rules are cleared first and apply is called last
+ self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
+ ipv4_nat.mock_calls[0])
+ self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
+
+ # Be sure that add_rule is called somewhere in the middle
+ self.assertFalse(ipv4_nat.add_rule.called)
+
+ def _test_add_fip_addr_to_device_error(self, device):
+ ri = self._create_router()
+ ip = '15.1.2.3'
+
+ result = ri._add_fip_addr_to_device(
+ {'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
+
+ device.addr.add.assert_called_with(ip + '/32')
+ return result
+
+ def test__add_fip_addr_to_device(self):
+ result = self._test_add_fip_addr_to_device_error(mock.Mock())
+ self.assertTrue(result)
+
+ def test__add_fip_addr_to_device_error(self):
+ device = mock.Mock()
+ device.addr.add.side_effect = RuntimeError
+ result = self._test_add_fip_addr_to_device_error(device)
+ self.assertFalse(result)
+
+ def test_process_snat_dnat_for_fip(self):
+ ri = self._create_router()
+ ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception)
+
+ self.assertRaises(n_exc.FloatingIpSetupException,
+ ri.process_snat_dnat_for_fip)
+
+ ri.process_floating_ip_nat_rules.assert_called_once_with()
+
+ def test_put_fips_in_error_state(self):
+ ri = self._create_router()
+ ri.router = mock.Mock()
+ ri.router.get.return_value = [{'id': mock.sentinel.id1},
+ {'id': mock.sentinel.id2}]
+
+ statuses = ri.put_fips_in_error_state()
+
+ expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
+ mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
+ self.assertNotEqual(expected, statuses)
+
+ def test_configure_fip_addresses(self):
+ ri = self._create_router()
+ ri.process_floating_ip_addresses = mock.Mock(
+ side_effect=Exception)
+
+ self.assertRaises(n_exc.FloatingIpSetupException,
+ ri.configure_fip_addresses,
+ mock.sentinel.interface_name)
+
+ ri.process_floating_ip_addresses.assert_called_once_with(
+ mock.sentinel.interface_name)
+
+ def test_get_router_cidrs_returns_cidrs(self):
+ ri = self._create_router()
+ addresses = ['15.1.2.2/24', '15.1.2.3/32']
+ device = mock.MagicMock()
+ device.addr.list.return_value = [{'cidr': addresses[0]},
+ {'cidr': addresses[1]}]
+ self.assertEqual(set(addresses), ri.get_router_cidrs(device))
+
+
+@mock.patch.object(ip_lib, 'IPDevice')
+class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework):
+
+ def test_process_floating_ip_addresses_remap(self, IPDevice):
+ fip_id = _uuid()
+ fip = {
+ 'id': fip_id, 'port_id': _uuid(),
+ 'floating_ip_address': '15.1.2.3',
+ 'fixed_ip_address': '192.168.0.2'
+ }
+
+ IPDevice.return_value = device = mock.Mock()
+ device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
+ ri = self._create_router()
+ ri.get_floating_ips = mock.Mock(return_value=[fip])
+
+ fip_statuses = ri.process_floating_ip_addresses(
+ mock.sentinel.interface_name)
+ self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
+ fip_statuses)
+
+ self.assertFalse(device.addr.add.called)
+ self.assertFalse(device.addr.delete.called)
+
+ def test_process_router_with_disabled_floating_ip(self, IPDevice):
+ fip_id = _uuid()
+ fip = {
+ 'id': fip_id, 'port_id': _uuid(),
+ 'floating_ip_address': '15.1.2.3',
+ 'fixed_ip_address': '192.168.0.2'
+ }
+
+ ri = self._create_router()
+ ri.floating_ips = [fip]
+ ri.get_floating_ips = mock.Mock(return_value=[])
+
+ fip_statuses = ri.process_floating_ip_addresses(
+ mock.sentinel.interface_name)
+
+ self.assertIsNone(fip_statuses.get(fip_id))
+
+ def test_process_router_floating_ip_with_device_add_error(self, IPDevice):
+ IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError)
+ device.addr.list.return_value = []
+ fip_id = _uuid()
+ fip = {
+ 'id': fip_id, 'port_id': _uuid(),
+ 'floating_ip_address': '15.1.2.3',
+ 'fixed_ip_address': '192.168.0.2'
+ }
+ ri = self._create_router()
+ ri.add_floating_ip = mock.Mock(
+ return_value=l3_constants.FLOATINGIP_STATUS_ERROR)
+ ri.get_floating_ips = mock.Mock(return_value=[fip])
+
+ fip_statuses = ri.process_floating_ip_addresses(
+ mock.sentinel.interface_name)
+
+ self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR},
+ fip_statuses)
+
+ # TODO(mrsmith): refactor for DVR cases
+ def test_process_floating_ip_addresses_remove(self, IPDevice):
+ IPDevice.return_value = device = mock.Mock()
+ device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
+
+ ri = self._create_router()
+ ri.remove_floating_ip = mock.Mock()
+ ri.router.get = mock.Mock(return_value=[])
+
+ fip_statuses = ri.process_floating_ip_addresses(
+ mock.sentinel.interface_name)
+ self.assertEqual({}, fip_statuses)
+ ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32')
diff --git a/neutron/tests/unit/agent/linux/test_process_monitor.py b/neutron/tests/unit/agent/linux/test_process_monitor.py
deleted file mode 100644
index fe4093892f..0000000000
--- a/neutron/tests/unit/agent/linux/test_process_monitor.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright 2014 Red Hat Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import mock
-
-from neutron.agent.linux import external_process
-from neutron.tests import base
-
-TEST_UUID = 'test-uuid'
-TEST_SERVICE = 'testsvc'
-TEST_PID = 1234
-
-
-class BaseTestProcessMonitor(base.BaseTestCase):
-
- def setUp(self):
- super(BaseTestProcessMonitor, self).setUp()
- self.log_patch = mock.patch("neutron.agent.linux.external_process."
- "LOG.error")
- self.error_log = self.log_patch.start()
-
- self.spawn_patch = mock.patch("eventlet.spawn")
- self.eventlent_spawn = self.spawn_patch.start()
-
- # create a default process monitor
- self.create_child_process_monitor('respawn')
-
- def create_child_process_monitor(self, action):
- conf = mock.Mock()
- conf.AGENT.check_child_processes_action = action
- conf.AGENT.check_child_processes = True
- self.pmonitor = external_process.ProcessMonitor(
- config=conf,
- resource_type='test')
-
- def get_monitored_process(self, uuid, service=None):
- monitored_process = mock.Mock()
- self.pmonitor.register(uuid=uuid,
- service_name=service,
- monitored_process=monitored_process)
- return monitored_process
-
-
-class TestProcessMonitor(BaseTestProcessMonitor):
-
- def test_error_logged(self):
- pm = self.get_monitored_process(TEST_UUID)
- pm.active = False
- self.pmonitor._check_child_processes()
- self.assertTrue(self.error_log.called)
-
- def test_exit_handler(self):
- self.create_child_process_monitor('exit')
- pm = self.get_monitored_process(TEST_UUID)
- pm.active = False
- with mock.patch.object(external_process.ProcessMonitor,
- '_exit_handler') as exit_handler:
- self.pmonitor._check_child_processes()
- exit_handler.assert_called_once_with(TEST_UUID, None)
-
- def test_register(self):
- pm = self.get_monitored_process(TEST_UUID)
- self.assertEqual(len(self.pmonitor._monitored_processes), 1)
- self.assertIn(pm, self.pmonitor._monitored_processes.values())
-
- def test_register_same_service_twice(self):
- self.get_monitored_process(TEST_UUID)
- self.get_monitored_process(TEST_UUID)
- self.assertEqual(len(self.pmonitor._monitored_processes), 1)
-
- def test_register_different_service_types(self):
- self.get_monitored_process(TEST_UUID)
- self.get_monitored_process(TEST_UUID, TEST_SERVICE)
- self.assertEqual(len(self.pmonitor._monitored_processes), 2)
-
- def test_unregister(self):
- self.get_monitored_process(TEST_UUID)
- self.pmonitor.unregister(TEST_UUID, None)
- self.assertEqual(len(self.pmonitor._monitored_processes), 0)
-
- def test_unregister_unknown_process(self):
- self.pmonitor.unregister(TEST_UUID, None)
- self.assertEqual(len(self.pmonitor._monitored_processes), 0)
diff --git a/neutron/tests/unit/test_api_v2.py b/neutron/tests/unit/test_api_v2.py
index dd98d9046f..c4b706bd5a 100644
--- a/neutron/tests/unit/test_api_v2.py
+++ b/neutron/tests/unit/test_api_v2.py
@@ -40,8 +40,7 @@ from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
-ROOTDIR = os.path.dirname(os.path.dirname(__file__))
-EXTDIR = os.path.join(ROOTDIR, 'unit/extensions')
+EXTDIR = os.path.join(base.ROOTDIR, 'unit/extensions')
_uuid = uuidutils.generate_uuid
diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py
index 26b62202c5..8a5393b493 100644
--- a/neutron/tests/unit/test_db_plugin.py
+++ b/neutron/tests/unit/test_db_plugin.py
@@ -5452,3 +5452,63 @@ class NeutronDbPluginV2AsMixinTestCase(testlib_api.SqlTestCase):
self.net_data['network']['status'] = 'BUILD'
net = self.plugin.create_network(self.context, self.net_data)
self.assertEqual(net['status'], 'BUILD')
+
+
+class TestNetworks(testlib_api.SqlTestCase):
+ def setUp(self):
+ super(TestNetworks, self).setUp()
+ self._tenant_id = 'test-tenant'
+
+ # Update the plugin
+ self.setup_coreplugin(DB_PLUGIN_KLASS)
+
+ def _create_network(self, plugin, ctx, shared=True):
+ network = {'network': {'name': 'net',
+ 'shared': shared,
+ 'admin_state_up': True,
+ 'tenant_id': self._tenant_id}}
+ created_network = plugin.create_network(ctx, network)
+ return (network, created_network['id'])
+
+ def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id):
+ port = {'port': {'name': 'port',
+ 'network_id': net_id,
+ 'mac_address': attributes.ATTR_NOT_SPECIFIED,
+ 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
+ 'admin_state_up': True,
+ 'device_id': 'device_id',
+ 'device_owner': device_owner,
+ 'tenant_id': tenant_id}}
+ plugin.create_port(ctx, port)
+
+ def _test_update_shared_net_used(self,
+ device_owner,
+ expected_exception=None):
+ plugin = manager.NeutronManager.get_plugin()
+ ctx = context.get_admin_context()
+ network, net_id = self._create_network(plugin, ctx)
+
+ self._create_port(plugin,
+ ctx,
+ net_id,
+ device_owner,
+ self._tenant_id + '1')
+
+ network['network']['shared'] = False
+
+ if (expected_exception):
+ with testlib_api.ExpectedException(expected_exception):
+ plugin.update_network(ctx, net_id, network)
+ else:
+ plugin.update_network(ctx, net_id, network)
+
+ def test_update_shared_net_used_fails(self):
+ self._test_update_shared_net_used('', n_exc.InvalidSharedSetting)
+
+ def test_update_shared_net_used_as_router_gateway(self):
+ self._test_update_shared_net_used(
+ constants.DEVICE_OWNER_ROUTER_GW)
+
+ def test_update_shared_net_used_by_floating_ip(self):
+ self._test_update_shared_net_used(
+ constants.DEVICE_OWNER_FLOATINGIP)
diff --git a/neutron/tests/unit/test_db_plugin_level.py b/neutron/tests/unit/test_db_plugin_level.py
deleted file mode 100644
index a0ebbb5e66..0000000000
--- a/neutron/tests/unit/test_db_plugin_level.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright (c) 2014 Red Hat, Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.api.v2 import attributes
-from neutron.common import constants
-from neutron.common import exceptions as n_exc
-from neutron import context
-from neutron import manager
-from neutron.tests.unit import test_db_plugin
-from neutron.tests.unit import testlib_api
-
-
-class TestNetworks(testlib_api.SqlTestCase):
- def setUp(self):
- super(TestNetworks, self).setUp()
- self._tenant_id = 'test-tenant'
-
- # Update the plugin
- self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS)
-
- def _create_network(self, plugin, ctx, shared=True):
- network = {'network': {'name': 'net',
- 'shared': shared,
- 'admin_state_up': True,
- 'tenant_id': self._tenant_id}}
- created_network = plugin.create_network(ctx, network)
- return (network, created_network['id'])
-
- def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id):
- port = {'port': {'name': 'port',
- 'network_id': net_id,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
- 'admin_state_up': True,
- 'device_id': 'device_id',
- 'device_owner': device_owner,
- 'tenant_id': tenant_id}}
- plugin.create_port(ctx, port)
-
- def _test_update_shared_net_used(self,
- device_owner,
- expected_exception=None):
- plugin = manager.NeutronManager.get_plugin()
- ctx = context.get_admin_context()
- network, net_id = self._create_network(plugin, ctx)
-
- self._create_port(plugin,
- ctx,
- net_id,
- device_owner,
- self._tenant_id + '1')
-
- network['network']['shared'] = False
-
- if (expected_exception):
- with testlib_api.ExpectedException(expected_exception):
- plugin.update_network(ctx, net_id, network)
- else:
- plugin.update_network(ctx, net_id, network)
-
- def test_update_shared_net_used_fails(self):
- self._test_update_shared_net_used('', n_exc.InvalidSharedSetting)
-
- def test_update_shared_net_used_as_router_gateway(self):
- self._test_update_shared_net_used(
- constants.DEVICE_OWNER_ROUTER_GW)
-
- def test_update_shared_net_used_by_floating_ip(self):
- self._test_update_shared_net_used(
- constants.DEVICE_OWNER_FLOATINGIP)
diff --git a/neutron/tests/unit/test_extension_extended_attribute.py b/neutron/tests/unit/test_extension_extended_attribute.py
deleted file mode 100644
index 7ff48c837f..0000000000
--- a/neutron/tests/unit/test_extension_extended_attribute.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2013 VMware, Inc
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unit tests for extension extended attribute
-"""
-
-from oslo_config import cfg
-import webob.exc as webexc
-
-import neutron
-from neutron.api import extensions
-from neutron.api.v2 import attributes
-from neutron.common import config
-from neutron import manager
-from neutron.plugins.common import constants
-from neutron.plugins.ml2 import plugin as ml2_plugin
-from neutron import quota
-from neutron.tests import base
-from neutron.tests.unit.extensions import extendedattribute as extattr
-from neutron.tests.unit import test_api_v2
-from neutron.tests.unit import testlib_api
-from neutron import wsgi
-
-_uuid = test_api_v2._uuid
-_get_path = test_api_v2._get_path
-extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
-
-
-class ExtensionExtendedAttributeTestPlugin(
- ml2_plugin.Ml2Plugin):
-
- supported_extension_aliases = [
- 'ext-obj-test', "extended-ext-attr"
- ]
-
- def __init__(self, configfile=None):
- super(ExtensionExtendedAttributeTestPlugin, self)
- self.objs = []
- self.objh = {}
-
- def create_ext_test_resource(self, context, ext_test_resource):
- obj = ext_test_resource['ext_test_resource']
- id = _uuid()
- obj['id'] = id
- self.objs.append(obj)
- self.objh.update({id: obj})
- return obj
-
- def get_ext_test_resources(self, context, filters=None, fields=None):
- return self.objs
-
- def get_ext_test_resource(self, context, id, fields=None):
- return self.objh[id]
-
-
-class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
- def setUp(self):
- super(ExtensionExtendedAttributeTestCase, self).setUp()
- plugin = (
- "neutron.tests.unit.test_extension_extended_attribute."
- "ExtensionExtendedAttributeTestPlugin"
- )
-
- # point config file to: neutron/tests/etc/neutron.conf.test
- self.config_parse()
-
- self.setup_coreplugin(plugin)
-
- ext_mgr = extensions.PluginAwareExtensionManager(
- extensions_path,
- {constants.CORE: ExtensionExtendedAttributeTestPlugin}
- )
- ext_mgr.extend_resources("2.0", {})
- extensions.PluginAwareExtensionManager._instance = ext_mgr
-
- app = config.load_paste_app('extensions_test_app')
- self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
-
- self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
- # Save the global RESOURCE_ATTRIBUTE_MAP
- self.saved_attr_map = {}
- for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
- self.saved_attr_map[resource] = attrs.copy()
- # Add the resources to the global attribute map
- # This is done here as the setup process won't
- # initialize the main API router which extends
- # the global attribute map
- attributes.RESOURCE_ATTRIBUTE_MAP.update(
- extattr.EXTENDED_ATTRIBUTES_2_0)
- self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
- self.addCleanup(self.restore_attribute_map)
-
- quota.QUOTAS._driver = None
- cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
- group='QUOTAS')
-
- def restore_attribute_map(self):
- # Restore the original RESOURCE_ATTRIBUTE_MAP
- attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
-
- def _do_request(self, method, path, data=None, params=None, action=None):
- content_type = 'application/json'
- body = None
- if data is not None: # empty dict is valid
- body = wsgi.Serializer().serialize(data, content_type)
-
- req = testlib_api.create_request(
- path, body, content_type,
- method, query_string=params)
- res = req.get_response(self._api)
- if res.status_code >= 400:
- raise webexc.HTTPClientError(detail=res.body, code=res.status_code)
- if res.status_code != webexc.HTTPNoContent.code:
- return res.json
-
- def _ext_test_resource_create(self, attr=None):
- data = {
- "ext_test_resource": {
- "tenant_id": self._tenant_id,
- "name": "test",
- extattr.EXTENDED_ATTRIBUTE: attr
- }
- }
-
- res = self._do_request('POST', _get_path('ext_test_resources'), data)
- return res['ext_test_resource']
-
- def test_ext_test_resource_create(self):
- ext_test_resource = self._ext_test_resource_create()
- attr = _uuid()
- ext_test_resource = self._ext_test_resource_create(attr)
- self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr)
-
- def test_ext_test_resource_get(self):
- attr = _uuid()
- obj = self._ext_test_resource_create(attr)
- obj_id = obj['id']
- res = self._do_request('GET', _get_path(
- 'ext_test_resources/{0}'.format(obj_id)))
- obj2 = res['ext_test_resource']
- self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr)
diff --git a/neutron/tests/unit/test_extensions.py b/neutron/tests/unit/test_extensions.py
index e140729933..ae533032b0 100644
--- a/neutron/tests/unit/test_extensions.py
+++ b/neutron/tests/unit/test_extensions.py
@@ -16,25 +16,36 @@
import abc
import mock
+from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import routes
import webob
+import webob.exc as webexc
import webtest
+import neutron
from neutron.api import extensions
+from neutron.api.v2 import attributes
from neutron.common import config
from neutron.common import exceptions
from neutron.db import db_base_plugin_v2
+from neutron import manager
from neutron.plugins.common import constants
+from neutron.plugins.ml2 import plugin as ml2_plugin
+from neutron import quota
from neutron.tests import base
from neutron.tests.unit import extension_stubs as ext_stubs
import neutron.tests.unit.extensions
+from neutron.tests.unit.extensions import extendedattribute as extattr
+from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron import wsgi
LOG = logging.getLogger(__name__)
+_uuid = test_api_v2._uuid
+_get_path = test_api_v2._get_path
extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
@@ -677,3 +688,118 @@ class SimpleExtensionManager(object):
if self.request_ext:
request_extensions.append(self.request_ext)
return request_extensions
+
+
+class ExtensionExtendedAttributeTestPlugin(
+ ml2_plugin.Ml2Plugin):
+
+ supported_extension_aliases = [
+ 'ext-obj-test', "extended-ext-attr"
+ ]
+
+ def __init__(self, configfile=None):
+ super(ExtensionExtendedAttributeTestPlugin, self)
+ self.objs = []
+ self.objh = {}
+
+ def create_ext_test_resource(self, context, ext_test_resource):
+ obj = ext_test_resource['ext_test_resource']
+ id = _uuid()
+ obj['id'] = id
+ self.objs.append(obj)
+ self.objh.update({id: obj})
+ return obj
+
+ def get_ext_test_resources(self, context, filters=None, fields=None):
+ return self.objs
+
+ def get_ext_test_resource(self, context, id, fields=None):
+ return self.objh[id]
+
+
+class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
+ def setUp(self):
+ super(ExtensionExtendedAttributeTestCase, self).setUp()
+ plugin = (
+ "neutron.tests.unit.test_extensions."
+ "ExtensionExtendedAttributeTestPlugin"
+ )
+
+ # point config file to: neutron/tests/etc/neutron.conf.test
+ self.config_parse()
+
+ self.setup_coreplugin(plugin)
+
+ ext_mgr = extensions.PluginAwareExtensionManager(
+ extensions_path,
+ {constants.CORE: ExtensionExtendedAttributeTestPlugin}
+ )
+ ext_mgr.extend_resources("2.0", {})
+ extensions.PluginAwareExtensionManager._instance = ext_mgr
+
+ app = config.load_paste_app('extensions_test_app')
+ self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+ self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ extattr.EXTENDED_ATTRIBUTES_2_0)
+ self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
+ self.addCleanup(self.restore_attribute_map)
+
+ quota.QUOTAS._driver = None
+ cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
+ group='QUOTAS')
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
+ def _do_request(self, method, path, data=None, params=None, action=None):
+ content_type = 'application/json'
+ body = None
+ if data is not None: # empty dict is valid
+ body = wsgi.Serializer().serialize(data, content_type)
+
+ req = testlib_api.create_request(
+ path, body, content_type,
+ method, query_string=params)
+ res = req.get_response(self._api)
+ if res.status_code >= 400:
+ raise webexc.HTTPClientError(detail=res.body, code=res.status_code)
+ if res.status_code != webexc.HTTPNoContent.code:
+ return res.json
+
+ def _ext_test_resource_create(self, attr=None):
+ data = {
+ "ext_test_resource": {
+ "tenant_id": self._tenant_id,
+ "name": "test",
+ extattr.EXTENDED_ATTRIBUTE: attr
+ }
+ }
+
+ res = self._do_request('POST', _get_path('ext_test_resources'), data)
+ return res['ext_test_resource']
+
+ def test_ext_test_resource_create(self):
+ ext_test_resource = self._ext_test_resource_create()
+ attr = _uuid()
+ ext_test_resource = self._ext_test_resource_create(attr)
+ self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr)
+
+ def test_ext_test_resource_get(self):
+ attr = _uuid()
+ obj = self._ext_test_resource_create(attr)
+ obj_id = obj['id']
+ res = self._do_request('GET', _get_path(
+ 'ext_test_resources/{0}'.format(obj_id)))
+ obj2 = res['ext_test_resource']
+ self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr)
diff --git a/neutron/tests/unit/test_linux_external_process.py b/neutron/tests/unit/test_linux_external_process.py
index 99cd7d8f2f..cb99da8f73 100644
--- a/neutron/tests/unit/test_linux_external_process.py
+++ b/neutron/tests/unit/test_linux_external_process.py
@@ -20,6 +20,83 @@ from neutron.agent.linux import utils
from neutron.tests import base
+TEST_UUID = 'test-uuid'
+TEST_SERVICE = 'testsvc'
+TEST_PID = 1234
+
+
+class BaseTestProcessMonitor(base.BaseTestCase):
+
+ def setUp(self):
+ super(BaseTestProcessMonitor, self).setUp()
+ self.log_patch = mock.patch("neutron.agent.linux.external_process."
+ "LOG.error")
+ self.error_log = self.log_patch.start()
+
+ self.spawn_patch = mock.patch("eventlet.spawn")
+ self.eventlent_spawn = self.spawn_patch.start()
+
+ # create a default process monitor
+ self.create_child_process_monitor('respawn')
+
+ def create_child_process_monitor(self, action):
+ conf = mock.Mock()
+ conf.AGENT.check_child_processes_action = action
+ conf.AGENT.check_child_processes = True
+ self.pmonitor = ep.ProcessMonitor(
+ config=conf,
+ resource_type='test')
+
+ def get_monitored_process(self, uuid, service=None):
+ monitored_process = mock.Mock()
+ self.pmonitor.register(uuid=uuid,
+ service_name=service,
+ monitored_process=monitored_process)
+ return monitored_process
+
+
+class TestProcessMonitor(BaseTestProcessMonitor):
+
+ def test_error_logged(self):
+ pm = self.get_monitored_process(TEST_UUID)
+ pm.active = False
+ self.pmonitor._check_child_processes()
+ self.assertTrue(self.error_log.called)
+
+ def test_exit_handler(self):
+ self.create_child_process_monitor('exit')
+ pm = self.get_monitored_process(TEST_UUID)
+ pm.active = False
+ with mock.patch.object(ep.ProcessMonitor,
+ '_exit_handler') as exit_handler:
+ self.pmonitor._check_child_processes()
+ exit_handler.assert_called_once_with(TEST_UUID, None)
+
+ def test_register(self):
+ pm = self.get_monitored_process(TEST_UUID)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+ self.assertIn(pm, self.pmonitor._monitored_processes.values())
+
+ def test_register_same_service_twice(self):
+ self.get_monitored_process(TEST_UUID)
+ self.get_monitored_process(TEST_UUID)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+
+ def test_register_different_service_types(self):
+ self.get_monitored_process(TEST_UUID)
+ self.get_monitored_process(TEST_UUID, TEST_SERVICE)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 2)
+
+ def test_unregister(self):
+ self.get_monitored_process(TEST_UUID)
+ self.pmonitor.unregister(TEST_UUID, None)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 0)
+
+ def test_unregister_unknown_process(self):
+ self.pmonitor.unregister(TEST_UUID, None)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 0)
+
+
class TestProcessManager(base.BaseTestCase):
def setUp(self):
super(TestProcessManager, self).setUp()