summaryrefslogtreecommitdiff
path: root/nova/tests/unit/api/openstack/test_wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests/unit/api/openstack/test_wsgi.py')
-rw-r--r--nova/tests/unit/api/openstack/test_wsgi.py1244
1 files changed, 1244 insertions, 0 deletions
diff --git a/nova/tests/unit/api/openstack/test_wsgi.py b/nova/tests/unit/api/openstack/test_wsgi.py
new file mode 100644
index 0000000000..7607101628
--- /dev/null
+++ b/nova/tests/unit/api/openstack/test_wsgi.py
@@ -0,0 +1,1244 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import inspect
+
+import webob
+
+from nova.api.openstack import extensions
+from nova.api.openstack import wsgi
+from nova import exception
+from nova import i18n
+from nova import test
+from nova.tests.unit.api.openstack import fakes
+from nova.tests.unit import utils
+
+
+class RequestTest(test.NoDBTestCase):
+ def test_content_type_missing(self):
+ request = wsgi.Request.blank('/tests/123', method='POST')
+ request.body = "<body />"
+ self.assertIsNone(request.get_content_type())
+
+ def test_content_type_unsupported(self):
+ request = wsgi.Request.blank('/tests/123', method='POST')
+ request.headers["Content-Type"] = "text/html"
+ request.body = "asdf<br />"
+ self.assertRaises(exception.InvalidContentType,
+ request.get_content_type)
+
+ def test_content_type_with_charset(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Content-Type"] = "application/json; charset=UTF-8"
+ result = request.get_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_from_accept(self):
+ for content_type in ('application/xml',
+ 'application/vnd.openstack.compute+xml',
+ 'application/json',
+ 'application/vnd.openstack.compute+json'):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = content_type
+ result = request.best_match_content_type()
+ self.assertEqual(result, content_type)
+
+ def test_content_type_from_accept_best(self):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = "application/xml, application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = ("application/json; q=0.3, "
+ "application/xml; q=0.9")
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ def test_content_type_from_query_extension(self):
+ request = wsgi.Request.blank('/tests/123.xml')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ request = wsgi.Request.blank('/tests/123.json')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ request = wsgi.Request.blank('/tests/123.invalid')
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_content_type_accept_and_query_extension(self):
+ request = wsgi.Request.blank('/tests/123.xml')
+ request.headers["Accept"] = "application/json"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/xml")
+
+ def test_content_type_accept_default(self):
+ request = wsgi.Request.blank('/tests/123.unsupported')
+ request.headers["Accept"] = "application/unsupported1"
+ result = request.best_match_content_type()
+ self.assertEqual(result, "application/json")
+
+ def test_cache_and_retrieve_instances(self):
+ request = wsgi.Request.blank('/foo')
+ instances = []
+ for x in xrange(3):
+ instances.append({'uuid': 'uuid%s' % x})
+ # Store 2
+ request.cache_db_instances(instances[:2])
+ # Store 1
+ request.cache_db_instance(instances[2])
+ self.assertEqual(request.get_db_instance('uuid0'),
+ instances[0])
+ self.assertEqual(request.get_db_instance('uuid1'),
+ instances[1])
+ self.assertEqual(request.get_db_instance('uuid2'),
+ instances[2])
+ self.assertIsNone(request.get_db_instance('uuid3'))
+ self.assertEqual(request.get_db_instances(),
+ {'uuid0': instances[0],
+ 'uuid1': instances[1],
+ 'uuid2': instances[2]})
+
+ def test_cache_and_retrieve_compute_nodes(self):
+ request = wsgi.Request.blank('/foo')
+ compute_nodes = []
+ for x in xrange(3):
+ compute_nodes.append({'id': 'id%s' % x})
+ # Store 2
+ request.cache_db_compute_nodes(compute_nodes[:2])
+ # Store 1
+ request.cache_db_compute_node(compute_nodes[2])
+ self.assertEqual(request.get_db_compute_node('id0'),
+ compute_nodes[0])
+ self.assertEqual(request.get_db_compute_node('id1'),
+ compute_nodes[1])
+ self.assertEqual(request.get_db_compute_node('id2'),
+ compute_nodes[2])
+ self.assertIsNone(request.get_db_compute_node('id3'))
+ self.assertEqual(request.get_db_compute_nodes(),
+ {'id0': compute_nodes[0],
+ 'id1': compute_nodes[1],
+ 'id2': compute_nodes[2]})
+
+ def test_from_request(self):
+ self.stubs.Set(i18n, 'get_available_languages',
+ fakes.fake_get_available_languages)
+
+ request = wsgi.Request.blank('/')
+ accepted = 'bogus;q=1.1, en-gb;q=0.7,en-us,en;q=.5,*;q=.7'
+ request.headers = {'Accept-Language': accepted}
+ self.assertEqual(request.best_match_language(), 'en_US')
+
+ def test_asterisk(self):
+ # asterisk should match first available if there
+ # are not any other available matches
+ self.stubs.Set(i18n, 'get_available_languages',
+ fakes.fake_get_available_languages)
+
+ request = wsgi.Request.blank('/')
+ accepted = '*,es;q=.5'
+ request.headers = {'Accept-Language': accepted}
+ self.assertEqual(request.best_match_language(), 'en_GB')
+
+ def test_prefix(self):
+ self.stubs.Set(i18n, 'get_available_languages',
+ fakes.fake_get_available_languages)
+
+ request = wsgi.Request.blank('/')
+ accepted = 'zh'
+ request.headers = {'Accept-Language': accepted}
+ self.assertEqual(request.best_match_language(), 'zh_CN')
+
+ def test_secondary(self):
+ self.stubs.Set(i18n, 'get_available_languages',
+ fakes.fake_get_available_languages)
+
+ request = wsgi.Request.blank('/')
+ accepted = 'nn,en-gb;q=.5'
+ request.headers = {'Accept-Language': accepted}
+ self.assertEqual(request.best_match_language(), 'en_GB')
+
+ def test_none_found(self):
+ self.stubs.Set(i18n, 'get_available_languages',
+ fakes.fake_get_available_languages)
+
+ request = wsgi.Request.blank('/')
+ accepted = 'nb-no'
+ request.headers = {'Accept-Language': accepted}
+ self.assertIs(request.best_match_language(), None)
+
+ def test_no_lang_header(self):
+ self.stubs.Set(i18n, 'get_available_languages',
+ fakes.fake_get_available_languages)
+
+ request = wsgi.Request.blank('/')
+ accepted = ''
+ request.headers = {'Accept-Language': accepted}
+ self.assertIs(request.best_match_language(), None)
+
+
+class ActionDispatcherTest(test.NoDBTestCase):
+ def test_dispatch(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: 'pants'
+ self.assertEqual(serializer.dispatch({}, action='create'), 'pants')
+
+ def test_dispatch_action_None(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: 'pants'
+ serializer.default = lambda x: 'trousers'
+ self.assertEqual(serializer.dispatch({}, action=None), 'trousers')
+
+ def test_dispatch_default(self):
+ serializer = wsgi.ActionDispatcher()
+ serializer.create = lambda x: 'pants'
+ serializer.default = lambda x: 'trousers'
+ self.assertEqual(serializer.dispatch({}, action='update'), 'trousers')
+
+
+class DictSerializerTest(test.NoDBTestCase):
+ def test_dispatch_default(self):
+ serializer = wsgi.DictSerializer()
+ self.assertEqual(serializer.serialize({}, 'update'), '')
+
+
+class XMLDictSerializerTest(test.NoDBTestCase):
+ def test_xml(self):
+ input_dict = dict(servers=dict(a=(2, 3)))
+ expected_xml = '<serversxmlns="asdf"><a>(2,3)</a></servers>'
+ serializer = wsgi.XMLDictSerializer(xmlns="asdf")
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+ def test_xml_contains_unicode(self):
+ input_dict = dict(test=u'\u89e3\u7801')
+ expected_xml = '<test>\xe8\xa7\xa3\xe7\xa0\x81</test>'
+ serializer = wsgi.XMLDictSerializer()
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(expected_xml, result)
+
+
+class JSONDictSerializerTest(test.NoDBTestCase):
+ def test_json(self):
+ input_dict = dict(servers=dict(a=(2, 3)))
+ expected_json = '{"servers":{"a":[2,3]}}'
+ serializer = wsgi.JSONDictSerializer()
+ result = serializer.serialize(input_dict)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_json)
+
+
+class TextDeserializerTest(test.NoDBTestCase):
+ def test_dispatch_default(self):
+ deserializer = wsgi.TextDeserializer()
+ self.assertEqual(deserializer.deserialize({}, 'update'), {})
+
+
+class JSONDeserializerTest(test.NoDBTestCase):
+ def test_json(self):
+ data = """{"a": {
+ "a1": "1",
+ "a2": "2",
+ "bs": ["1", "2", "3", {"c": {"c1": "1"}}],
+ "d": {"e": "1"},
+ "f": "1"}}"""
+ as_dict = {
+ 'body': {
+ 'a': {
+ 'a1': '1',
+ 'a2': '2',
+ 'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
+ 'd': {'e': '1'},
+ 'f': '1',
+ },
+ },
+ }
+ deserializer = wsgi.JSONDeserializer()
+ self.assertEqual(deserializer.deserialize(data), as_dict)
+
+ def test_json_valid_utf8(self):
+ data = """{"server": {"min_count": 1, "flavorRef": "1",
+ "name": "\xe6\xa6\x82\xe5\xbf\xb5",
+ "imageRef": "10bab10c-1304-47d",
+ "max_count": 1}} """
+ as_dict = {
+ 'body': {
+ u'server': {
+ u'min_count': 1, u'flavorRef': u'1',
+ u'name': u'\u6982\u5ff5',
+ u'imageRef': u'10bab10c-1304-47d',
+ u'max_count': 1
+ }
+ }
+ }
+ deserializer = wsgi.JSONDeserializer()
+ self.assertEqual(deserializer.deserialize(data), as_dict)
+
+ def test_json_invalid_utf8(self):
+ """Send invalid utf-8 to JSONDeserializer."""
+ data = """{"server": {"min_count": 1, "flavorRef": "1",
+ "name": "\xf0\x28\x8c\x28",
+ "imageRef": "10bab10c-1304-47d",
+ "max_count": 1}} """
+
+ deserializer = wsgi.JSONDeserializer()
+ self.assertRaises(exception.MalformedRequestBody,
+ deserializer.deserialize, data)
+
+
+class XMLDeserializerTest(test.NoDBTestCase):
+ def test_xml(self):
+ xml = """
+ <a a1="1" a2="2">
+ <bs><b>1</b><b>2</b><b>3</b><b><c c1="1"/></b></bs>
+ <d><e>1</e></d>
+ <f>1</f>
+ </a>
+ """.strip()
+ as_dict = {
+ 'body': {
+ 'a': {
+ 'a1': '1',
+ 'a2': '2',
+ 'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
+ 'd': {'e': '1'},
+ 'f': '1',
+ },
+ },
+ }
+ metadata = {'plurals': {'bs': 'b', 'ts': 't'}}
+ deserializer = wsgi.XMLDeserializer(metadata=metadata)
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+ def test_xml_empty(self):
+ xml = '<a></a>'
+ as_dict = {"body": {"a": {}}}
+ deserializer = wsgi.XMLDeserializer()
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+ def test_xml_valid_utf8(self):
+ xml = """ <a><name>\xe6\xa6\x82\xe5\xbf\xb5</name></a> """
+ deserializer = wsgi.XMLDeserializer()
+ as_dict = {'body': {u'a': {u'name': u'\u6982\u5ff5'}}}
+ self.assertEqual(deserializer.deserialize(xml), as_dict)
+
+ def test_xml_invalid_utf8(self):
+ """Send invalid utf-8 to XMLDeserializer."""
+ xml = """ <a><name>\xf0\x28\x8c\x28</name></a> """
+ deserializer = wsgi.XMLDeserializer()
+ self.assertRaises(exception.MalformedRequestBody,
+ deserializer.deserialize, xml)
+
+
+class ResourceTest(test.NoDBTestCase):
+
+ def get_req_id_header_name(self, request):
+ header_name = 'x-openstack-request-id'
+ if utils.get_api_version(request) < 3:
+ header_name = 'x-compute-request-id'
+
+ return header_name
+
+ def test_resource_call_with_method_get(self):
+ class Controller(object):
+ def index(self, req):
+ return 'success'
+
+ app = fakes.TestRouter(Controller())
+ # the default method is GET
+ req = webob.Request.blank('/tests')
+ response = req.get_response(app)
+ self.assertEqual(response.body, 'success')
+ self.assertEqual(response.status_int, 200)
+ req.body = '{"body": {"key": "value"}}'
+ response = req.get_response(app)
+ self.assertEqual(response.body, 'success')
+ self.assertEqual(response.status_int, 200)
+ req.content_type = 'application/json'
+ response = req.get_response(app)
+ self.assertEqual(response.body, 'success')
+ self.assertEqual(response.status_int, 200)
+
+ def test_resource_call_with_method_post(self):
+ class Controller(object):
+ @extensions.expected_errors(400)
+ def create(self, req, body):
+ if expected_body != body:
+ msg = "The request body invalid"
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+ return "success"
+ # verify the method: POST
+ app = fakes.TestRouter(Controller())
+ req = webob.Request.blank('/tests', method="POST",
+ content_type='application/json')
+ req.body = '{"body": {"key": "value"}}'
+ expected_body = {'body': {
+ "key": "value"
+ }
+ }
+ response = req.get_response(app)
+ self.assertEqual(response.status_int, 200)
+ self.assertEqual(response.body, 'success')
+ # verify without body
+ expected_body = None
+ req.body = None
+ response = req.get_response(app)
+ self.assertEqual(response.status_int, 200)
+ self.assertEqual(response.body, 'success')
+ # the body is validated in the controller
+ expected_body = {'body': None}
+ response = req.get_response(app)
+ expected_unsupported_type_body = ('{"badRequest": '
+ '{"message": "The request body invalid", "code": 400}}')
+ self.assertEqual(response.status_int, 400)
+ self.assertEqual(expected_unsupported_type_body, response.body)
+
+ def test_resource_call_with_method_put(self):
+ class Controller(object):
+ def update(self, req, id, body):
+ if expected_body != body:
+ msg = "The request body invalid"
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+ return "success"
+ # verify the method: PUT
+ app = fakes.TestRouter(Controller())
+ req = webob.Request.blank('/tests/test_id', method="PUT",
+ content_type='application/json')
+ req.body = '{"body": {"key": "value"}}'
+ expected_body = {'body': {
+ "key": "value"
+ }
+ }
+ response = req.get_response(app)
+ self.assertEqual(response.body, 'success')
+ self.assertEqual(response.status_int, 200)
+ req.body = None
+ expected_body = None
+ response = req.get_response(app)
+ self.assertEqual(response.status_int, 200)
+ # verify no content_type is contained in the request
+ req.content_type = None
+ req.body = '{"body": {"key": "value"}}'
+ response = req.get_response(app)
+ expected_unsupported_type_body = ('{"badRequest": '
+ '{"message": "Unsupported Content-Type", "code": 400}}')
+ self.assertEqual(response.status_int, 400)
+ self.assertEqual(expected_unsupported_type_body, response.body)
+
+ def test_resource_call_with_method_delete(self):
+ class Controller(object):
+ def delete(self, req, id):
+ return "success"
+
+ # verify the method: DELETE
+ app = fakes.TestRouter(Controller())
+ req = webob.Request.blank('/tests/test_id', method="DELETE")
+ response = req.get_response(app)
+ self.assertEqual(response.status_int, 200)
+ self.assertEqual(response.body, 'success')
+ # ignore the body
+ req.body = '{"body": {"key": "value"}}'
+ response = req.get_response(app)
+ self.assertEqual(response.status_int, 200)
+ self.assertEqual(response.body, 'success')
+
+ def test_resource_not_authorized(self):
+ class Controller(object):
+ def index(self, req):
+ raise exception.Forbidden()
+
+ req = webob.Request.blank('/tests')
+ app = fakes.TestRouter(Controller())
+ response = req.get_response(app)
+ self.assertEqual(response.status_int, 403)
+
+ def test_dispatch(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ method, extensions = resource.get_method(None, 'index', None, '')
+ actual = resource.dispatch(method, None, {'pants': 'off'})
+ expected = 'off'
+ self.assertEqual(actual, expected)
+
+ def test_get_method_unknown_controller_method(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertRaises(AttributeError, resource.get_method,
+ None, 'create', None, '')
+
+ def test_get_method_action_json(self):
+ class Controller(wsgi.Controller):
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ method, extensions = resource.get_method(None, 'action',
+ 'application/json',
+ '{"fooAction": true}')
+ self.assertEqual(controller._action_foo, method)
+
+ def test_get_method_action_xml(self):
+ class Controller(wsgi.Controller):
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ method, extensions = resource.get_method(None, 'action',
+ 'application/xml',
+ '<fooAction>true</fooAction>')
+ self.assertEqual(controller._action_foo, method)
+
+ def test_get_method_action_corrupt_xml(self):
+ class Controller(wsgi.Controller):
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertRaises(
+ exception.MalformedRequestBody,
+ resource.get_method,
+ None, 'action',
+ 'application/xml',
+ utils.killer_xml_body())
+
+ def test_get_method_action_bad_body(self):
+ class Controller(wsgi.Controller):
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertRaises(exception.MalformedRequestBody, resource.get_method,
+ None, 'action', 'application/json', '{}')
+
+ def test_get_method_unknown_controller_action(self):
+ class Controller(wsgi.Controller):
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertRaises(KeyError, resource.get_method,
+ None, 'action', 'application/json',
+ '{"barAction": true}')
+
+ def test_get_method_action_method(self):
+ class Controller():
+ def action(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ method, extensions = resource.get_method(None, 'action',
+ 'application/xml',
+ '<fooAction>true</fooAction')
+ self.assertEqual(controller.action, method)
+
+ def test_get_action_args(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ env = {
+ 'wsgiorg.routing_args': [None, {
+ 'controller': None,
+ 'format': None,
+ 'action': 'update',
+ 'id': 12,
+ }],
+ }
+
+ expected = {'action': 'update', 'id': 12}
+
+ self.assertEqual(resource.get_action_args(env), expected)
+
+ def test_get_body_bad_content(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ request = wsgi.Request.blank('/', method='POST')
+ request.headers['Content-Type'] = 'application/none'
+ request.body = 'foo'
+
+ content_type, body = resource.get_body(request)
+ self.assertIsNone(content_type)
+ self.assertEqual(body, '')
+
+ def test_get_body_no_content_type(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ request = wsgi.Request.blank('/', method='POST')
+ request.body = 'foo'
+
+ content_type, body = resource.get_body(request)
+ self.assertIsNone(content_type)
+ self.assertEqual(body, 'foo')
+
+ def test_get_body_no_content_body(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ request = wsgi.Request.blank('/', method='POST')
+ request.headers['Content-Type'] = 'application/json'
+ request.body = ''
+
+ content_type, body = resource.get_body(request)
+ self.assertEqual('application/json', content_type)
+ self.assertEqual(body, '')
+
+ def test_get_body(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ request = wsgi.Request.blank('/', method='POST')
+ request.headers['Content-Type'] = 'application/json'
+ request.body = 'foo'
+
+ content_type, body = resource.get_body(request)
+ self.assertEqual(content_type, 'application/json')
+ self.assertEqual(body, 'foo')
+
+ def test_get_request_id_with_dict_response_body(self):
+ class Controller(wsgi.Controller):
+ def index(self, req):
+ return {'foo': 'bar'}
+
+ req = fakes.HTTPRequest.blank('/tests')
+ app = fakes.TestRouter(Controller())
+ response = req.get_response(app)
+ self.assertIn('nova.context', req.environ)
+ self.assertEqual(response.body, '{"foo": "bar"}')
+ self.assertEqual(response.status_int, 200)
+
+ def test_no_request_id_with_str_response_body(self):
+ class Controller(wsgi.Controller):
+ def index(self, req):
+ return 'foo'
+
+ req = fakes.HTTPRequest.blank('/tests')
+ app = fakes.TestRouter(Controller())
+ response = req.get_response(app)
+ # NOTE(alaski): This test is really to ensure that a str response
+ # doesn't error. Not having a request_id header is a side effect of
+ # our wsgi setup, ideally it would be there.
+ expected_header = self.get_req_id_header_name(req)
+ self.assertFalse(hasattr(response.headers, expected_header))
+ self.assertEqual(response.body, 'foo')
+ self.assertEqual(response.status_int, 200)
+
+ def test_get_request_id_no_response_body(self):
+ class Controller(object):
+ def index(self, req):
+ pass
+
+ req = fakes.HTTPRequest.blank('/tests')
+ app = fakes.TestRouter(Controller())
+ response = req.get_response(app)
+ self.assertIn('nova.context', req.environ)
+ self.assertEqual(response.body, '')
+ self.assertEqual(response.status_int, 200)
+
+ def test_deserialize_badtype(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertRaises(exception.InvalidContentType,
+ resource.deserialize,
+ controller.index, 'application/none', 'foo')
+
+ def test_deserialize_default(self):
+ class JSONDeserializer(object):
+ def deserialize(self, body):
+ return 'json'
+
+ class XMLDeserializer(object):
+ def deserialize(self, body):
+ return 'xml'
+
+ class Controller(object):
+ @wsgi.deserializers(xml=XMLDeserializer)
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller, json=JSONDeserializer)
+
+ obj = resource.deserialize(controller.index, 'application/json', 'foo')
+ self.assertEqual(obj, 'json')
+
+ def test_deserialize_decorator(self):
+ class JSONDeserializer(object):
+ def deserialize(self, body):
+ return 'json'
+
+ class XMLDeserializer(object):
+ def deserialize(self, body):
+ return 'xml'
+
+ class Controller(object):
+ @wsgi.deserializers(xml=XMLDeserializer)
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller, json=JSONDeserializer)
+
+ obj = resource.deserialize(controller.index, 'application/xml', 'foo')
+ self.assertEqual(obj, 'xml')
+
+ def test_register_actions(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ class ControllerExtended(wsgi.Controller):
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ @wsgi.action('barAction')
+ def _action_bar(self, req, id, body):
+ return body
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertEqual({}, resource.wsgi_actions)
+
+ extended = ControllerExtended()
+ resource.register_actions(extended)
+ self.assertEqual({
+ 'fooAction': extended._action_foo,
+ 'barAction': extended._action_bar,
+ }, resource.wsgi_actions)
+
+ def test_register_extensions(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ class ControllerExtended(wsgi.Controller):
+ @wsgi.extends
+ def index(self, req, resp_obj, pants=None):
+ return None
+
+ @wsgi.extends(action='fooAction')
+ def _action_foo(self, req, resp, id, body):
+ return None
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+ self.assertEqual({}, resource.wsgi_extensions)
+ self.assertEqual({}, resource.wsgi_action_extensions)
+
+ extended = ControllerExtended()
+ resource.register_extensions(extended)
+ self.assertEqual({'index': [extended.index]}, resource.wsgi_extensions)
+ self.assertEqual({'fooAction': [extended._action_foo]},
+ resource.wsgi_action_extensions)
+
+ def test_get_method_extensions(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ class ControllerExtended(wsgi.Controller):
+ @wsgi.extends
+ def index(self, req, resp_obj, pants=None):
+ return None
+
+ controller = Controller()
+ extended = ControllerExtended()
+ resource = wsgi.Resource(controller)
+ resource.register_extensions(extended)
+ method, extensions = resource.get_method(None, 'index', None, '')
+ self.assertEqual(method, controller.index)
+ self.assertEqual(extensions, [extended.index])
+
+ def test_get_method_action_extensions(self):
+ class Controller(wsgi.Controller):
+ def index(self, req, pants=None):
+ return pants
+
+ @wsgi.action('fooAction')
+ def _action_foo(self, req, id, body):
+ return body
+
+ class ControllerExtended(wsgi.Controller):
+ @wsgi.extends(action='fooAction')
+ def _action_foo(self, req, resp_obj, id, body):
+ return None
+
+ controller = Controller()
+ extended = ControllerExtended()
+ resource = wsgi.Resource(controller)
+ resource.register_extensions(extended)
+ method, extensions = resource.get_method(None, 'action',
+ 'application/json',
+ '{"fooAction": true}')
+ self.assertEqual(method, controller._action_foo)
+ self.assertEqual(extensions, [extended._action_foo])
+
+ def test_get_method_action_whitelist_extensions(self):
+ class Controller(wsgi.Controller):
+ def index(self, req, pants=None):
+ return pants
+
+ class ControllerExtended(wsgi.Controller):
+ @wsgi.action('create')
+ def _create(self, req, body):
+ pass
+
+ @wsgi.action('delete')
+ def _delete(self, req, id):
+ pass
+
+ controller = Controller()
+ extended = ControllerExtended()
+ resource = wsgi.Resource(controller)
+ resource.register_actions(extended)
+
+ method, extensions = resource.get_method(None, 'create',
+ 'application/json',
+ '{"create": true}')
+ self.assertEqual(method, extended._create)
+ self.assertEqual(extensions, [])
+
+ method, extensions = resource.get_method(None, 'delete', None, None)
+ self.assertEqual(method, extended._delete)
+ self.assertEqual(extensions, [])
+
+ def test_pre_process_extensions_regular(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req, resp_obj):
+ called.append(1)
+ return None
+
+ def extension2(req, resp_obj):
+ called.append(2)
+ return None
+
+ extensions = [extension1, extension2]
+ response, post = resource.pre_process_extensions(extensions, None, {})
+ self.assertEqual(called, [])
+ self.assertIsNone(response)
+ self.assertEqual(list(post), [extension2, extension1])
+
+ def test_pre_process_extensions_generator(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req):
+ called.append('pre1')
+ yield
+ called.append('post1')
+
+ def extension2(req):
+ called.append('pre2')
+ yield
+ called.append('post2')
+
+ extensions = [extension1, extension2]
+ response, post = resource.pre_process_extensions(extensions, None, {})
+ post = list(post)
+ self.assertEqual(called, ['pre1', 'pre2'])
+ self.assertIsNone(response)
+ self.assertEqual(len(post), 2)
+ self.assertTrue(inspect.isgenerator(post[0]))
+ self.assertTrue(inspect.isgenerator(post[1]))
+
+ for gen in post:
+ try:
+ gen.send(None)
+ except StopIteration:
+ continue
+
+ self.assertEqual(called, ['pre1', 'pre2', 'post2', 'post1'])
+
+ def test_pre_process_extensions_generator_response(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req):
+ called.append('pre1')
+ yield 'foo'
+
+ def extension2(req):
+ called.append('pre2')
+
+ extensions = [extension1, extension2]
+ response, post = resource.pre_process_extensions(extensions, None, {})
+ self.assertEqual(called, ['pre1'])
+ self.assertEqual(response, 'foo')
+ self.assertEqual(post, [])
+
+ def test_post_process_extensions_regular(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req, resp_obj):
+ called.append(1)
+ return None
+
+ def extension2(req, resp_obj):
+ called.append(2)
+ return None
+
+ response = resource.post_process_extensions([extension2, extension1],
+ None, None, {})
+ self.assertEqual(called, [2, 1])
+ self.assertIsNone(response)
+
+ def test_post_process_extensions_regular_response(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req, resp_obj):
+ called.append(1)
+ return None
+
+ def extension2(req, resp_obj):
+ called.append(2)
+ return 'foo'
+
+ response = resource.post_process_extensions([extension2, extension1],
+ None, None, {})
+ self.assertEqual(called, [2])
+ self.assertEqual(response, 'foo')
+
+ def test_post_process_extensions_generator(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req):
+ yield
+ called.append(1)
+
+ def extension2(req):
+ yield
+ called.append(2)
+
+ ext1 = extension1(None)
+ ext1.next()
+ ext2 = extension2(None)
+ ext2.next()
+
+ response = resource.post_process_extensions([ext2, ext1],
+ None, None, {})
+
+ self.assertEqual(called, [2, 1])
+ self.assertIsNone(response)
+
+ def test_post_process_extensions_generator_response(self):
+ class Controller(object):
+ def index(self, req, pants=None):
+ return pants
+
+ controller = Controller()
+ resource = wsgi.Resource(controller)
+
+ called = []
+
+ def extension1(req):
+ yield
+ called.append(1)
+
+ def extension2(req):
+ yield
+ called.append(2)
+ yield 'foo'
+
+ ext1 = extension1(None)
+ ext1.next()
+ ext2 = extension2(None)
+ ext2.next()
+
+ response = resource.post_process_extensions([ext2, ext1],
+ None, None, {})
+
+ self.assertEqual(called, [2])
+ self.assertEqual(response, 'foo')
+
+ def test_resource_exception_handler_type_error(self):
+ # A TypeError should be translated to a Fault/HTTP 400.
+ def foo(a,):
+ return a
+
+ try:
+ with wsgi.ResourceExceptionHandler():
+ foo() # generate a TypeError
+ self.fail("Should have raised a Fault (HTTP 400)")
+ except wsgi.Fault as fault:
+ self.assertEqual(400, fault.status_int)
+
+ def test_resource_headers_are_utf8(self):
+ resp = webob.Response(status_int=202)
+ resp.headers['x-header1'] = 1
+ resp.headers['x-header2'] = u'header2'
+ resp.headers['x-header3'] = u'header3'
+
+ class Controller(object):
+ def index(self, req):
+ return resp
+
+ req = webob.Request.blank('/tests')
+ app = fakes.TestRouter(Controller())
+ response = req.get_response(app)
+
+ for hdr, val in response.headers.iteritems():
+ # All headers must be utf8
+ self.assertIsInstance(hdr, str)
+ self.assertIsInstance(val, str)
+ self.assertEqual(response.headers['x-header1'], '1')
+ self.assertEqual(response.headers['x-header2'], 'header2')
+ self.assertEqual(response.headers['x-header3'], 'header3')
+
+ def test_resource_valid_utf8_body(self):
+ class Controller(object):
+ def update(self, req, id, body):
+ return body
+
+ req = webob.Request.blank('/tests/test_id', method="PUT")
+ body = """ {"name": "\xe6\xa6\x82\xe5\xbf\xb5" } """
+ expected_body = '{"name": "\\u6982\\u5ff5"}'
+ req.body = body
+ req.headers['Content-Type'] = 'application/json'
+ app = fakes.TestRouter(Controller())
+ response = req.get_response(app)
+ self.assertEqual(response.body, expected_body)
+ self.assertEqual(response.status_int, 200)
+
+ def test_resource_invalid_utf8(self):
+ class Controller(object):
+ def update(self, req, id, body):
+ return body
+
+ req = webob.Request.blank('/tests/test_id', method="PUT")
+ body = """ {"name": "\xf0\x28\x8c\x28" } """
+ req.body = body
+ req.headers['Content-Type'] = 'application/json'
+ app = fakes.TestRouter(Controller())
+ self.assertRaises(UnicodeDecodeError, req.get_response, app)
+
+
+class ResponseObjectTest(test.NoDBTestCase):
+ def test_default_code(self):
+ robj = wsgi.ResponseObject({})
+ self.assertEqual(robj.code, 200)
+
+ def test_modified_code(self):
+ robj = wsgi.ResponseObject({})
+ robj._default_code = 202
+ self.assertEqual(robj.code, 202)
+
+ def test_override_default_code(self):
+ robj = wsgi.ResponseObject({}, code=404)
+ self.assertEqual(robj.code, 404)
+
+ def test_override_modified_code(self):
+ robj = wsgi.ResponseObject({}, code=404)
+ robj._default_code = 202
+ self.assertEqual(robj.code, 404)
+
+ def test_set_header(self):
+ robj = wsgi.ResponseObject({})
+ robj['Header'] = 'foo'
+ self.assertEqual(robj.headers, {'header': 'foo'})
+
+ def test_get_header(self):
+ robj = wsgi.ResponseObject({})
+ robj['Header'] = 'foo'
+ self.assertEqual(robj['hEADER'], 'foo')
+
+ def test_del_header(self):
+ robj = wsgi.ResponseObject({})
+ robj['Header'] = 'foo'
+ del robj['hEADER']
+ self.assertNotIn('header', robj.headers)
+
+ def test_header_isolation(self):
+ robj = wsgi.ResponseObject({})
+ robj['Header'] = 'foo'
+ hdrs = robj.headers
+ hdrs['hEADER'] = 'bar'
+ self.assertEqual(robj['hEADER'], 'foo')
+
+ def test_default_serializers(self):
+ robj = wsgi.ResponseObject({})
+ self.assertEqual(robj.serializers, {})
+
+ def test_bind_serializers(self):
+ robj = wsgi.ResponseObject({}, json='foo')
+ robj._bind_method_serializers(dict(xml='bar', json='baz'))
+ self.assertEqual(robj.serializers, dict(xml='bar', json='foo'))
+
+ def test_get_serializer(self):
+ robj = wsgi.ResponseObject({}, json='json', xml='xml', atom='atom')
+ for content_type, mtype in wsgi._MEDIA_TYPE_MAP.items():
+ _mtype, serializer = robj.get_serializer(content_type)
+ self.assertEqual(serializer, mtype)
+
+ def test_get_serializer_defaults(self):
+ robj = wsgi.ResponseObject({})
+ default_serializers = dict(json='json', xml='xml', atom='atom')
+ for content_type, mtype in wsgi._MEDIA_TYPE_MAP.items():
+ self.assertRaises(exception.InvalidContentType,
+ robj.get_serializer, content_type)
+ _mtype, serializer = robj.get_serializer(content_type,
+ default_serializers)
+ self.assertEqual(serializer, mtype)
+
+ def test_serialize(self):
+ class JSONSerializer(object):
+ def serialize(self, obj):
+ return 'json'
+
+ class XMLSerializer(object):
+ def serialize(self, obj):
+ return 'xml'
+
+ class AtomSerializer(object):
+ def serialize(self, obj):
+ return 'atom'
+
+ robj = wsgi.ResponseObject({}, code=202,
+ json=JSONSerializer,
+ xml=XMLSerializer,
+ atom=AtomSerializer)
+ robj['X-header1'] = 'header1'
+ robj['X-header2'] = 'header2'
+ robj['X-header3'] = 3
+ robj['X-header-unicode'] = u'header-unicode'
+
+ for content_type, mtype in wsgi._MEDIA_TYPE_MAP.items():
+ request = wsgi.Request.blank('/tests/123')
+ response = robj.serialize(request, content_type)
+
+ self.assertEqual(response.headers['Content-Type'], content_type)
+ for hdr, val in response.headers.iteritems():
+ # All headers must be utf8
+ self.assertIsInstance(hdr, str)
+ self.assertIsInstance(val, str)
+ self.assertEqual(response.headers['X-header1'], 'header1')
+ self.assertEqual(response.headers['X-header2'], 'header2')
+ self.assertEqual(response.headers['X-header3'], '3')
+ self.assertEqual(response.status_int, 202)
+ self.assertEqual(response.body, mtype)
+
+
+class ValidBodyTest(test.NoDBTestCase):
+
+ def setUp(self):
+ super(ValidBodyTest, self).setUp()
+ self.controller = wsgi.Controller()
+
+ def test_is_valid_body(self):
+ body = {'foo': {}}
+ self.assertTrue(self.controller.is_valid_body(body, 'foo'))
+
+ def test_is_valid_body_none(self):
+ wsgi.Resource(controller=None)
+ self.assertFalse(self.controller.is_valid_body(None, 'foo'))
+
+ def test_is_valid_body_empty(self):
+ wsgi.Resource(controller=None)
+ self.assertFalse(self.controller.is_valid_body({}, 'foo'))
+
+ def test_is_valid_body_no_entity(self):
+ wsgi.Resource(controller=None)
+ body = {'bar': {}}
+ self.assertFalse(self.controller.is_valid_body(body, 'foo'))
+
+ def test_is_valid_body_malformed_entity(self):
+ wsgi.Resource(controller=None)
+ body = {'foo': 'bar'}
+ self.assertFalse(self.controller.is_valid_body(body, 'foo'))