From 89cd6a0c493e26b5a9e017c99d731464292abbaf Mon Sep 17 00:00:00 2001 From: Sean Dague Date: Fri, 7 Nov 2014 14:27:03 +0100 Subject: move all tests to nova/tests/unit As part of the split of functional and unit tests we need to isolate the unit tests into a separate directory for having multiple test targets in a sane way. Part of bp:functional-tests-for-nova Change-Id: Id42ba373c1bda6a312b673ab2b489ca56da8c628 --- nova/tests/unit/api/ec2/__init__.py | 0 .../unit/api/ec2/public_key/dummy.fingerprint | 1 + nova/tests/unit/api/ec2/public_key/dummy.pub | 1 + nova/tests/unit/api/ec2/test_api.py | 635 ++++ nova/tests/unit/api/ec2/test_apirequest.py | 92 + nova/tests/unit/api/ec2/test_cinder_cloud.py | 1096 +++++++ nova/tests/unit/api/ec2/test_cloud.py | 3255 ++++++++++++++++++++ nova/tests/unit/api/ec2/test_ec2_validate.py | 277 ++ nova/tests/unit/api/ec2/test_ec2utils.py | 61 + nova/tests/unit/api/ec2/test_error_response.py | 132 + nova/tests/unit/api/ec2/test_faults.py | 46 + nova/tests/unit/api/ec2/test_middleware.py | 225 ++ 12 files changed, 5821 insertions(+) create mode 100644 nova/tests/unit/api/ec2/__init__.py create mode 100644 nova/tests/unit/api/ec2/public_key/dummy.fingerprint create mode 100644 nova/tests/unit/api/ec2/public_key/dummy.pub create mode 100644 nova/tests/unit/api/ec2/test_api.py create mode 100644 nova/tests/unit/api/ec2/test_apirequest.py create mode 100644 nova/tests/unit/api/ec2/test_cinder_cloud.py create mode 100644 nova/tests/unit/api/ec2/test_cloud.py create mode 100644 nova/tests/unit/api/ec2/test_ec2_validate.py create mode 100644 nova/tests/unit/api/ec2/test_ec2utils.py create mode 100644 nova/tests/unit/api/ec2/test_error_response.py create mode 100644 nova/tests/unit/api/ec2/test_faults.py create mode 100644 nova/tests/unit/api/ec2/test_middleware.py (limited to 'nova/tests/unit/api/ec2') diff --git a/nova/tests/unit/api/ec2/__init__.py b/nova/tests/unit/api/ec2/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/nova/tests/unit/api/ec2/public_key/dummy.fingerprint b/nova/tests/unit/api/ec2/public_key/dummy.fingerprint new file mode 100644 index 0000000000..715bca27a2 --- /dev/null +++ b/nova/tests/unit/api/ec2/public_key/dummy.fingerprint @@ -0,0 +1 @@ +1c:87:d1:d9:32:fd:62:3c:78:2b:c0:ad:c0:15:88:df diff --git a/nova/tests/unit/api/ec2/public_key/dummy.pub b/nova/tests/unit/api/ec2/public_key/dummy.pub new file mode 100644 index 0000000000..d4cf2bc0d8 --- /dev/null +++ b/nova/tests/unit/api/ec2/public_key/dummy.pub @@ -0,0 +1 @@ +ssh-dss AAAAB3NzaC1kc3MAAACBAMGJlY9XEIm2X234pdO5yFWMp2JuOQx8U0E815IVXhmKxYCBK9ZakgZOIQmPbXoGYyV+mziDPp6HJ0wKYLQxkwLEFr51fAZjWQvRss0SinURRuLkockDfGFtD4pYJthekr/rlqMKlBSDUSpGq8jUWW60UJ18FGooFpxR7ESqQRx/AAAAFQC96LRglaUeeP+E8U/yblEJocuiWwAAAIA3XiMR8Skiz/0aBm5K50SeQznQuMJTyzt9S9uaz5QZWiFu69hOyGSFGw8fqgxEkXFJIuHobQQpGYQubLW0NdaYRqyE/Vud3JUJUb8Texld6dz8vGemyB5d1YvtSeHIo8/BGv2msOqR3u5AZTaGCBD9DhpSGOKHEdNjTtvpPd8S8gAAAIBociGZ5jf09iHLVENhyXujJbxfGRPsyNTyARJfCOGl0oFV6hEzcQyw8U/ePwjgvjc2UizMWLl8tsb2FXKHRdc2v+ND3Us+XqKQ33X3ADP4FZ/+Oj213gMyhCmvFTP0u5FmHog9My4CB7YcIWRuUR42WlhQ2IfPvKwUoTk3R+T6Og== www-data@mk diff --git a/nova/tests/unit/api/ec2/test_api.py b/nova/tests/unit/api/ec2/test_api.py new file mode 100644 index 0000000000..cc4a2adb75 --- /dev/null +++ b/nova/tests/unit/api/ec2/test_api.py @@ -0,0 +1,635 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Unit tests for the API endpoint.""" + +import random +import re +import StringIO + +import boto +import boto.connection +from boto.ec2 import regioninfo +from boto import exception as boto_exc +# newer versions of boto use their own wrapper on top of httplib.HTTPResponse +if hasattr(boto.connection, 'HTTPResponse'): + httplib = boto.connection +else: + import httplib +import fixtures +import webob + +from nova.api import auth +from nova.api import ec2 +from nova.api.ec2 import ec2utils +from nova import block_device +from nova import context +from nova import exception +from nova.openstack.common import versionutils +from nova import test +from nova.tests.unit import matchers + + +class FakeHttplibSocket(object): + """a fake socket implementation for httplib.HTTPResponse, trivial.""" + def __init__(self, response_string): + self.response_string = response_string + self._buffer = StringIO.StringIO(response_string) + + def makefile(self, _mode, _other): + """Returns the socket's internal buffer.""" + return self._buffer + + +class FakeHttplibConnection(object): + """A fake httplib.HTTPConnection for boto to use + + requests made via this connection actually get translated and routed into + our WSGI app, we then wait for the response and turn it back into + the HTTPResponse that boto expects. + """ + def __init__(self, app, host, is_secure=False): + self.app = app + self.host = host + + def request(self, method, path, data, headers): + req = webob.Request.blank(path) + req.method = method + req.body = data + req.headers = headers + req.headers['Accept'] = 'text/html' + req.host = self.host + # Call the WSGI app, get the HTTP response + resp = str(req.get_response(self.app)) + # For some reason, the response doesn't have "HTTP/1.0 " prepended; I + # guess that's a function the web server usually provides. + resp = "HTTP/1.0 %s" % resp + self.sock = FakeHttplibSocket(resp) + self.http_response = httplib.HTTPResponse(self.sock) + # NOTE(vish): boto is accessing private variables for some reason + self._HTTPConnection__response = self.http_response + self.http_response.begin() + + def getresponse(self): + return self.http_response + + def getresponsebody(self): + return self.sock.response_string + + def close(self): + """Required for compatibility with boto/tornado.""" + pass + + +class XmlConversionTestCase(test.NoDBTestCase): + """Unit test api xml conversion.""" + def test_number_conversion(self): + conv = ec2utils._try_convert + self.assertIsNone(conv('None')) + self.assertEqual(conv('True'), True) + self.assertEqual(conv('TRUE'), True) + self.assertEqual(conv('true'), True) + self.assertEqual(conv('False'), False) + self.assertEqual(conv('FALSE'), False) + self.assertEqual(conv('false'), False) + self.assertEqual(conv('0'), 0) + self.assertEqual(conv('42'), 42) + self.assertEqual(conv('3.14'), 3.14) + self.assertEqual(conv('-57.12'), -57.12) + self.assertEqual(conv('0x57'), 0x57) + self.assertEqual(conv('-0x57'), -0x57) + self.assertEqual(conv('-'), '-') + self.assertEqual(conv('-0'), 0) + self.assertEqual(conv('0.0'), 0.0) + self.assertEqual(conv('1e-8'), 0.0) + self.assertEqual(conv('-1e-8'), 0.0) + self.assertEqual(conv('0xDD8G'), '0xDD8G') + self.assertEqual(conv('0XDD8G'), '0XDD8G') + self.assertEqual(conv('-stringy'), '-stringy') + self.assertEqual(conv('stringy'), 'stringy') + self.assertEqual(conv('add'), 'add') + self.assertEqual(conv('remove'), 'remove') + self.assertEqual(conv(''), '') + + +class Ec2utilsTestCase(test.NoDBTestCase): + def test_ec2_id_to_id(self): + self.assertEqual(ec2utils.ec2_id_to_id('i-0000001e'), 30) + self.assertEqual(ec2utils.ec2_id_to_id('ami-1d'), 29) + self.assertEqual(ec2utils.ec2_id_to_id('snap-0000001c'), 28) + self.assertEqual(ec2utils.ec2_id_to_id('vol-0000001b'), 27) + + def test_bad_ec2_id(self): + self.assertRaises(exception.InvalidEc2Id, + ec2utils.ec2_id_to_id, + 'badone') + + def test_id_to_ec2_id(self): + self.assertEqual(ec2utils.id_to_ec2_id(30), 'i-0000001e') + self.assertEqual(ec2utils.id_to_ec2_id(29, 'ami-%08x'), 'ami-0000001d') + self.assertEqual(ec2utils.id_to_ec2_snap_id(28), 'snap-0000001c') + self.assertEqual(ec2utils.id_to_ec2_vol_id(27), 'vol-0000001b') + + def test_dict_from_dotted_str(self): + in_str = [('BlockDeviceMapping.1.DeviceName', '/dev/sda1'), + ('BlockDeviceMapping.1.Ebs.SnapshotId', 'snap-0000001c'), + ('BlockDeviceMapping.1.Ebs.VolumeSize', '80'), + ('BlockDeviceMapping.1.Ebs.DeleteOnTermination', 'false'), + ('BlockDeviceMapping.2.DeviceName', '/dev/sdc'), + ('BlockDeviceMapping.2.VirtualName', 'ephemeral0')] + expected_dict = { + 'block_device_mapping': { + '1': {'device_name': '/dev/sda1', + 'ebs': {'snapshot_id': 'snap-0000001c', + 'volume_size': 80, + 'delete_on_termination': False}}, + '2': {'device_name': '/dev/sdc', + 'virtual_name': 'ephemeral0'}}} + out_dict = ec2utils.dict_from_dotted_str(in_str) + + self.assertThat(out_dict, matchers.DictMatches(expected_dict)) + + def test_properties_root_defice_name(self): + mappings = [{"device": "/dev/sda1", "virtual": "root"}] + properties0 = {'mappings': mappings} + properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings} + + root_device_name = block_device.properties_root_device_name( + properties0) + self.assertEqual(root_device_name, '/dev/sda1') + + root_device_name = block_device.properties_root_device_name( + properties1) + self.assertEqual(root_device_name, '/dev/sdb') + + def test_regex_from_ec2_regex(self): + def _test_re(ec2_regex, expected, literal, match=True): + regex = ec2utils.regex_from_ec2_regex(ec2_regex) + self.assertEqual(regex, expected) + if match: + self.assertIsNotNone(re.match(regex, literal)) + else: + self.assertIsNone(re.match(regex, literal)) + + # wildcards + _test_re('foo', '\Afoo\Z(?s)', 'foo') + _test_re('foo', '\Afoo\Z(?s)', 'baz', match=False) + _test_re('foo?bar', '\Afoo.bar\Z(?s)', 'foo bar') + _test_re('foo?bar', '\Afoo.bar\Z(?s)', 'foo bar', match=False) + _test_re('foo*bar', '\Afoo.*bar\Z(?s)', 'foo QUUX bar') + + # backslashes and escaped wildcards + _test_re('foo\\', '\Afoo\\\\\Z(?s)', 'foo\\') + _test_re('foo*bar', '\Afoo.*bar\Z(?s)', 'zork QUUX bar', match=False) + _test_re('foo\\?bar', '\Afoo[?]bar\Z(?s)', 'foo?bar') + _test_re('foo\\?bar', '\Afoo[?]bar\Z(?s)', 'foo bar', match=False) + _test_re('foo\\*bar', '\Afoo[*]bar\Z(?s)', 'foo*bar') + _test_re('foo\\*bar', '\Afoo[*]bar\Z(?s)', 'foo bar', match=False) + + # analog to the example given in the EC2 API docs + ec2_regex = '\*nova\?\\end' + expected = r'\A[*]nova[?]\\end\Z(?s)' + literal = r'*nova?\end' + _test_re(ec2_regex, expected, literal) + + def test_mapping_prepend_dev(self): + mappings = [ + {'virtual': 'ami', + 'device': 'sda1'}, + {'virtual': 'root', + 'device': '/dev/sda1'}, + + {'virtual': 'swap', + 'device': 'sdb1'}, + {'virtual': 'swap', + 'device': '/dev/sdb2'}, + + {'virtual': 'ephemeral0', + 'device': 'sdc1'}, + {'virtual': 'ephemeral1', + 'device': '/dev/sdc1'}] + expected_result = [ + {'virtual': 'ami', + 'device': 'sda1'}, + {'virtual': 'root', + 'device': '/dev/sda1'}, + + {'virtual': 'swap', + 'device': '/dev/sdb1'}, + {'virtual': 'swap', + 'device': '/dev/sdb2'}, + + {'virtual': 'ephemeral0', + 'device': '/dev/sdc1'}, + {'virtual': 'ephemeral1', + 'device': '/dev/sdc1'}] + self.assertThat(block_device.mappings_prepend_dev(mappings), + matchers.DictListMatches(expected_result)) + + +class ApiEc2TestCase(test.TestCase): + """Unit test for the cloud controller on an EC2 API.""" + def setUp(self): + super(ApiEc2TestCase, self).setUp() + self.host = '127.0.0.1' + # NOTE(vish): skipping the Authorizer + roles = ['sysadmin', 'netadmin'] + ctxt = context.RequestContext('fake', 'fake', roles=roles) + self.app = auth.InjectContext(ctxt, ec2.FaultWrapper( + ec2.RequestLogging(ec2.Requestify(ec2.Authorizer(ec2.Executor() + ), 'nova.api.ec2.cloud.CloudController')))) + self.useFixture(fixtures.FakeLogger('boto')) + + def expect_http(self, host=None, is_secure=False, api_version=None): + """Returns a new EC2 connection.""" + self.ec2 = boto.connect_ec2( + aws_access_key_id='fake', + aws_secret_access_key='fake', + is_secure=False, + region=regioninfo.RegionInfo(None, 'test', self.host), + port=8773, + path='/services/Cloud') + if api_version: + self.ec2.APIVersion = api_version + + self.mox.StubOutWithMock(self.ec2, 'new_http_connection') + self.http = FakeHttplibConnection( + self.app, '%s:8773' % (self.host), False) + # pylint: disable=E1103 + if versionutils.is_compatible('2.14', boto.Version, same_major=False): + self.ec2.new_http_connection(host or self.host, 8773, + is_secure).AndReturn(self.http) + elif versionutils.is_compatible('2', boto.Version, same_major=False): + self.ec2.new_http_connection(host or '%s:8773' % (self.host), + is_secure).AndReturn(self.http) + else: + self.ec2.new_http_connection(host, is_secure).AndReturn(self.http) + return self.http + + def test_xmlns_version_matches_request_version(self): + self.expect_http(api_version='2010-10-30') + self.mox.ReplayAll() + + # Any request should be fine + self.ec2.get_all_instances() + self.assertTrue(self.ec2.APIVersion in self.http.getresponsebody(), + 'The version in the xmlns of the response does ' + 'not match the API version given in the request.') + + def test_describe_instances(self): + """Test that, after creating a user and a project, the describe + instances call to the API works properly. + """ + self.expect_http() + self.mox.ReplayAll() + self.assertEqual(self.ec2.get_all_instances(), []) + + def test_terminate_invalid_instance(self): + # Attempt to terminate an invalid instance. + self.expect_http() + self.mox.ReplayAll() + self.assertRaises(boto_exc.EC2ResponseError, + self.ec2.terminate_instances, "i-00000005") + + def test_get_all_key_pairs(self): + """Test that, after creating a user and project and generating + a key pair, that the API call to list key pairs works properly. + """ + keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + self.expect_http() + self.mox.ReplayAll() + self.ec2.create_key_pair(keyname) + rv = self.ec2.get_all_key_pairs() + results = [k for k in rv if k.name == keyname] + self.assertEqual(len(results), 1) + + def test_create_duplicate_key_pair(self): + """Test that, after successfully generating a keypair, + requesting a second keypair with the same name fails sanely. + """ + self.expect_http() + self.mox.ReplayAll() + self.ec2.create_key_pair('test') + + try: + self.ec2.create_key_pair('test') + except boto_exc.EC2ResponseError as e: + if e.code == 'InvalidKeyPair.Duplicate': + pass + else: + self.assertEqual('InvalidKeyPair.Duplicate', e.code) + else: + self.fail('Exception not raised.') + + def test_get_all_security_groups(self): + # Test that we can retrieve security groups. + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + self.assertEqual(len(rv), 1) + self.assertEqual(rv[0].name, 'default') + + def test_create_delete_security_group(self): + # Test that we can create a security group. + self.expect_http() + self.mox.ReplayAll() + + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + + self.ec2.create_security_group(security_group_name, 'test group') + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + self.assertEqual(len(rv), 2) + self.assertIn(security_group_name, [group.name for group in rv]) + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + + def test_group_name_valid_chars_security_group(self): + """Test that we sanely handle invalid security group names. + + EC2 API Spec states we should only accept alphanumeric characters, + spaces, dashes, and underscores. Amazon implementation + accepts more characters - so, [:print:] is ok. + """ + bad_strict_ec2 = "aa \t\x01\x02\x7f" + bad_amazon_ec2 = "aa #^% -=99" + test_raise = [ + (True, bad_amazon_ec2, "test desc"), + (True, "test name", bad_amazon_ec2), + (False, bad_strict_ec2, "test desc"), + ] + for t in test_raise: + self.expect_http() + self.mox.ReplayAll() + self.flags(ec2_strict_validation=t[0]) + self.assertRaises(boto_exc.EC2ResponseError, + self.ec2.create_security_group, + t[1], + t[2]) + test_accept = [ + (False, bad_amazon_ec2, "test desc"), + (False, "test name", bad_amazon_ec2), + ] + for t in test_accept: + self.expect_http() + self.mox.ReplayAll() + self.flags(ec2_strict_validation=t[0]) + self.ec2.create_security_group(t[1], t[2]) + self.expect_http() + self.mox.ReplayAll() + self.ec2.delete_security_group(t[1]) + + def test_group_name_valid_length_security_group(self): + """Test that we sanely handle invalid security group names. + + API Spec states that the length should not exceed 255 char. + """ + self.expect_http() + self.mox.ReplayAll() + + # Test block group_name > 255 chars + security_group_name = "".join(random.choice("poiuytrewqasdfghjklmnbvc") + for x in range(random.randint(256, 266))) + + self.assertRaises(boto_exc.EC2ResponseError, + self.ec2.create_security_group, + security_group_name, + 'test group') + + def test_authorize_revoke_security_group_cidr(self): + """Test that we can add and remove CIDR based rules + to a security group + """ + self.expect_http() + self.mox.ReplayAll() + + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + + group = self.ec2.create_security_group(security_group_name, + 'test group') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.authorize('tcp', 80, 81, '0.0.0.0/0') + group.authorize('icmp', -1, -1, '0.0.0.0/0') + group.authorize('udp', 80, 81, '0.0.0.0/0') + group.authorize('tcp', 1, 65535, '0.0.0.0/0') + group.authorize('udp', 1, 65535, '0.0.0.0/0') + group.authorize('icmp', 1, 0, '0.0.0.0/0') + group.authorize('icmp', 0, 1, '0.0.0.0/0') + group.authorize('icmp', 0, 0, '0.0.0.0/0') + + def _assert(message, *args): + try: + group.authorize(*args) + except boto_exc.EC2ResponseError as e: + self.assertEqual(e.status, 400, 'Expected status to be 400') + self.assertIn(message, e.error_message) + else: + raise self.failureException, 'EC2ResponseError not raised' + + # Invalid CIDR address + _assert('Invalid CIDR', 'tcp', 80, 81, '0.0.0.0/0444') + # Missing ports + _assert('Not enough parameters', 'tcp', '0.0.0.0/0') + # from port cannot be greater than to port + _assert('Invalid port range', 'tcp', 100, 1, '0.0.0.0/0') + # For tcp, negative values are not allowed + _assert('Invalid port range', 'tcp', -1, 1, '0.0.0.0/0') + # For tcp, valid port range 1-65535 + _assert('Invalid port range', 'tcp', 1, 65599, '0.0.0.0/0') + # Invalid Cidr for ICMP type + _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.444.0/4') + # Invalid protocol + _assert('Invalid IP protocol', 'xyz', 1, 14, '0.0.0.0/0') + # Invalid port + _assert('Invalid input received: To and From ports must be integers', + 'tcp', " ", "81", '0.0.0.0/0') + # Invalid icmp port + _assert('Invalid input received: ' + 'Type and Code must be integers for ICMP protocol type', + 'icmp', " ", "81", '0.0.0.0/0') + # Invalid CIDR Address + _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0') + # Invalid CIDR Address + _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0/') + # Invalid Cidr ports + _assert('Invalid port range', 'icmp', 1, 256, '0.0.0.0/0') + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + group = [grp for grp in rv if grp.name == security_group_name][0] + + self.assertEqual(len(group.rules), 8) + self.assertEqual(int(group.rules[0].from_port), 80) + self.assertEqual(int(group.rules[0].to_port), 81) + self.assertEqual(len(group.rules[0].grants), 1) + self.assertEqual(str(group.rules[0].grants[0]), '0.0.0.0/0') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.revoke('tcp', 80, 81, '0.0.0.0/0') + group.revoke('icmp', -1, -1, '0.0.0.0/0') + group.revoke('udp', 80, 81, '0.0.0.0/0') + group.revoke('tcp', 1, 65535, '0.0.0.0/0') + group.revoke('udp', 1, 65535, '0.0.0.0/0') + group.revoke('icmp', 1, 0, '0.0.0.0/0') + group.revoke('icmp', 0, 1, '0.0.0.0/0') + group.revoke('icmp', 0, 0, '0.0.0.0/0') + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + rv = self.ec2.get_all_security_groups() + + self.assertEqual(len(rv), 1) + self.assertEqual(rv[0].name, 'default') + + def test_authorize_revoke_security_group_cidr_v6(self): + """Test that we can add and remove CIDR based rules + to a security group for IPv6 + """ + self.expect_http() + self.mox.ReplayAll() + + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + + group = self.ec2.create_security_group(security_group_name, + 'test group') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.authorize('tcp', 80, 81, '::/0') + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + group = [grp for grp in rv if grp.name == security_group_name][0] + self.assertEqual(len(group.rules), 1) + self.assertEqual(int(group.rules[0].from_port), 80) + self.assertEqual(int(group.rules[0].to_port), 81) + self.assertEqual(len(group.rules[0].grants), 1) + self.assertEqual(str(group.rules[0].grants[0]), '::/0') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.revoke('tcp', 80, 81, '::/0') + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + rv = self.ec2.get_all_security_groups() + + self.assertEqual(len(rv), 1) + self.assertEqual(rv[0].name, 'default') + + def test_authorize_revoke_security_group_foreign_group(self): + """Test that we can grant and revoke another security group access + to a security group + """ + self.expect_http() + self.mox.ReplayAll() + + rand_string = 'sdiuisudfsdcnpaqwertasd' + security_group_name = "".join(random.choice(rand_string) + for x in range(random.randint(4, 8))) + other_security_group_name = "".join(random.choice(rand_string) + for x in range(random.randint(4, 8))) + + group = self.ec2.create_security_group(security_group_name, + 'test group') + + self.expect_http() + self.mox.ReplayAll() + + other_group = self.ec2.create_security_group(other_security_group_name, + 'some other group') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.authorize(src_group=other_group) + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + # I don't bother checkng that we actually find it here, + # because the create/delete unit test further up should + # be good enough for that. + for group in rv: + if group.name == security_group_name: + self.assertEqual(len(group.rules), 3) + self.assertEqual(len(group.rules[0].grants), 1) + self.assertEqual(str(group.rules[0].grants[0]), + '%s-%s' % (other_security_group_name, 'fake')) + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + for group in rv: + if group.name == security_group_name: + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + group.revoke(src_group=other_group) + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + self.ec2.delete_security_group(other_security_group_name) diff --git a/nova/tests/unit/api/ec2/test_apirequest.py b/nova/tests/unit/api/ec2/test_apirequest.py new file mode 100644 index 0000000000..4b2dee96f8 --- /dev/null +++ b/nova/tests/unit/api/ec2/test_apirequest.py @@ -0,0 +1,92 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Unit tests for the API Request internals.""" + +import copy + +from oslo.utils import timeutils + +from nova.api.ec2 import apirequest +from nova import test + + +class APIRequestTestCase(test.NoDBTestCase): + + def setUp(self): + super(APIRequestTestCase, self).setUp() + self.req = apirequest.APIRequest("FakeController", "FakeAction", + "FakeVersion", {}) + self.resp = { + 'string': 'foo', + 'int': 1, + 'long': long(1), + 'bool': False, + 'dict': { + 'string': 'foo', + 'int': 1, + } + } + + # The previous will produce an output that looks like the + # following (excusing line wrap for 80 cols): + # + # + # uuid + # 1 + # + # 1 + # foo + # + # false + # foo + # + # + # We don't attempt to ever test for the full document because + # hash seed order might impact it's rendering order. The fact + # that running the function doesn't explode is a big part of + # the win. + + def test_render_response_ascii(self): + data = self.req._render_response(self.resp, 'uuid') + self.assertIn('= 500 or without code) should + be filtered as they might contain sensitive information. + """ + msg = "Test server failure." + err = ec2.ec2_error_ex(TestServerExceptionEC2(msg), self.req, + unexpected=True) + self._validate_ec2_error(err, TestServerExceptionEC2.code, + TestServerExceptionEC2.ec2_code, + unknown_msg=True) + + def test_unexpected_exception_builtin(self): + """Test response to builtin unexpected exception. + + Server exception messages (with code >= 500 or without code) should + be filtered as they might contain sensitive information. + """ + msg = "Test server failure." + err = ec2.ec2_error_ex(RuntimeError(msg), self.req, unexpected=True) + self._validate_ec2_error(err, 500, 'RuntimeError', unknown_msg=True) diff --git a/nova/tests/unit/api/ec2/test_faults.py b/nova/tests/unit/api/ec2/test_faults.py new file mode 100644 index 0000000000..ae71be9bbf --- /dev/null +++ b/nova/tests/unit/api/ec2/test_faults.py @@ -0,0 +1,46 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mox +import webob + +from nova.api.ec2 import faults +from nova import test +from nova import wsgi + + +class TestFaults(test.NoDBTestCase): + """Tests covering ec2 Fault class.""" + + def test_fault_exception(self): + # Ensure the status_int is set correctly on faults. + fault = faults.Fault(webob.exc.HTTPBadRequest( + explanation='test')) + self.assertIsInstance(fault.wrapped_exc, webob.exc.HTTPBadRequest) + + def test_fault_exception_status_int(self): + # Ensure the status_int is set correctly on faults. + fault = faults.Fault(webob.exc.HTTPNotFound(explanation='test')) + self.assertEqual(fault.wrapped_exc.status_int, 404) + + def test_fault_call(self): + # Ensure proper EC2 response on faults. + message = 'test message' + ex = webob.exc.HTTPNotFound(explanation=message) + fault = faults.Fault(ex) + req = wsgi.Request.blank('/test') + req.GET['AWSAccessKeyId'] = "test_user_id:test_project_id" + self.mox.StubOutWithMock(faults, 'ec2_error_response') + faults.ec2_error_response(mox.IgnoreArg(), 'HTTPNotFound', + message=message, status=ex.status_int) + self.mox.ReplayAll() + fault(req) diff --git a/nova/tests/unit/api/ec2/test_middleware.py b/nova/tests/unit/api/ec2/test_middleware.py new file mode 100644 index 0000000000..3eb9c703da --- /dev/null +++ b/nova/tests/unit/api/ec2/test_middleware.py @@ -0,0 +1,225 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet.green import httplib +from lxml import etree +import mox +from oslo.config import cfg +from oslo.utils import timeutils +import webob +import webob.dec +import webob.exc + +from nova.api import ec2 +from nova import context +from nova import exception +from nova import test +from nova import wsgi + +CONF = cfg.CONF + + +@webob.dec.wsgify +def conditional_forbid(req): + """Helper wsgi app returns 403 if param 'die' is 1.""" + if 'die' in req.params and req.params['die'] == '1': + raise webob.exc.HTTPForbidden() + return 'OK' + + +class LockoutTestCase(test.NoDBTestCase): + """Test case for the Lockout middleware.""" + def setUp(self): # pylint: disable=C0103 + super(LockoutTestCase, self).setUp() + timeutils.set_time_override() + self.lockout = ec2.Lockout(conditional_forbid) + + def tearDown(self): # pylint: disable=C0103 + timeutils.clear_time_override() + super(LockoutTestCase, self).tearDown() + + def _send_bad_attempts(self, access_key, num_attempts=1): + """Fail x.""" + for i in xrange(num_attempts): + req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key) + self.assertEqual(req.get_response(self.lockout).status_int, 403) + + def _is_locked_out(self, access_key): + """Sends a test request to see if key is locked out.""" + req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key) + return (req.get_response(self.lockout).status_int == 403) + + def test_lockout(self): + self._send_bad_attempts('test', CONF.lockout_attempts) + self.assertTrue(self._is_locked_out('test')) + + def test_timeout(self): + self._send_bad_attempts('test', CONF.lockout_attempts) + self.assertTrue(self._is_locked_out('test')) + timeutils.advance_time_seconds(CONF.lockout_minutes * 60) + self.assertFalse(self._is_locked_out('test')) + + def test_multiple_keys(self): + self._send_bad_attempts('test1', CONF.lockout_attempts) + self.assertTrue(self._is_locked_out('test1')) + self.assertFalse(self._is_locked_out('test2')) + timeutils.advance_time_seconds(CONF.lockout_minutes * 60) + self.assertFalse(self._is_locked_out('test1')) + self.assertFalse(self._is_locked_out('test2')) + + def test_window_timeout(self): + self._send_bad_attempts('test', CONF.lockout_attempts - 1) + self.assertFalse(self._is_locked_out('test')) + timeutils.advance_time_seconds(CONF.lockout_window * 60) + self._send_bad_attempts('test', CONF.lockout_attempts - 1) + self.assertFalse(self._is_locked_out('test')) + + +class ExecutorTestCase(test.NoDBTestCase): + def setUp(self): + super(ExecutorTestCase, self).setUp() + self.executor = ec2.Executor() + + def _execute(self, invoke): + class Fake(object): + pass + fake_ec2_request = Fake() + fake_ec2_request.invoke = invoke + + fake_wsgi_request = Fake() + + fake_wsgi_request.environ = { + 'nova.context': context.get_admin_context(), + 'ec2.request': fake_ec2_request, + } + return self.executor(fake_wsgi_request) + + def _extract_message(self, result): + tree = etree.fromstring(result.body) + return tree.findall('./Errors')[0].find('Error/Message').text + + def _extract_code(self, result): + tree = etree.fromstring(result.body) + return tree.findall('./Errors')[0].find('Error/Code').text + + def test_instance_not_found(self): + def not_found(context): + raise exception.InstanceNotFound(instance_id=5) + result = self._execute(not_found) + self.assertIn('i-00000005', self._extract_message(result)) + self.assertEqual('InvalidInstanceID.NotFound', + self._extract_code(result)) + + def test_instance_not_found_none(self): + def not_found(context): + raise exception.InstanceNotFound(instance_id=None) + + # NOTE(mikal): we want no exception to be raised here, which was what + # was happening in bug/1080406 + result = self._execute(not_found) + self.assertIn('None', self._extract_message(result)) + self.assertEqual('InvalidInstanceID.NotFound', + self._extract_code(result)) + + def test_snapshot_not_found(self): + def not_found(context): + raise exception.SnapshotNotFound(snapshot_id=5) + result = self._execute(not_found) + self.assertIn('snap-00000005', self._extract_message(result)) + self.assertEqual('InvalidSnapshot.NotFound', + self._extract_code(result)) + + def test_volume_not_found(self): + def not_found(context): + raise exception.VolumeNotFound(volume_id=5) + result = self._execute(not_found) + self.assertIn('vol-00000005', self._extract_message(result)) + self.assertEqual('InvalidVolume.NotFound', self._extract_code(result)) + + +class FakeResponse(object): + reason = "Test Reason" + + def __init__(self, status=400): + self.status = status + + def read(self): + return '{}' + + +class KeystoneAuthTestCase(test.NoDBTestCase): + def setUp(self): + super(KeystoneAuthTestCase, self).setUp() + self.kauth = ec2.EC2KeystoneAuth(conditional_forbid) + + def _validate_ec2_error(self, response, http_status, ec2_code): + self.assertEqual(response.status_code, http_status, + 'Expected HTTP status %s' % http_status) + root_e = etree.XML(response.body) + self.assertEqual(root_e.tag, 'Response', + "Top element must be Response.") + errors_e = root_e.find('Errors') + error_e = errors_e[0] + code_e = error_e.find('Code') + self.assertIsNotNone(code_e, "Code element must be present.") + self.assertEqual(code_e.text, ec2_code) + + def test_no_signature(self): + req = wsgi.Request.blank('/test') + resp = self.kauth(req) + self._validate_ec2_error(resp, 400, 'AuthFailure') + + def test_no_key_id(self): + req = wsgi.Request.blank('/test') + req.GET['Signature'] = 'test-signature' + resp = self.kauth(req) + self._validate_ec2_error(resp, 400, 'AuthFailure') + + def test_communication_failure(self): + req = wsgi.Request.blank('/test') + req.GET['Signature'] = 'test-signature' + req.GET['AWSAccessKeyId'] = 'test-key-id' + + conn = httplib.HTTPConnection('/mock') + self.mox.StubOutWithMock(httplib.HTTPConnection, 'request') + self.mox.StubOutWithMock(httplib.HTTPConnection, 'getresponse') + conn.request('POST', mox.IgnoreArg(), body=mox.IgnoreArg(), + headers=mox.IgnoreArg()) + resp = FakeResponse() + conn.getresponse().AndReturn(resp) + self.mox.ReplayAll() + + resp = self.kauth(req) + self._validate_ec2_error(resp, 400, 'AuthFailure') + + def test_no_result_data(self): + req = wsgi.Request.blank('/test') + req.GET['Signature'] = 'test-signature' + req.GET['AWSAccessKeyId'] = 'test-key-id' + + conn = httplib.HTTPConnection('/mock') + self.mox.StubOutWithMock(httplib.HTTPConnection, 'request') + self.mox.StubOutWithMock(httplib.HTTPConnection, 'getresponse') + self.mox.StubOutWithMock(httplib.HTTPConnection, 'close') + conn.request('POST', mox.IgnoreArg(), body=mox.IgnoreArg(), + headers=mox.IgnoreArg()) + resp = FakeResponse(200) + conn.getresponse().AndReturn(resp) + conn.close() + self.mox.ReplayAll() + + resp = self.kauth(req) + self._validate_ec2_error(resp, 400, 'AuthFailure') -- cgit v1.2.1