# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import inspect import webob from nova.api.openstack import extensions from nova.api.openstack import wsgi from nova import exception from nova.openstack.common import gettextutils from nova import test from nova.tests.api.openstack import fakes from nova.tests import utils class RequestTest(test.NoDBTestCase): def test_content_type_missing(self): request = wsgi.Request.blank('/tests/123', method='POST') request.body = "" self.assertIsNone(request.get_content_type()) def test_content_type_unsupported(self): request = wsgi.Request.blank('/tests/123', method='POST') request.headers["Content-Type"] = "text/html" request.body = "asdf
" self.assertRaises(exception.InvalidContentType, request.get_content_type) def test_content_type_with_charset(self): request = wsgi.Request.blank('/tests/123') request.headers["Content-Type"] = "application/json; charset=UTF-8" result = request.get_content_type() self.assertEqual(result, "application/json") def test_content_type_from_accept(self): for content_type in ('application/xml', 'application/vnd.openstack.compute+xml', 'application/json', 'application/vnd.openstack.compute+json'): request = wsgi.Request.blank('/tests/123') request.headers["Accept"] = content_type result = request.best_match_content_type() self.assertEqual(result, content_type) def test_content_type_from_accept_best(self): request = wsgi.Request.blank('/tests/123') request.headers["Accept"] = "application/xml, application/json" result = request.best_match_content_type() self.assertEqual(result, "application/json") request = wsgi.Request.blank('/tests/123') request.headers["Accept"] = ("application/json; q=0.3, " "application/xml; q=0.9") result = request.best_match_content_type() self.assertEqual(result, "application/xml") def test_content_type_from_query_extension(self): request = wsgi.Request.blank('/tests/123.xml') result = request.best_match_content_type() self.assertEqual(result, "application/xml") request = wsgi.Request.blank('/tests/123.json') result = request.best_match_content_type() self.assertEqual(result, "application/json") request = wsgi.Request.blank('/tests/123.invalid') result = request.best_match_content_type() self.assertEqual(result, "application/json") def test_content_type_accept_and_query_extension(self): request = wsgi.Request.blank('/tests/123.xml') request.headers["Accept"] = "application/json" result = request.best_match_content_type() self.assertEqual(result, "application/xml") def test_content_type_accept_default(self): request = wsgi.Request.blank('/tests/123.unsupported') request.headers["Accept"] = "application/unsupported1" result = request.best_match_content_type() self.assertEqual(result, "application/json") def test_cache_and_retrieve_instances(self): request = wsgi.Request.blank('/foo') instances = [] for x in xrange(3): instances.append({'uuid': 'uuid%s' % x}) # Store 2 request.cache_db_instances(instances[:2]) # Store 1 request.cache_db_instance(instances[2]) self.assertEqual(request.get_db_instance('uuid0'), instances[0]) self.assertEqual(request.get_db_instance('uuid1'), instances[1]) self.assertEqual(request.get_db_instance('uuid2'), instances[2]) self.assertIsNone(request.get_db_instance('uuid3')) self.assertEqual(request.get_db_instances(), {'uuid0': instances[0], 'uuid1': instances[1], 'uuid2': instances[2]}) def test_cache_and_retrieve_compute_nodes(self): request = wsgi.Request.blank('/foo') compute_nodes = [] for x in xrange(3): compute_nodes.append({'id': 'id%s' % x}) # Store 2 request.cache_db_compute_nodes(compute_nodes[:2]) # Store 1 request.cache_db_compute_node(compute_nodes[2]) self.assertEqual(request.get_db_compute_node('id0'), compute_nodes[0]) self.assertEqual(request.get_db_compute_node('id1'), compute_nodes[1]) self.assertEqual(request.get_db_compute_node('id2'), compute_nodes[2]) self.assertEqual(request.get_db_compute_node('id3'), None) self.assertEqual(request.get_db_compute_nodes(), {'id0': compute_nodes[0], 'id1': compute_nodes[1], 'id2': compute_nodes[2]}) def test_from_request(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'bogus;q=1.1, en-gb;q=0.7,en-us,en;q=.5,*;q=.7' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_US') def test_asterisk(self): # asterisk should match first available if there # are not any other available matches self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = '*,es;q=.5' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_GB') def test_prefix(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'zh' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'zh_CN') def test_secondary(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'nn,en-gb;q=.5' request.headers = {'Accept-Language': accepted} self.assertEqual(request.best_match_language(), 'en_GB') def test_none_found(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = 'nb-no' request.headers = {'Accept-Language': accepted} self.assertIs(request.best_match_language(), None) def test_no_lang_header(self): self.stubs.Set(gettextutils, 'get_available_languages', fakes.fake_get_available_languages) request = wsgi.Request.blank('/') accepted = '' request.headers = {'Accept-Language': accepted} self.assertIs(request.best_match_language(), None) class ActionDispatcherTest(test.NoDBTestCase): def test_dispatch(self): serializer = wsgi.ActionDispatcher() serializer.create = lambda x: 'pants' self.assertEqual(serializer.dispatch({}, action='create'), 'pants') def test_dispatch_action_None(self): serializer = wsgi.ActionDispatcher() serializer.create = lambda x: 'pants' serializer.default = lambda x: 'trousers' self.assertEqual(serializer.dispatch({}, action=None), 'trousers') def test_dispatch_default(self): serializer = wsgi.ActionDispatcher() serializer.create = lambda x: 'pants' serializer.default = lambda x: 'trousers' self.assertEqual(serializer.dispatch({}, action='update'), 'trousers') class DictSerializerTest(test.NoDBTestCase): def test_dispatch_default(self): serializer = wsgi.DictSerializer() self.assertEqual(serializer.serialize({}, 'update'), '') class XMLDictSerializerTest(test.NoDBTestCase): def test_xml(self): input_dict = dict(servers=dict(a=(2, 3))) expected_xml = '(2,3)' serializer = wsgi.XMLDictSerializer(xmlns="asdf") result = serializer.serialize(input_dict) result = result.replace('\n', '').replace(' ', '') self.assertEqual(result, expected_xml) class JSONDictSerializerTest(test.NoDBTestCase): def test_json(self): input_dict = dict(servers=dict(a=(2, 3))) expected_json = '{"servers":{"a":[2,3]}}' serializer = wsgi.JSONDictSerializer() result = serializer.serialize(input_dict) result = result.replace('\n', '').replace(' ', '') self.assertEqual(result, expected_json) class TextDeserializerTest(test.NoDBTestCase): def test_dispatch_default(self): deserializer = wsgi.TextDeserializer() self.assertEqual(deserializer.deserialize({}, 'update'), {}) class JSONDeserializerTest(test.NoDBTestCase): def test_json(self): data = """{"a": { "a1": "1", "a2": "2", "bs": ["1", "2", "3", {"c": {"c1": "1"}}], "d": {"e": "1"}, "f": "1"}}""" as_dict = { 'body': { 'a': { 'a1': '1', 'a2': '2', 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], 'd': {'e': '1'}, 'f': '1', }, }, } deserializer = wsgi.JSONDeserializer() self.assertEqual(deserializer.deserialize(data), as_dict) def test_json_valid_utf8(self): data = """{"server": {"min_count": 1, "flavorRef": "1", "name": "\xe6\xa6\x82\xe5\xbf\xb5", "imageRef": "10bab10c-1304-47d", "max_count": 1}} """ as_dict = { 'body': { u'server': { u'min_count': 1, u'flavorRef': u'1', u'name': u'\u6982\u5ff5', u'imageRef': u'10bab10c-1304-47d', u'max_count': 1 } } } deserializer = wsgi.JSONDeserializer() self.assertEqual(deserializer.deserialize(data), as_dict) def test_json_invalid_utf8(self): """Send invalid utf-8 to JSONDeserializer.""" data = """{"server": {"min_count": 1, "flavorRef": "1", "name": "\xf0\x28\x8c\x28", "imageRef": "10bab10c-1304-47d", "max_count": 1}} """ deserializer = wsgi.JSONDeserializer() self.assertRaises(exception.MalformedRequestBody, deserializer.deserialize, data) class XMLDeserializerTest(test.NoDBTestCase): def test_xml(self): xml = """ 123 1 1 """.strip() as_dict = { 'body': { 'a': { 'a1': '1', 'a2': '2', 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], 'd': {'e': '1'}, 'f': '1', }, }, } metadata = {'plurals': {'bs': 'b', 'ts': 't'}} deserializer = wsgi.XMLDeserializer(metadata=metadata) self.assertEqual(deserializer.deserialize(xml), as_dict) def test_xml_empty(self): xml = '' as_dict = {"body": {"a": {}}} deserializer = wsgi.XMLDeserializer() self.assertEqual(deserializer.deserialize(xml), as_dict) def test_xml_valid_utf8(self): xml = """ \xe6\xa6\x82\xe5\xbf\xb5 """ deserializer = wsgi.XMLDeserializer() as_dict = {'body': {u'a': {u'name': u'\u6982\u5ff5'}}} self.assertEqual(deserializer.deserialize(xml), as_dict) def test_xml_invalid_utf8(self): """Send invalid utf-8 to XMLDeserializer.""" xml = """ \xf0\x28\x8c\x28 """ deserializer = wsgi.XMLDeserializer() self.assertRaises(exception.MalformedRequestBody, deserializer.deserialize, xml) class ResourceTest(test.NoDBTestCase): def get_req_id_header_name(self, request): header_name = 'x-openstack-request-id' if utils.get_api_version(request) < 3: header_name = 'x-compute-request-id' return header_name def test_resource_call_with_method_get(self): class Controller(object): def index(self, req): return 'success' app = fakes.TestRouter(Controller()) # the default method is GET req = webob.Request.blank('/tests') response = req.get_response(app) self.assertEqual(response.body, 'success') self.assertEqual(response.status_int, 200) req.body = '{"body": {"key": "value"}}' response = req.get_response(app) self.assertEqual(response.body, 'success') self.assertEqual(response.status_int, 200) req.content_type = 'application/json' response = req.get_response(app) self.assertEqual(response.body, 'success') self.assertEqual(response.status_int, 200) def test_resource_call_with_method_post(self): class Controller(object): @extensions.expected_errors(400) def create(self, req, body): if expected_body != body: msg = "The request body invalid" raise webob.exc.HTTPBadRequest(explanation=msg) return "success" # verify the method: POST app = fakes.TestRouter(Controller()) req = webob.Request.blank('/tests', method="POST", content_type='application/json') req.body = '{"body": {"key": "value"}}' expected_body = {'body': { "key": "value" } } response = req.get_response(app) self.assertEqual(response.status_int, 200) self.assertEqual(response.body, 'success') # verify without body expected_body = None req.body = None response = req.get_response(app) self.assertEqual(response.status_int, 200) self.assertEqual(response.body, 'success') # the body is validated in the controller expected_body = {'body': None} response = req.get_response(app) expected_unsupported_type_body = ('{"badRequest": ' '{"message": "The request body invalid", "code": 400}}') self.assertEqual(response.status_int, 400) self.assertEqual(expected_unsupported_type_body, response.body) def test_resource_call_with_method_put(self): class Controller(object): def update(self, req, id, body): if expected_body != body: msg = "The request body invalid" raise webob.exc.HTTPBadRequest(explanation=msg) return "success" # verify the method: PUT app = fakes.TestRouter(Controller()) req = webob.Request.blank('/tests/test_id', method="PUT", content_type='application/json') req.body = '{"body": {"key": "value"}}' expected_body = {'body': { "key": "value" } } response = req.get_response(app) self.assertEqual(response.body, 'success') self.assertEqual(response.status_int, 200) req.body = None expected_body = None response = req.get_response(app) self.assertEqual(response.status_int, 200) #verify no content_type is contained in the request req.content_type = None req.body = '{"body": {"key": "value"}}' response = req.get_response(app) expected_unsupported_type_body = ('{"badRequest": ' '{"message": "Unsupported Content-Type", "code": 400}}') self.assertEqual(response.status_int, 400) self.assertEqual(expected_unsupported_type_body, response.body) def test_resource_call_with_method_delete(self): class Controller(object): def delete(self, req, id): return "success" # verify the method: DELETE app = fakes.TestRouter(Controller()) req = webob.Request.blank('/tests/test_id', method="DELETE") response = req.get_response(app) self.assertEqual(response.status_int, 200) self.assertEqual(response.body, 'success') # ignore the body req.body = '{"body": {"key": "value"}}' response = req.get_response(app) self.assertEqual(response.status_int, 200) self.assertEqual(response.body, 'success') def test_resource_not_authorized(self): class Controller(object): def index(self, req): raise exception.Forbidden() req = webob.Request.blank('/tests') app = fakes.TestRouter(Controller()) response = req.get_response(app) self.assertEqual(response.status_int, 403) def test_dispatch(self): class Controller(object): def index(self, req, pants=None): return pants controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'index', None, '') actual = resource.dispatch(method, None, {'pants': 'off'}) expected = 'off' self.assertEqual(actual, expected) def test_get_method_unknown_controller_method(self): class Controller(object): def index(self, req, pants=None): return pants controller = Controller() resource = wsgi.Resource(controller) self.assertRaises(AttributeError, resource.get_method, None, 'create', None, '') def test_get_method_action_json(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'action', 'application/json', '{"fooAction": true}') self.assertEqual(controller._action_foo, method) def test_get_method_action_xml(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'action', 'application/xml', 'true') self.assertEqual(controller._action_foo, method) def test_get_method_action_corrupt_xml(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) self.assertRaises( exception.MalformedRequestBody, resource.get_method, None, 'action', 'application/xml', utils.killer_xml_body()) def test_get_method_action_bad_body(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) self.assertRaises(exception.MalformedRequestBody, resource.get_method, None, 'action', 'application/json', '{}') def test_get_method_unknown_controller_action(self): class Controller(wsgi.Controller): @wsgi.action('fooAction') def _action_foo(self, req, id, body): return body controller = Controller() resource = wsgi.Resource(controller) self.assertRaises(KeyError, resource.get_method, None, 'action', 'application/json', '{"barAction": true}') def test_get_method_action_method(self): class Controller(): def action(self, req, pants=None): return pants controller = Controller() resource = wsgi.Resource(controller) method, extensions = resource.get_method(None, 'action', 'application/xml', 'true