summaryrefslogtreecommitdiff
path: root/nova/tests/unit/api/openstack/compute/contrib/test_neutron_security_groups.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests/unit/api/openstack/compute/contrib/test_neutron_security_groups.py')
-rw-r--r--nova/tests/unit/api/openstack/compute/contrib/test_neutron_security_groups.py918
1 files changed, 918 insertions, 0 deletions
diff --git a/nova/tests/unit/api/openstack/compute/contrib/test_neutron_security_groups.py b/nova/tests/unit/api/openstack/compute/contrib/test_neutron_security_groups.py
new file mode 100644
index 0000000000..704de21005
--- /dev/null
+++ b/nova/tests/unit/api/openstack/compute/contrib/test_neutron_security_groups.py
@@ -0,0 +1,918 @@
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import uuid
+
+from lxml import etree
+import mock
+from neutronclient.common import exceptions as n_exc
+from neutronclient.neutron import v2_0 as neutronv20
+from oslo.config import cfg
+from oslo.serialization import jsonutils
+import webob
+
+from nova.api.openstack.compute.contrib import security_groups
+from nova.api.openstack import xmlutil
+from nova import compute
+from nova import context
+import nova.db
+from nova import exception
+from nova.network import model
+from nova.network import neutronv2
+from nova.network.neutronv2 import api as neutron_api
+from nova.network.security_group import neutron_driver
+from nova.objects import instance as instance_obj
+from nova import test
+from nova.tests.unit.api.openstack.compute.contrib import test_security_groups
+from nova.tests.unit.api.openstack import fakes
+
+
+class TestNeutronSecurityGroupsTestCase(test.TestCase):
+ def setUp(self):
+ super(TestNeutronSecurityGroupsTestCase, self).setUp()
+ cfg.CONF.set_override('security_group_api', 'neutron')
+ self.original_client = neutronv2.get_client
+ neutronv2.get_client = get_client
+
+ def tearDown(self):
+ neutronv2.get_client = self.original_client
+ get_client()._reset()
+ super(TestNeutronSecurityGroupsTestCase, self).tearDown()
+
+
+class TestNeutronSecurityGroupsV21(
+ test_security_groups.TestSecurityGroupsV21,
+ TestNeutronSecurityGroupsTestCase):
+
+ def _create_sg_template(self, **kwargs):
+ sg = test_security_groups.security_group_template(**kwargs)
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ return self.controller.create(req, {'security_group': sg})
+
+ def _create_network(self):
+ body = {'network': {'name': 'net1'}}
+ neutron = get_client()
+ net = neutron.create_network(body)
+ body = {'subnet': {'network_id': net['network']['id'],
+ 'cidr': '10.0.0.0/24'}}
+ neutron.create_subnet(body)
+ return net
+
+ def _create_port(self, **kwargs):
+ body = {'port': {'binding:vnic_type': model.VNIC_TYPE_NORMAL}}
+ fields = ['security_groups', 'device_id', 'network_id',
+ 'port_security_enabled']
+ for field in fields:
+ if field in kwargs:
+ body['port'][field] = kwargs[field]
+ neutron = get_client()
+ return neutron.create_port(body)
+
+ def _create_security_group(self, **kwargs):
+ body = {'security_group': {}}
+ fields = ['name', 'description']
+ for field in fields:
+ if field in kwargs:
+ body['security_group'][field] = kwargs[field]
+ neutron = get_client()
+ return neutron.create_security_group(body)
+
+ def test_create_security_group_with_no_description(self):
+ # Neutron's security group description field is optional.
+ pass
+
+ def test_create_security_group_with_empty_description(self):
+ # Neutron's security group description field is optional.
+ pass
+
+ def test_create_security_group_with_blank_name(self):
+ # Neutron's security group name field is optional.
+ pass
+
+ def test_create_security_group_with_whitespace_name(self):
+ # Neutron allows security group name to be whitespace.
+ pass
+
+ def test_create_security_group_with_blank_description(self):
+ # Neutron's security group description field is optional.
+ pass
+
+ def test_create_security_group_with_whitespace_description(self):
+ # Neutron allows description to be whitespace.
+ pass
+
+ def test_create_security_group_with_duplicate_name(self):
+ # Neutron allows duplicate names for security groups.
+ pass
+
+ def test_create_security_group_non_string_name(self):
+ # Neutron allows security group name to be non string.
+ pass
+
+ def test_create_security_group_non_string_description(self):
+ # Neutron allows non string description.
+ pass
+
+ def test_create_security_group_quota_limit(self):
+ # Enforced by Neutron server.
+ pass
+
+ def test_update_security_group(self):
+ # Enforced by Neutron server.
+ pass
+
+ def test_get_security_group_list(self):
+ self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ list_dict = self.controller.index(req)
+ self.assertEqual(len(list_dict['security_groups']), 2)
+
+ def test_get_security_group_list_all_tenants(self):
+ pass
+
+ def test_get_security_group_by_instance(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ device_id=test_security_groups.FAKE_UUID1)
+ expected = [{'rules': [], 'tenant_id': 'fake', 'id': sg['id'],
+ 'name': 'test', 'description': 'test-description'}]
+ self.stubs.Set(nova.db, 'instance_get_by_uuid',
+ test_security_groups.return_server_by_uuid)
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/%s/os-security-groups'
+ % test_security_groups.FAKE_UUID1)
+ res_dict = self.server_controller.index(
+ req, test_security_groups.FAKE_UUID1)['security_groups']
+ self.assertEqual(expected, res_dict)
+
+ def test_get_security_group_by_id(self):
+ sg = self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
+ % sg['id'])
+ res_dict = self.controller.show(req, sg['id'])
+ expected = {'security_group': sg}
+ self.assertEqual(res_dict, expected)
+
+ def test_delete_security_group_by_id(self):
+ sg = self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
+ sg['id'])
+ self.controller.delete(req, sg['id'])
+
+ def test_delete_security_group_by_admin(self):
+ sg = self._create_sg_template().get('security_group')
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
+ sg['id'], use_admin_context=True)
+ self.controller.delete(req, sg['id'])
+
+ def test_delete_security_group_in_use(self):
+ sg = self._create_sg_template().get('security_group')
+ self._create_network()
+ db_inst = fakes.stub_instance(id=1, nw_cache=[], security_groups=[])
+ _context = context.get_admin_context()
+ instance = instance_obj.Instance._from_db_object(
+ _context, instance_obj.Instance(), db_inst,
+ expected_attrs=instance_obj.INSTANCE_DEFAULT_FIELDS)
+ neutron = neutron_api.API()
+ with mock.patch.object(nova.db, 'instance_get_by_uuid',
+ return_value=db_inst):
+ neutron.allocate_for_instance(_context, instance,
+ security_groups=[sg['id']])
+
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
+ % sg['id'])
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
+ req, sg['id'])
+
+ def test_associate_non_running_instance(self):
+ # Neutron does not care if the instance is running or not. When the
+ # instances is detected by nuetron it will push down the security
+ # group policy to it.
+ pass
+
+ def test_associate_already_associated_security_group_to_instance(self):
+ # Neutron security groups does not raise an error if you update a
+ # port adding a security group to it that was already associated
+ # to the port. This is because PUT semantics are used.
+ pass
+
+ def test_associate(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(addSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.manager._addSecurityGroup(req, '1', body)
+
+ def test_associate_duplicate_names(self):
+ sg1 = self._create_security_group(name='sg1',
+ description='sg1')['security_group']
+ self._create_security_group(name='sg1',
+ description='sg1')['security_group']
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg1['id']],
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(addSecurityGroup=dict(name="sg1"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.assertRaises(webob.exc.HTTPConflict,
+ self.manager._addSecurityGroup, req, '1', body)
+
+ def test_associate_port_security_enabled_true(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ port_security_enabled=True,
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(addSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.manager._addSecurityGroup(req, '1', body)
+
+ def test_associate_port_security_enabled_false(self):
+ self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], port_security_enabled=False,
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(addSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup,
+ req, '1', body)
+
+ def test_disassociate_by_non_existing_security_group_name(self):
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(removeSecurityGroup=dict(name='non-existing'))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.manager._removeSecurityGroup, req, '1', body)
+
+ def test_disassociate_non_running_instance(self):
+ # Neutron does not care if the instance is running or not. When the
+ # instances is detected by neutron it will push down the security
+ # group policy to it.
+ pass
+
+ def test_disassociate_already_associated_security_group_to_instance(self):
+ # Neutron security groups does not raise an error if you update a
+ # port adding a security group to it that was already associated
+ # to the port. This is because PUT semantics are used.
+ pass
+
+ def test_disassociate(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(removeSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.manager._removeSecurityGroup(req, '1', body)
+
+ def test_get_raises_no_unique_match_error(self):
+
+ def fake_find_resourceid_by_name_or_id(client, param, name,
+ project_id=None):
+ raise n_exc.NeutronClientNoUniqueMatch()
+
+ self.stubs.Set(neutronv20, 'find_resourceid_by_name_or_id',
+ fake_find_resourceid_by_name_or_id)
+ security_group_api = self.controller.security_group_api
+ self.assertRaises(exception.NoUniqueMatch, security_group_api.get,
+ context.get_admin_context(), 'foobar')
+
+ def test_get_instances_security_groups_bindings(self):
+ servers = [{'id': test_security_groups.FAKE_UUID1},
+ {'id': test_security_groups.FAKE_UUID2}]
+ sg1 = self._create_sg_template(name='test1').get('security_group')
+ sg2 = self._create_sg_template(name='test2').get('security_group')
+ # test name='' is replaced with id
+ sg3 = self._create_sg_template(name='').get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg1['id'],
+ sg2['id']],
+ device_id=test_security_groups.FAKE_UUID1)
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg2['id'],
+ sg3['id']],
+ device_id=test_security_groups.FAKE_UUID2)
+ expected = {test_security_groups.FAKE_UUID1: [{'name': sg1['name']},
+ {'name': sg2['name']}],
+ test_security_groups.FAKE_UUID2: [{'name': sg2['name']},
+ {'name': sg3['id']}]}
+ security_group_api = self.controller.security_group_api
+ bindings = (
+ security_group_api.get_instances_security_groups_bindings(
+ context.get_admin_context(), servers))
+ self.assertEqual(bindings, expected)
+
+ def test_get_instance_security_groups(self):
+ sg1 = self._create_sg_template(name='test1').get('security_group')
+ sg2 = self._create_sg_template(name='test2').get('security_group')
+ # test name='' is replaced with id
+ sg3 = self._create_sg_template(name='').get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg1['id'],
+ sg2['id'],
+ sg3['id']],
+ device_id=test_security_groups.FAKE_UUID1)
+
+ expected = [{'name': sg1['name']}, {'name': sg2['name']},
+ {'name': sg3['id']}]
+ security_group_api = self.controller.security_group_api
+ sgs = security_group_api.get_instance_security_groups(
+ context.get_admin_context(), test_security_groups.FAKE_UUID1)
+ self.assertEqual(sgs, expected)
+
+ @mock.patch('nova.network.security_group.neutron_driver.SecurityGroupAPI.'
+ 'get_instances_security_groups_bindings')
+ def test_get_security_group_empty_for_instance(self, neutron_sg_bind_mock):
+ servers = [{'id': test_security_groups.FAKE_UUID1}]
+ neutron_sg_bind_mock.return_value = {}
+
+ security_group_api = self.controller.security_group_api
+ ctx = context.get_admin_context()
+ sgs = security_group_api.get_instance_security_groups(ctx,
+ test_security_groups.FAKE_UUID1)
+
+ neutron_sg_bind_mock.assert_called_once_with(ctx, servers, False)
+ self.assertEqual([], sgs)
+
+ def test_create_port_with_sg_and_port_security_enabled_true(self):
+ sg1 = self._create_sg_template(name='test1').get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg1['id']],
+ port_security_enabled=True,
+ device_id=test_security_groups.FAKE_UUID1)
+ security_group_api = self.controller.security_group_api
+ sgs = security_group_api.get_instance_security_groups(
+ context.get_admin_context(), test_security_groups.FAKE_UUID1)
+ self.assertEqual(sgs, [{'name': 'test1'}])
+
+ def test_create_port_with_sg_and_port_security_enabled_false(self):
+ sg1 = self._create_sg_template(name='test1').get('security_group')
+ net = self._create_network()
+ self.assertRaises(exception.SecurityGroupCannotBeApplied,
+ self._create_port,
+ network_id=net['network']['id'],
+ security_groups=[sg1['id']],
+ port_security_enabled=False,
+ device_id=test_security_groups.FAKE_UUID1)
+
+
+class TestNeutronSecurityGroupsV2(TestNeutronSecurityGroupsV21):
+ controller_cls = security_groups.SecurityGroupController
+ server_secgrp_ctl_cls = security_groups.ServerSecurityGroupController
+ secgrp_act_ctl_cls = security_groups.SecurityGroupActionController
+
+
+class TestNeutronSecurityGroupRulesTestCase(TestNeutronSecurityGroupsTestCase):
+ def setUp(self):
+ super(TestNeutronSecurityGroupRulesTestCase, self).setUp()
+ id1 = '11111111-1111-1111-1111-111111111111'
+ sg_template1 = test_security_groups.security_group_template(
+ security_group_rules=[], id=id1)
+ id2 = '22222222-2222-2222-2222-222222222222'
+ sg_template2 = test_security_groups.security_group_template(
+ security_group_rules=[], id=id2)
+ self.controller_sg = security_groups.SecurityGroupController()
+ neutron = get_client()
+ neutron._fake_security_groups[id1] = sg_template1
+ neutron._fake_security_groups[id2] = sg_template2
+
+ def tearDown(self):
+ neutronv2.get_client = self.original_client
+ get_client()._reset()
+ super(TestNeutronSecurityGroupsTestCase, self).tearDown()
+
+
+class _TestNeutronSecurityGroupRulesBase(object):
+
+ def test_create_add_existing_rules_by_cidr(self):
+ sg = test_security_groups.security_group_template()
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ self.controller_sg.create(req, {'security_group': sg})
+ rule = test_security_groups.security_group_rule_template(
+ cidr='15.0.0.0/8', parent_group_id=self.sg2['id'])
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
+ self.controller.create(req, {'security_group_rule': rule})
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
+
+ def test_create_add_existing_rules_by_group_id(self):
+ sg = test_security_groups.security_group_template()
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ self.controller_sg.create(req, {'security_group': sg})
+ rule = test_security_groups.security_group_rule_template(
+ group=self.sg1['id'], parent_group_id=self.sg2['id'])
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
+ self.controller.create(req, {'security_group_rule': rule})
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
+
+ def test_delete(self):
+ rule = test_security_groups.security_group_rule_template(
+ parent_group_id=self.sg2['id'])
+
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
+ res_dict = self.controller.create(req, {'security_group_rule': rule})
+ security_group_rule = res_dict['security_group_rule']
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
+ % security_group_rule['id'])
+ self.controller.delete(req, security_group_rule['id'])
+
+ def test_create_rule_quota_limit(self):
+ # Enforced by neutron
+ pass
+
+
+class TestNeutronSecurityGroupRulesV2(
+ _TestNeutronSecurityGroupRulesBase,
+ test_security_groups.TestSecurityGroupRulesV2,
+ TestNeutronSecurityGroupRulesTestCase):
+ pass
+
+
+class TestNeutronSecurityGroupRulesV21(
+ _TestNeutronSecurityGroupRulesBase,
+ test_security_groups.TestSecurityGroupRulesV21,
+ TestNeutronSecurityGroupRulesTestCase):
+ pass
+
+
+class TestNeutronSecurityGroupsXMLDeserializer(
+ test_security_groups.TestSecurityGroupXMLDeserializer,
+ TestNeutronSecurityGroupsTestCase):
+ pass
+
+
+class TestNeutronSecurityGroupsXMLSerializer(
+ test_security_groups.TestSecurityGroupXMLSerializer,
+ TestNeutronSecurityGroupsTestCase):
+ pass
+
+
+class TestNeutronSecurityGroupsOutputTest(TestNeutronSecurityGroupsTestCase):
+ content_type = 'application/json'
+
+ def setUp(self):
+ super(TestNeutronSecurityGroupsOutputTest, self).setUp()
+ fakes.stub_out_nw_api(self.stubs)
+ self.controller = security_groups.SecurityGroupController()
+ self.stubs.Set(compute.api.API, 'get',
+ test_security_groups.fake_compute_get)
+ self.stubs.Set(compute.api.API, 'get_all',
+ test_security_groups.fake_compute_get_all)
+ self.stubs.Set(compute.api.API, 'create',
+ test_security_groups.fake_compute_create)
+ self.stubs.Set(neutron_driver.SecurityGroupAPI,
+ 'get_instances_security_groups_bindings',
+ (test_security_groups.
+ fake_get_instances_security_groups_bindings))
+ self.flags(
+ osapi_compute_extension=[
+ 'nova.api.openstack.compute.contrib.select_extensions'],
+ osapi_compute_ext_list=['Security_groups'])
+
+ def _make_request(self, url, body=None):
+ req = webob.Request.blank(url)
+ if body:
+ req.method = 'POST'
+ req.body = self._encode_body(body)
+ req.content_type = self.content_type
+ req.headers['Accept'] = self.content_type
+ res = req.get_response(fakes.wsgi_app(init_only=('servers',)))
+ return res
+
+ def _encode_body(self, body):
+ return jsonutils.dumps(body)
+
+ def _get_server(self, body):
+ return jsonutils.loads(body).get('server')
+
+ def _get_servers(self, body):
+ return jsonutils.loads(body).get('servers')
+
+ def _get_groups(self, server):
+ return server.get('security_groups')
+
+ def test_create(self):
+ url = '/v2/fake/servers'
+ image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
+ for security_group in security_groups:
+ sg = test_security_groups.security_group_template(
+ name=security_group['name'])
+ self.controller.create(req, {'security_group': sg})
+
+ server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
+ security_groups=security_groups)
+ res = self._make_request(url, {'server': server})
+ self.assertEqual(res.status_int, 202)
+ server = self._get_server(res.body)
+ for i, group in enumerate(self._get_groups(server)):
+ name = 'fake-2-%s' % i
+ self.assertEqual(group.get('name'), name)
+
+ def test_create_server_get_default_security_group(self):
+ url = '/v2/fake/servers'
+ image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
+ server = dict(name='server_test', imageRef=image_uuid, flavorRef=2)
+ res = self._make_request(url, {'server': server})
+ self.assertEqual(res.status_int, 202)
+ server = self._get_server(res.body)
+ group = self._get_groups(server)[0]
+ self.assertEqual(group.get('name'), 'default')
+
+ def test_show(self):
+ def fake_get_instance_security_groups(inst, context, id):
+ return [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
+
+ self.stubs.Set(neutron_driver.SecurityGroupAPI,
+ 'get_instance_security_groups',
+ fake_get_instance_security_groups)
+
+ url = '/v2/fake/servers'
+ image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
+ for security_group in security_groups:
+ sg = test_security_groups.security_group_template(
+ name=security_group['name'])
+ self.controller.create(req, {'security_group': sg})
+ server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
+ security_groups=security_groups)
+
+ res = self._make_request(url, {'server': server})
+ self.assertEqual(res.status_int, 202)
+ server = self._get_server(res.body)
+ for i, group in enumerate(self._get_groups(server)):
+ name = 'fake-2-%s' % i
+ self.assertEqual(group.get('name'), name)
+
+ # Test that show (GET) returns the same information as create (POST)
+ url = '/v2/fake/servers/' + test_security_groups.UUID3
+ res = self._make_request(url)
+ self.assertEqual(res.status_int, 200)
+ server = self._get_server(res.body)
+
+ for i, group in enumerate(self._get_groups(server)):
+ name = 'fake-2-%s' % i
+ self.assertEqual(group.get('name'), name)
+
+ def test_detail(self):
+ url = '/v2/fake/servers/detail'
+ res = self._make_request(url)
+
+ self.assertEqual(res.status_int, 200)
+ for i, server in enumerate(self._get_servers(res.body)):
+ for j, group in enumerate(self._get_groups(server)):
+ name = 'fake-%s-%s' % (i, j)
+ self.assertEqual(group.get('name'), name)
+
+ def test_no_instance_passthrough_404(self):
+
+ def fake_compute_get(*args, **kwargs):
+ raise exception.InstanceNotFound(instance_id='fake')
+
+ self.stubs.Set(compute.api.API, 'get', fake_compute_get)
+ url = '/v2/fake/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115'
+ res = self._make_request(url)
+
+ self.assertEqual(res.status_int, 404)
+
+
+class TestNeutronSecurityGroupsOutputXMLTest(
+ TestNeutronSecurityGroupsOutputTest):
+
+ content_type = 'application/xml'
+
+ class MinimalCreateServerTemplate(xmlutil.TemplateBuilder):
+ def construct(self):
+ root = xmlutil.TemplateElement('server', selector='server')
+ root.set('name')
+ root.set('id')
+ root.set('imageRef')
+ root.set('flavorRef')
+ elem = xmlutil.SubTemplateElement(root, 'security_groups')
+ sg = xmlutil.SubTemplateElement(elem, 'security_group',
+ selector='security_groups')
+ sg.set('name')
+ return xmlutil.MasterTemplate(root, 1,
+ nsmap={None: xmlutil.XMLNS_V11})
+
+ def _encode_body(self, body):
+ serializer = self.MinimalCreateServerTemplate()
+ return serializer.serialize(body)
+
+ def _get_server(self, body):
+ return etree.XML(body)
+
+ def _get_servers(self, body):
+ return etree.XML(body).getchildren()
+
+ def _get_groups(self, server):
+ # NOTE(vish): we are adding security groups without an extension
+ # namespace so we don't break people using the existing
+ # functionality, but that means we need to use find with
+ # the existing server namespace.
+ namespace = server.nsmap[None]
+ return server.find('{%s}security_groups' % namespace).getchildren()
+
+
+def get_client(context=None, admin=False):
+ return MockClient()
+
+
+class MockClient(object):
+
+ # Needs to be global to survive multiple calls to get_client.
+ _fake_security_groups = {}
+ _fake_ports = {}
+ _fake_networks = {}
+ _fake_subnets = {}
+ _fake_security_group_rules = {}
+
+ def __init__(self):
+ # add default security group
+ if not len(self._fake_security_groups):
+ ret = {'name': 'default', 'description': 'default',
+ 'tenant_id': 'fake_tenant', 'security_group_rules': [],
+ 'id': str(uuid.uuid4())}
+ self._fake_security_groups[ret['id']] = ret
+
+ def _reset(self):
+ self._fake_security_groups.clear()
+ self._fake_ports.clear()
+ self._fake_networks.clear()
+ self._fake_subnets.clear()
+ self._fake_security_group_rules.clear()
+
+ def create_security_group(self, body=None):
+ s = body.get('security_group')
+ if len(s.get('name')) > 255 or len(s.get('description')) > 255:
+ msg = 'Security Group name great than 255'
+ raise n_exc.NeutronClientException(message=msg, status_code=401)
+ ret = {'name': s.get('name'), 'description': s.get('description'),
+ 'tenant_id': 'fake', 'security_group_rules': [],
+ 'id': str(uuid.uuid4())}
+
+ self._fake_security_groups[ret['id']] = ret
+ return {'security_group': ret}
+
+ def create_network(self, body):
+ n = body.get('network')
+ ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'),
+ 'admin_state_up': n.get('admin_state_up', True),
+ 'tenant_id': 'fake_tenant',
+ 'id': str(uuid.uuid4())}
+ if 'port_security_enabled' in n:
+ ret['port_security_enabled'] = n['port_security_enabled']
+ self._fake_networks[ret['id']] = ret
+ return {'network': ret}
+
+ def create_subnet(self, body):
+ s = body.get('subnet')
+ try:
+ net = self._fake_networks[s.get('network_id')]
+ except KeyError:
+ msg = 'Network %s not found' % s.get('network_id')
+ raise n_exc.NeutronClientException(message=msg, status_code=404)
+ ret = {'name': s.get('name'), 'network_id': s.get('network_id'),
+ 'tenant_id': 'fake_tenant', 'cidr': s.get('cidr'),
+ 'id': str(uuid.uuid4()), 'gateway_ip': '10.0.0.1'}
+ net['subnets'].append(ret['id'])
+ self._fake_networks[net['id']] = net
+ self._fake_subnets[ret['id']] = ret
+ return {'subnet': ret}
+
+ def create_port(self, body):
+ p = body.get('port')
+ ret = {'status': 'ACTIVE', 'id': str(uuid.uuid4()),
+ 'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'),
+ 'device_id': p.get('device_id', str(uuid.uuid4())),
+ 'admin_state_up': p.get('admin_state_up', True),
+ 'security_groups': p.get('security_groups', []),
+ 'network_id': p.get('network_id'),
+ 'binding:vnic_type':
+ p.get('binding:vnic_type') or model.VNIC_TYPE_NORMAL}
+
+ network = self._fake_networks[p['network_id']]
+ if 'port_security_enabled' in p:
+ ret['port_security_enabled'] = p['port_security_enabled']
+ elif 'port_security_enabled' in network:
+ ret['port_security_enabled'] = network['port_security_enabled']
+
+ port_security = ret.get('port_security_enabled', True)
+ # port_security must be True if security groups are present
+ if not port_security and ret['security_groups']:
+ raise exception.SecurityGroupCannotBeApplied()
+
+ if network['subnets']:
+ ret['fixed_ips'] = [{'subnet_id': network['subnets'][0],
+ 'ip_address': '10.0.0.1'}]
+ if not ret['security_groups'] and (port_security is None or
+ port_security is True):
+ for security_group in self._fake_security_groups.values():
+ if security_group['name'] == 'default':
+ ret['security_groups'] = [security_group['id']]
+ break
+ self._fake_ports[ret['id']] = ret
+ return {'port': ret}
+
+ def create_security_group_rule(self, body):
+ # does not handle bulk case so just picks rule[0]
+ r = body.get('security_group_rules')[0]
+ fields = ['direction', 'protocol', 'port_range_min', 'port_range_max',
+ 'ethertype', 'remote_ip_prefix', 'tenant_id',
+ 'security_group_id', 'remote_group_id']
+ ret = {}
+ for field in fields:
+ ret[field] = r.get(field)
+ ret['id'] = str(uuid.uuid4())
+ self._fake_security_group_rules[ret['id']] = ret
+ return {'security_group_rules': [ret]}
+
+ def show_security_group(self, security_group, **_params):
+ try:
+ sg = self._fake_security_groups[security_group]
+ except KeyError:
+ msg = 'Security Group %s not found' % security_group
+ raise n_exc.NeutronClientException(message=msg, status_code=404)
+ for security_group_rule in self._fake_security_group_rules.values():
+ if security_group_rule['security_group_id'] == sg['id']:
+ sg['security_group_rules'].append(security_group_rule)
+
+ return {'security_group': sg}
+
+ def show_security_group_rule(self, security_group_rule, **_params):
+ try:
+ return {'security_group_rule':
+ self._fake_security_group_rules[security_group_rule]}
+ except KeyError:
+ msg = 'Security Group rule %s not found' % security_group_rule
+ raise n_exc.NeutronClientException(message=msg, status_code=404)
+
+ def show_network(self, network, **_params):
+ try:
+ return {'network':
+ self._fake_networks[network]}
+ except KeyError:
+ msg = 'Network %s not found' % network
+ raise n_exc.NeutronClientException(message=msg, status_code=404)
+
+ def show_port(self, port, **_params):
+ try:
+ return {'port':
+ self._fake_ports[port]}
+ except KeyError:
+ msg = 'Port %s not found' % port
+ raise n_exc.NeutronClientException(message=msg, status_code=404)
+
+ def show_subnet(self, subnet, **_params):
+ try:
+ return {'subnet':
+ self._fake_subnets[subnet]}
+ except KeyError:
+ msg = 'Port %s not found' % subnet
+ raise n_exc.NeutronClientException(message=msg, status_code=404)
+
+ def list_security_groups(self, **_params):
+ ret = []
+ for security_group in self._fake_security_groups.values():
+ names = _params.get('name')
+ if names:
+ if not isinstance(names, list):
+ names = [names]
+ for name in names:
+ if security_group.get('name') == name:
+ ret.append(security_group)
+ ids = _params.get('id')
+ if ids:
+ if not isinstance(ids, list):
+ ids = [ids]
+ for id in ids:
+ if security_group.get('id') == id:
+ ret.append(security_group)
+ elif not (names or ids):
+ ret.append(security_group)
+ return {'security_groups': ret}
+
+ def list_networks(self, **_params):
+ # neutronv2/api.py _get_available_networks calls this assuming
+ # search_opts filter "shared" is implemented and not ignored
+ shared = _params.get("shared", None)
+ if shared:
+ return {'networks': []}
+ else:
+ return {'networks':
+ [network for network in self._fake_networks.values()]}
+
+ def list_ports(self, **_params):
+ ret = []
+ device_id = _params.get('device_id')
+ for port in self._fake_ports.values():
+ if device_id:
+ if port['device_id'] in device_id:
+ ret.append(port)
+ else:
+ ret.append(port)
+ return {'ports': ret}
+
+ def list_subnets(self, **_params):
+ return {'subnets':
+ [subnet for subnet in self._fake_subnets.values()]}
+
+ def list_floatingips(self, **_params):
+ return {'floatingips': []}
+
+ def delete_security_group(self, security_group):
+ self.show_security_group(security_group)
+ ports = self.list_ports()
+ for port in ports.get('ports'):
+ for sg_port in port['security_groups']:
+ if sg_port == security_group:
+ msg = ('Unable to delete Security group %s in use'
+ % security_group)
+ raise n_exc.NeutronClientException(message=msg,
+ status_code=409)
+ del self._fake_security_groups[security_group]
+
+ def delete_security_group_rule(self, security_group_rule):
+ self.show_security_group_rule(security_group_rule)
+ del self._fake_security_group_rules[security_group_rule]
+
+ def delete_network(self, network):
+ self.show_network(network)
+ self._check_ports_on_network(network)
+ for subnet in self._fake_subnets.values():
+ if subnet['network_id'] == network:
+ del self._fake_subnets[subnet['id']]
+ del self._fake_networks[network]
+
+ def delete_subnet(self, subnet):
+ subnet = self.show_subnet(subnet).get('subnet')
+ self._check_ports_on_network(subnet['network_id'])
+ del self._fake_subnet[subnet]
+
+ def delete_port(self, port):
+ self.show_port(port)
+ del self._fake_ports[port]
+
+ def update_port(self, port, body=None):
+ self.show_port(port)
+ self._fake_ports[port].update(body['port'])
+ return {'port': self._fake_ports[port]}
+
+ def list_extensions(self, **_parms):
+ return {'extensions': []}
+
+ def _check_ports_on_network(self, network):
+ ports = self.list_ports()
+ for port in ports:
+ if port['network_id'] == network:
+ msg = ('Unable to complete operation on network %s. There is '
+ 'one or more ports still in use on the network'
+ % network)
+ raise n_exc.NeutronClientException(message=msg, status_code=409)