diff options
author | Chris McDonough <chrism@plope.com> | 2011-03-20 23:25:37 -0400 |
---|---|---|
committer | Chris McDonough <chrism@plope.com> | 2011-03-20 23:25:37 -0400 |
commit | d8176e7279e19ab3e618056555a05d710bb161d4 (patch) | |
tree | 43717841fd1326d22a4aaea2d1a17c1c0ce72fcb /tests/test_request.py | |
parent | 1c4912af221e89ecf117154e2708f58984287ad8 (diff) | |
parent | c3f021fbf2fd027b0778ca9dadc79ae7b2a5394b (diff) | |
download | webob-tests.pycon2011.tar.gz |
merge from headtests.pycon2011
Diffstat (limited to 'tests/test_request.py')
-rw-r--r-- | tests/test_request.py | 3081 |
1 files changed, 2400 insertions, 681 deletions
diff --git a/tests/test_request.py b/tests/test_request.py index ae7bad8..301480e 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -1,14 +1,2271 @@ -from webob import Request, BaseRequest -from webob.request import ( - NoDefault, AdhocAttrMixin, environ_from_url, environ_add_POST -) -from webtest import TestApp -from nose.tools import eq_, ok_, assert_raises, assert_false -from cStringIO import StringIO -import string -import cgi +import unittest + +_marker = object() + +class BaseRequestTests(unittest.TestCase): + + def _getTargetClass(self): + from webob import BaseRequest + return BaseRequest + + def _makeOne(self, environ, environ_getter=None, charset=_marker, + unicode_errors=_marker, decode_param_names=_marker, **kw): + from webob.request import NoDefault + if charset is _marker: + charset = NoDefault + if unicode_errors is _marker: + unicode_errors = NoDefault + if decode_param_names is _marker: + decode_param_names = NoDefault + return self._getTargetClass()(environ, environ_getter, charset, + unicode_errors, decode_param_names, **kw) + + def _makeStringIO(self, text): + try: + from io import BytesIO + except ImportError: # Python < 2.6 + from StringIO import StringIO as BytesIO + return BytesIO(text) + + def test_ctor_environ_getter_raises_WTF(self): + # This API should be changed. + self.assertRaises(ValueError, + self._makeOne, {}, environ_getter=object()) + + def test_ctor_wo_environ_raises_WTF(self): + # This API should be changed. + self.assertRaises(TypeError, self._makeOne, None) + + def test_ctor_w_environ(self): + environ = {} + req = self._makeOne(environ) + self.assertEqual(req.environ, environ) + + def test_body_file_getter(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT} + req = self._makeOne(environ) + self.assert_(req.body_file is INPUT) + + def test_body_file_setter_w_string(self): + BEFORE = self._makeStringIO('before') + AFTER = str('AFTER') + environ = {'wsgi.input': BEFORE, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + req.body_file = AFTER + self.assertEqual(req.body_file.getvalue(), AFTER) + self.assertEqual(req.content_length, len(AFTER)) + + def test_body_file_setter_non_string(self): + BEFORE = self._makeStringIO('before') + AFTER = self._makeStringIO('after') + environ = {'wsgi.input': BEFORE, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + req.body_file = AFTER + self.assert_(req.body_file is AFTER) + self.assertEqual(req.content_length, None) + + def test_body_file_deleter(self): + INPUT = self._makeStringIO('before') + environ = {'wsgi.input': INPUT, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + del req.body_file + self.assertEqual(req.body_file.getvalue(), '') + self.assertEqual(req.content_length, 0) + + def test_body_file_raw(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + self.assert_(req.body_file_raw is INPUT) + + def test_body_file_seekable_input_not_seekable(self): + INPUT = self._makeStringIO('input') + INPUT.seek(1, 0) # consume + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': False, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + seekable = req.body_file_seekable + self.assert_(seekable is not INPUT) + self.assertEqual(seekable.getvalue(), 'nput') + + def test_body_file_seekable_input_is_seekable(self): + INPUT = self._makeStringIO('input') + INPUT.seek(1, 0) # consume + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + seekable = req.body_file_seekable + self.assert_(seekable is INPUT) + + def test_scheme(self): + environ = {'wsgi.url_scheme': 'something:', + } + req = self._makeOne(environ) + self.assertEqual(req.scheme, 'something:') + + def test_method(self): + environ = {'REQUEST_METHOD': 'OPTIONS', + } + req = self._makeOne(environ) + self.assertEqual(req.method, 'OPTIONS') + + def test_http_version(self): + environ = {'SERVER_PROTOCOL': '1.1', + } + req = self._makeOne(environ) + self.assertEqual(req.http_version, '1.1') + + def test_script_name(self): + environ = {'SCRIPT_NAME': '/script', + } + req = self._makeOne(environ) + self.assertEqual(req.script_name, '/script') + + def test_path_info(self): + environ = {'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path_info, '/path/info') + + def test_content_length_getter(self): + environ = {'CONTENT_LENGTH': '1234', + } + req = self._makeOne(environ) + self.assertEqual(req.content_length, 1234) + + def test_content_length_setter_w_str(self): + environ = {'CONTENT_LENGTH': '1234', + } + req = self._makeOne(environ) + req.content_length = '3456' + self.assertEqual(req.content_length, 3456) + + def test_remote_user(self): + environ = {'REMOTE_USER': 'phred', + } + req = self._makeOne(environ) + self.assertEqual(req.remote_user, 'phred') + + def test_remote_addr(self): + environ = {'REMOTE_ADDR': '1.2.3.4', + } + req = self._makeOne(environ) + self.assertEqual(req.remote_addr, '1.2.3.4') + + def test_query_string(self): + environ = {'QUERY_STRING': 'foo=bar&baz=bam', + } + req = self._makeOne(environ) + self.assertEqual(req.query_string, 'foo=bar&baz=bam') + + def test_server_name(self): + environ = {'SERVER_NAME': 'somehost.tld', + } + req = self._makeOne(environ) + self.assertEqual(req.server_name, 'somehost.tld') + + def test_server_port_getter(self): + environ = {'SERVER_PORT': '6666', + } + req = self._makeOne(environ) + self.assertEqual(req.server_port, 6666) + + def test_server_port_setter_with_string(self): + environ = {'SERVER_PORT': '6666', + } + req = self._makeOne(environ) + req.server_port = '6667' + self.assertEqual(req.server_port, 6667) + + def test_uscript_name(self): + environ = {'SCRIPT_NAME': '/script', + } + req = self._makeOne(environ) + self.assert_(isinstance(req.uscript_name, unicode)) + self.assertEqual(req.uscript_name, '/script') + + def test_upath_info(self): + environ = {'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assert_(isinstance(req.upath_info, unicode)) + self.assertEqual(req.upath_info, '/path/info') + + def test_content_type_getter_no_parameters(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar', + } + req = self._makeOne(environ) + self.assertEqual(req.content_type, 'application/xml+foobar') + + def test_content_type_getter_w_parameters(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + self.assertEqual(req.content_type, 'application/xml+foobar') + + def test_content_type_setter_w_None(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + req.content_type = None + self.assertEqual(req.content_type, '') + self.assert_('CONTENT_TYPE' not in environ) + + def test_content_type_setter_existing_paramter_no_new_paramter(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + req.content_type = 'text/xml' + self.assertEqual(req.content_type, 'text/xml') + self.assertEqual(environ['CONTENT_TYPE'], 'text/xml;charset="utf8"') + + def test_content_type_deleter_clears_environ_value(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + del req.content_type + self.assertEqual(req.content_type, '') + self.assert_('CONTENT_TYPE' not in environ) + + def test_content_type_deleter_no_environ_value(self): + environ = {} + req = self._makeOne(environ) + del req.content_type + self.assertEqual(req.content_type, '') + self.assert_('CONTENT_TYPE' not in environ) + + def test_charset_getter_cache_hit(self): + CT = 'application/xml+foobar' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req._charset_cache = (CT, 'cp1252') + self.assertEqual(req.charset, 'cp1252') + + def test_charset_getter_cache_miss_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + self.assertEqual(req.charset, 'utf8') + self.assertEqual(req._charset_cache, (CT, 'utf8')) + + def test_charset_getter_cache_miss_wo_parameter(self): + CT = 'application/xml+foobar' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + self.assertEqual(req.charset, 'UTF-8') + self.assertEqual(req._charset_cache, (CT, 'UTF-8')) + + def test_charset_setter_None_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = None + self.assertEqual(environ['CONTENT_TYPE'], 'application/xml+foobar') + self.assertEqual(req.charset, 'UTF-8') + + def test_charset_setter_empty_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = '' + self.assertEqual(environ['CONTENT_TYPE'], 'application/xml+foobar') + self.assertEqual(req.charset, 'UTF-8') + + def test_charset_setter_nonempty_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = 'cp1252' + self.assertEqual(environ['CONTENT_TYPE'], + #'application/xml+foobar; charset="cp1252"') WTF? + 'application/xml+foobar;charset=cp1252', + ) + self.assertEqual(req.charset, 'cp1252') + + def test_charset_setter_nonempty_wo_parameter(self): + CT = 'application/xml+foobar' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = 'cp1252' + self.assertEqual(environ['CONTENT_TYPE'], + 'application/xml+foobar; charset="cp1252"', + #'application/xml+foobar;charset=cp1252', WTF? + ) + self.assertEqual(req.charset, 'cp1252') + + def test_charset_deleter_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + del req.charset + self.assertEqual(environ['CONTENT_TYPE'], 'application/xml+foobar') + self.assertEqual(req.charset, 'UTF-8') + + def test_headers_getter_miss(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + headers = req.headers + self.assertEqual(headers, + {'Content-Type': CONTENT_TYPE, + 'Content-Length': '123'}) + self.assertEqual(req._headers, headers) + + def test_headers_getter_hit(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + req._headers = {'Foo': 'Bar'} + self.assertEqual(req.headers, + {'Foo': 'Bar'}) + + def test_headers_setter(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + req._headers = {'Foo': 'Bar'} + req.headers = {'Qux': 'Spam'} + self.assertEqual(req.headers, + {'Qux': 'Spam'}) + + def test_no_headers_deleter(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + def _test(): + del req.headers + self.assertRaises(AttributeError, _test) + + def test_host_url_w_http_host_and_no_port(self): + environ = {'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'example.com', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'http://example.com') + + def test_host_url_w_http_host_and_standard_port(self): + environ = {'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'example.com:80', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'http://example.com') + + def test_host_url_w_http_host_and_oddball_port(self): + environ = {'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'example.com:8888', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'http://example.com:8888') + + def test_host_url_w_http_host_https_and_no_port(self): + environ = {'wsgi.url_scheme': 'https', + 'HTTP_HOST': 'example.com', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com') + + def test_host_url_w_http_host_https_and_standard_port(self): + environ = {'wsgi.url_scheme': 'https', + 'HTTP_HOST': 'example.com:443', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com') + + def test_host_url_w_http_host_https_and_oddball_port(self): + environ = {'wsgi.url_scheme': 'https', + 'HTTP_HOST': 'example.com:4333', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com:4333') + + def test_host_url_wo_http_host(self): + environ = {'wsgi.url_scheme': 'https', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '4333', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com:4333') + + def test_application_url(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + } + req = self._makeOne(environ) + self.assertEqual(req.application_url, 'http://example.com/script') + + def test_path_url(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path_url, 'http://example.com/script/path/info') + + def test_path(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path, '/script/path/info') + + def test_path_qs_no_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path_qs, '/script/path/info') + + def test_path_qs_w_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.path_qs, '/script/path/info?foo=bar&baz=bam') + + def test_url_no_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.url, 'http://example.com/script/path/info') + + def test_url_w_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.url, + 'http://example.com/script/path/info?foo=bar&baz=bam') + + def test_relative_url_to_app_true_wo_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('other/page', True), + 'http://example.com/script/other/page') + + def test_relative_url_to_app_true_w_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('/other/page', True), + 'http://example.com/other/page') + + def test_relative_url_to_app_false_other_w_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('/other/page', False), + 'http://example.com/other/page') + + def test_relative_url_to_app_false_other_wo_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('other/page', False), + 'http://example.com/script/path/other/page') + + def test_path_info_pop_empty(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, None) + self.assertEqual(environ['SCRIPT_NAME'], '/script') + + def test_path_info_pop_just_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, '') + self.assertEqual(environ['SCRIPT_NAME'], '/script/') + self.assertEqual(environ['PATH_INFO'], '') + + def test_path_info_pop_non_empty_no_pattern(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script/path') + self.assertEqual(environ['PATH_INFO'], '/info') + + def test_path_info_pop_non_empty_w_pattern_miss(self): + import re + PATTERN = re.compile('miss') + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop(PATTERN) + self.assertEqual(popped, None) + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '/path/info') + + def test_path_info_pop_non_empty_w_pattern_hit(self): + import re + PATTERN = re.compile('path') + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop(PATTERN) + self.assertEqual(popped, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script/path') + self.assertEqual(environ['PATH_INFO'], '/info') + + def test_path_info_pop_skips_empty_elements(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '//path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script//path') + self.assertEqual(environ['PATH_INFO'], '/info') + + def test_path_info_peek_empty(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '', + } + req = self._makeOne(environ) + peeked = req.path_info_peek() + self.assertEqual(peeked, None) + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '') + + def test_path_info_peek_just_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/', + } + req = self._makeOne(environ) + peeked = req.path_info_peek() + self.assertEqual(peeked, '') + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '/') + + def test_path_info_peek_non_empty(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path', + } + req = self._makeOne(environ) + peeked = req.path_info_peek() + self.assertEqual(peeked, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '/path') + + def test_urlvars_getter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + self.assertEqual(req.urlvars, {'foo': 'bar'}) + + def test_urlvars_getter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + } + req = self._makeOne(environ) + self.assertEqual(req.urlvars, {'foo': 'bar'}) + + def test_urlvars_getter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + + def test_urlvars_setter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + req.urlvars = {'baz': 'bam'} + self.assertEqual(req.urlvars, {'baz': 'bam'}) + self.assertEqual(environ['paste.urlvars'], {'baz': 'bam'}) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_urlvars_setter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + 'paste.urlvars': {'qux': 'spam'}, + } + req = self._makeOne(environ) + req.urlvars = {'baz': 'bam'} + self.assertEqual(req.urlvars, {'baz': 'bam'}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {'baz': 'bam'})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_setter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + req.urlvars = {'baz': 'bam'} + self.assertEqual(req.urlvars, {'baz': 'bam'}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {'baz': 'bam'})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_deleter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assert_('paste.urlvars' not in environ) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + + def test_urlvars_deleter_w_wsgiorg_key_non_empty_tuple(self): + environ = {'wsgiorg.routing_args': (('a', 'b'), {'foo': 'bar'}), + 'paste.urlvars': {'qux': 'spam'}, + } + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], (('a', 'b'), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_deleter_w_wsgiorg_key_empty_tuple(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + 'paste.urlvars': {'qux': 'spam'}, + } + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_deleter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlargs_getter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + self.assertEqual(req.urlargs, ()) + + def test_urlargs_getter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': (('a', 'b'), {'foo': 'bar'}), + } + req = self._makeOne(environ) + self.assertEqual(req.urlargs, ('a', 'b')) + + def test_urlargs_getter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + self.assertEqual(req.urlargs, ()) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_urlargs_setter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + req.urlargs = ('a', 'b') + self.assertEqual(req.urlargs, ('a', 'b')) + self.assertEqual(environ['wsgiorg.routing_args'], + (('a', 'b'), {'foo': 'bar'})) + self.assert_('paste.urlvars' not in environ) + + def test_urlargs_setter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + } + req = self._makeOne(environ) + req.urlargs = ('a', 'b') + self.assertEqual(req.urlargs, ('a', 'b')) + self.assertEqual(environ['wsgiorg.routing_args'], + (('a', 'b'), {'foo': 'bar'})) + + def test_urlargs_setter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + req.urlargs = ('a', 'b') + self.assertEqual(req.urlargs, ('a', 'b')) + self.assertEqual(environ['wsgiorg.routing_args'], + (('a', 'b'), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlargs_deleter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': (('a', 'b'), {'foo': 'bar'}), + } + req = self._makeOne(environ) + del req.urlargs + self.assertEqual(req.urlargs, ()) + self.assertEqual(environ['wsgiorg.routing_args'], + ((), {'foo': 'bar'})) + + def test_urlargs_deleter_w_wsgiorg_key_empty(self): + environ = {'wsgiorg.routing_args': ((), {}), + } + req = self._makeOne(environ) + del req.urlargs + self.assertEqual(req.urlargs, ()) + self.assert_('paste.urlvars' not in environ) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_urlargs_deleter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + del req.urlargs + self.assertEqual(req.urlargs, ()) + self.assert_('paste.urlvars' not in environ) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_str_cookies_empty_environ(self): + req = self._makeOne({}) + self.assertEqual(req.str_cookies, {}) + + def test_str_cookies_w_webob_parsed_cookies_matching_source(self): + environ = { + 'HTTP_COOKIE': 'a=b', + 'webob._parsed_cookies': ('a=b', {'a': 'b'}), + } + req = self._makeOne(environ) + self.assertEqual(req.str_cookies, {'a': 'b'}) + + def test_str_cookies_w_webob_parsed_cookies_mismatched_source(self): + environ = { + 'HTTP_COOKIE': 'a=b', + 'webob._parsed_cookies': ('a=b;c=d', {'a': 'b', 'c': 'd'}), + } + req = self._makeOne(environ) + self.assertEqual(req.str_cookies, {'a': 'b'}) + + def test_is_xhr_no_header(self): + req = self._makeOne({}) + self.assert_(not req.is_xhr) + + def test_is_xhr_header_miss(self): + environ = {'HTTP_X_REQUESTED_WITH': 'notAnXMLHTTPRequest'} + req = self._makeOne(environ) + self.assert_(not req.is_xhr) + + def test_is_xhr_header_hit(self): + environ = {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'} + req = self._makeOne(environ) + self.assert_(req.is_xhr) + + # host + def test_host_getter_w_HTTP_HOST(self): + environ = {'HTTP_HOST': 'example.com:8888'} + req = self._makeOne(environ) + self.assertEqual(req.host, 'example.com:8888') + + def test_host_getter_wo_HTTP_HOST(self): + environ = {'SERVER_NAME': 'example.com', + 'SERVER_PORT': '8888'} + req = self._makeOne(environ) + self.assertEqual(req.host, 'example.com:8888') + + def test_host_setter(self): + environ = {} + req = self._makeOne(environ) + req.host = 'example.com:8888' + self.assertEqual(environ['HTTP_HOST'], 'example.com:8888') + + def test_host_deleter_hit(self): + environ = {'HTTP_HOST': 'example.com:8888'} + req = self._makeOne(environ) + del req.host + self.assert_('HTTP_HOST' not in environ) + + def test_host_deleter_miss(self): + environ = {} + req = self._makeOne(environ) + del req.host # doesn't raise + + # body + def test_body_getter(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + self.assertEqual(req.body, 'input') + self.assertEqual(req.content_length, len('input')) + def test_body_setter_None(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + req.body = None + self.assertEqual(req.body, '') + self.assertEqual(req.content_length, 0) + self.assert_(req.is_body_seekable) + def test_body_setter_non_string_raises(self): + req = self._makeOne({}) + def _test(): + req.body = object() + self.assertRaises(TypeError, _test) + def test_body_setter_value(self): + BEFORE = self._makeStringIO('before') + environ = {'wsgi.input': BEFORE, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + req.body = 'after' + self.assertEqual(req.body, 'after') + self.assertEqual(req.content_length, len('after')) + self.assert_(req.is_body_seekable) + def test_body_deleter_None(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + del req.body + self.assertEqual(req.body, '') + self.assertEqual(req.content_length, 0) + self.assert_(req.is_body_seekable) + + def test_str_POST_not_POST_or_PUT(self): + from webob.multidict import NoVars + environ = {'REQUEST_METHOD': 'GET', + } + req = self._makeOne(environ) + result = req.str_POST + self.assert_(isinstance(result, NoVars)) + self.assert_(result.reason.startswith('Not a form request')) + + def test_str_POST_existing_cache_hit(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'REQUEST_METHOD': 'POST', + 'webob._parsed_post_vars': ({'foo': 'bar'}, INPUT), + } + req = self._makeOne(environ) + result = req.str_POST + self.assertEqual(result, {'foo': 'bar'}) + + def test_str_PUT_missing_content_type(self): + from webob.multidict import NoVars + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'REQUEST_METHOD': 'PUT', + } + req = self._makeOne(environ) + result = req.str_POST + self.assert_(isinstance(result, NoVars)) + self.assert_(result.reason.startswith('Not an HTML form submission')) + + def test_str_PUT_bad_content_type(self): + from webob.multidict import NoVars + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'REQUEST_METHOD': 'PUT', + 'CONTENT_TYPE': 'text/plain', + } + req = self._makeOne(environ) + result = req.str_POST + self.assert_(isinstance(result, NoVars)) + self.assert_(result.reason.startswith('Not an HTML form submission')) + + def test_str_POST_multipart(self): + BODY_TEXT = ( + '------------------------------deb95b63e42a\n' + 'Content-Disposition: form-data; name="foo"\n' + '\n' + 'foo\n' + '------------------------------deb95b63e42a\n' + 'Content-Disposition: form-data; name="bar"; filename="bar.txt"\n' + 'Content-type: application/octet-stream\n' + '\n' + 'these are the contents of the file "bar.txt"\n' + '\n' + '------------------------------deb95b63e42a--\n') + INPUT = self._makeStringIO(BODY_TEXT) + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; ' + 'boundary=----------------------------deb95b63e42a', + 'CONTENT_LENGTH': len(BODY_TEXT), + } + req = self._makeOne(environ) + result = req.str_POST + self.assertEqual(result['foo'], 'foo') + bar = result['bar'] + self.assertEqual(bar.name, 'bar') + self.assertEqual(bar.filename, 'bar.txt') + self.assertEqual(bar.file.read(), + 'these are the contents of the file "bar.txt"\n') + + # POST + # str_GET + def test_str_GET_reflects_query_string(self): + environ = { + 'QUERY_STRING': 'foo=123', + } + req = self._makeOne(environ) + result = req.str_GET + self.assertEqual(result, {'foo': '123'}) + req.query_string = 'foo=456' + result = req.str_GET + self.assertEqual(result, {'foo': '456'}) + req.query_string = '' + result = req.str_GET + self.assertEqual(result, {}) + + def test_str_GET_updates_query_string(self): + environ = { + } + req = self._makeOne(environ) + result = req.query_string + self.assertEqual(result, '') + req.str_GET['foo'] = '123' + result = req.query_string + self.assertEqual(result, 'foo=123') + del req.str_GET['foo'] + result = req.query_string + self.assertEqual(result, '') + + # GET + # str_postvars + # postvars + # str_queryvars + # queryvars + # is_xhr + # str_params + # params + + def test_str_cookies_wo_webob_parsed_cookies(self): + from webob import Request + environ = { + 'HTTP_COOKIE': 'a=b', + } + req = Request.blank('/', environ) + self.assertEqual(req.str_cookies, {'a': 'b'}) + + # cookies + # copy + + def test_copy_get(self): + from webob import Request + environ = { + 'HTTP_COOKIE': 'a=b', + } + req = Request.blank('/', environ) + clone = req.copy_get() + for k, v in req.environ.items(): + if k in ('CONTENT_LENGTH', 'webob.is_body_seekable'): + self.assert_(k not in clone.environ) + elif k == 'wsgi.input': + self.assert_(clone.environ[k] is not v) + else: + self.assertEqual(clone.environ[k], v) + + def test_remove_conditional_headers_accept_encoding(self): + from webob import Request + req = Request.blank('/') + req.accept_encoding='gzip,deflate' + req.remove_conditional_headers() + self.assertEqual(bool(req.accept_encoding), False) + + def test_remove_conditional_headers_if_modified_since(self): + from datetime import datetime + from webob import Request, UTC + req = Request.blank('/') + req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) + req.remove_conditional_headers() + self.assertEqual(req.if_modified_since, None) + + def test_remove_conditional_headers_if_none_match(self): + from webob import Request + req = Request.blank('/') + req.if_none_match = 'foo, bar' + req.remove_conditional_headers() + self.assertEqual(bool(req.if_none_match), False) + + def test_remove_conditional_headers_if_range(self): + from webob import Request + req = Request.blank('/') + req.if_range = 'foo, bar' + req.remove_conditional_headers() + self.assertEqual(bool(req.if_range), False) + + def test_remove_conditional_headers_range(self): + from webob import Request + req = Request.blank('/') + req.range = 'bytes=0-100' + req.remove_conditional_headers() + self.assertEqual(req.range, None) + + + # is_body_seekable + # make_body_seekable + # copy_body + # make_tempfile + # remove_conditional_headers + # accept + # accept_charset + # accept_encoding + # accept_language + # authorization + + # cache_control + def test_cache_control_reflects_environ(self): + environ = { + 'HTTP_CACHE_CONTROL': 'max-age=5', + } + req = self._makeOne(environ) + result = req.cache_control + self.assertEqual(result.properties, {'max-age': 5}) + req.environ.update(HTTP_CACHE_CONTROL='max-age=10') + result = req.cache_control + self.assertEqual(result.properties, {'max-age': 10}) + req.environ.update(HTTP_CACHE_CONTROL='') + result = req.cache_control + self.assertEqual(result.properties, {}) + + def test_cache_control_updates_environ(self): + environ = {} + req = self._makeOne(environ) + req.cache_control.max_age = 5 + result = req.environ['HTTP_CACHE_CONTROL'] + self.assertEqual(result, 'max-age=5') + req.cache_control.max_age = 10 + result = req.environ['HTTP_CACHE_CONTROL'] + self.assertEqual(result, 'max-age=10') + req.cache_control = None + result = req.environ['HTTP_CACHE_CONTROL'] + self.assertEqual(result, '') + del req.cache_control + self.assert_('HTTP_CACHE_CONTROL' not in req.environ) + + def test_cache_control_set_dict(self): + environ = {} + req = self._makeOne(environ) + req.cache_control = {'max-age': 5} + result = req.cache_control + self.assertEqual(result.max_age, 5) + + def test_cache_control_set_object(self): + from webob.cachecontrol import CacheControl + environ = {} + req = self._makeOne(environ) + req.cache_control = CacheControl({'max-age': 5}, type='request') + result = req.cache_control + self.assertEqual(result.max_age, 5) + + def test_cache_control_gets_cached(self): + environ = {} + req = self._makeOne(environ) + self.assert_(req.cache_control is req.cache_control) + + #if_match + #if_none_match + + #date + #if_modified_since + #if_unmodified_since + #if_range + #max_forwards + #pragma + #range + #referer + #referrer + #user_agent + #__repr__ + #__str__ + #from_file + + #call_application + def test_call_application_calls_application(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + start_response('200 OK', [('content-type', 'text/plain')]) + return ['...\n'] + status, headers, output = req.call_application(application) + self.assertEqual(status, '200 OK') + self.assertEqual(headers, [('content-type', 'text/plain')]) + self.assertEqual(''.join(output), '...\n') + + def test_call_application_provides_write(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + write = start_response('200 OK', [('content-type', 'text/plain')]) + write('...\n') + return [] + status, headers, output = req.call_application(application) + self.assertEqual(status, '200 OK') + self.assertEqual(headers, [('content-type', 'text/plain')]) + self.assertEqual(''.join(output), '...\n') + + def test_call_application_closes_iterable_when_mixed_with_write_calls(self): + environ = { + 'test._call_application_called_close': False + } + req = self._makeOne(environ) + def application(environ, start_response): + write = start_response('200 OK', [('content-type', 'text/plain')]) + class AppIter(object): + def __iter__(self): + yield '...\n' + def close(self): + environ['test._call_application_called_close'] = True + write('...\n') + return AppIter() + status, headers, output = req.call_application(application) + self.assertEqual(''.join(output), '...\n...\n') + self.assertEqual(environ['test._call_application_called_close'], True) + + def test_call_application_raises_exc_info(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + try: + raise RuntimeError('OH NOES') + except: + import sys + exc_info = sys.exc_info() + start_response('200 OK', [('content-type', 'text/plain')], exc_info) + return ['...\n'] + self.assertRaises(RuntimeError, req.call_application, application) + + def test_call_application_returns_exc_info(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + try: + raise RuntimeError('OH NOES') + except: + import sys + exc_info = sys.exc_info() + start_response('200 OK', [('content-type', 'text/plain')], exc_info) + return ['...\n'] + status, headers, output, exc_info = req.call_application(application, True) + self.assertEqual(status, '200 OK') + self.assertEqual(headers, [('content-type', 'text/plain')]) + self.assertEqual(''.join(output), '...\n') + self.assertEqual(exc_info[0], RuntimeError) + + #get_response + #blank + + #from_string + def test_from_string_extra_data(self): + from webob import BaseRequest + _test_req_copy = _test_req.replace('Content-Type', + 'Content-Length: 337\r\nContent-Type') + self.assertRaises(ValueError, BaseRequest.from_string, + _test_req_copy+'EXTRA!') + + #as_string + def test_as_string_skip_body(self): + from webob import BaseRequest + req = BaseRequest.from_string(_test_req) + body = req.as_string(skip_body=True) + self.assertEqual(body.count('\r\n\r\n'), 0) + self.assertEqual(req.as_string(skip_body=337), req.as_string()) + body = req.as_string(337-1).split('\r\n\r\n', 1)[1] + self.assertEqual(body, '<body skipped (len=337)>') + + def test_adhoc_attrs_set(self): + from webob import Request + req = Request.blank('/') + req.foo = 1 + self.assertEqual(req.environ['webob.adhoc_attrs'], {'foo': 1}) + + def test_adhoc_attrs_set_nonadhoc(self): + from webob import Request + req = Request.blank('/', environ={'webob.adhoc_attrs':{}}) + req.request_body_tempfile_limit = 1 + self.assertEqual(req.environ['webob.adhoc_attrs'], {}) + + def test_adhoc_attrs_get(self): + from webob import Request + req = Request.blank('/', environ={'webob.adhoc_attrs': {'foo': 1}}) + self.assertEqual(req.foo, 1) + + def test_adhoc_attrs_get_missing(self): + from webob import Request + req = Request.blank('/') + self.assertRaises(AttributeError, getattr, req, 'some_attr') + + def test_adhoc_attrs_del(self): + from webob import Request + req = Request.blank('/', environ={'webob.adhoc_attrs': {'foo': 1}}) + del req.foo + self.assertEqual(req.environ['webob.adhoc_attrs'], {}) + + def test_adhoc_attrs_del_missing(self): + from webob import Request + req = Request.blank('/') + self.assertRaises(AttributeError, delattr, req, 'some_attr') + + # TODO: webob/request.py:1143 + # the only known way to reach this line is by experimentation + + +class RequestTests_functional(unittest.TestCase): + + def test_gets(self): + from webtest import TestApp + app = TestApp(simpleapp) + res = app.get('/') + self.assert_('Hello' in res) + self.assert_("get is GET([])" in res) + self.assert_("post is <NoVars: Not a form request>" in res) + + res = app.get('/?name=george') + res.mustcontain("get is GET([('name', 'george')])") + res.mustcontain("Val is george") + + def test_language_parsing(self): + from webtest import TestApp + app = TestApp(simpleapp) + res = app.get('/') + self.assert_("The languages are: ['en-US']" in res) + + res = app.get('/', + headers={'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'}) + self.assert_("languages are: ['da', 'en-gb', 'en-US']" in res) + + res = app.get('/', + headers={'Accept-Language': 'en-gb;q=0.8, da, en;q=0.7'}) + self.assert_("languages are: ['da', 'en-gb', 'en-US']" in res) + + def test_mime_parsing(self): + from webtest import TestApp + app = TestApp(simpleapp) + res = app.get('/', headers={'Accept':'text/html'}) + self.assert_("accepttypes is: text/html" in res) + + res = app.get('/', headers={'Accept':'application/xml'}) + self.assert_("accepttypes is: application/xml" in res) + + res = app.get('/', headers={'Accept':'application/xml,*/*'}) + self.assert_("accepttypes is: application/xml" in res) + + def test_accept_best_match(self): + from webob import Request + self.assert_(not Request.blank('/').accept) + self.assert_(not Request.blank('/', headers={'Accept': ''}).accept) + req = Request.blank('/', headers={'Accept':'text/plain'}) + self.assert_(req.accept) + self.assertRaises(ValueError, req.accept.best_match, ['*/*']) + req = Request.blank('/', accept=['*/*','text/*']) + self.assertEqual( + req.accept.best_match(['application/x-foo', 'text/plain']), + 'text/plain') + self.assertEqual( + req.accept.best_match(['text/plain', 'application/x-foo']), + 'text/plain') + req = Request.blank('/', accept=['text/plain', 'message/*']) + self.assertEqual( + req.accept.best_match(['message/x-foo', 'text/plain']), + 'text/plain') + self.assertEqual( + req.accept.best_match(['text/plain', 'message/x-foo']), + 'text/plain') + + def test_from_mimeparse(self): + # http://mimeparse.googlecode.com/svn/trunk/mimeparse.py + from webob import Request + supported = ['application/xbel+xml', 'application/xml'] + tests = [('application/xbel+xml', 'application/xbel+xml'), + ('application/xbel+xml; q=1', 'application/xbel+xml'), + ('application/xml; q=1', 'application/xml'), + ('application/*; q=1', 'application/xbel+xml'), + ('*/*', 'application/xbel+xml')] + + for accept, get in tests: + req = Request.blank('/', headers={'Accept':accept}) + self.assertEqual(req.accept.best_match(supported), get) + + supported = ['application/xbel+xml', 'text/xml'] + tests = [('text/*;q=0.5,*/*; q=0.1', 'text/xml'), + ('text/html,application/atom+xml; q=0.9', None)] + + for accept, get in tests: + req = Request.blank('/', headers={'Accept':accept}) + self.assertEqual(req.accept.best_match(supported), get) + + supported = ['application/json', 'text/html'] + tests = [ + ('application/json, text/javascript, */*', 'application/json'), + ('application/json, text/html;q=0.9', 'application/json'), + ] + + for accept, get in tests: + req = Request.blank('/', headers={'Accept':accept}) + self.assertEqual(req.accept.best_match(supported), get) + + offered = ['image/png', 'application/xml'] + tests = [ + ('image/png', 'image/png'), + ('image/*', 'image/png'), + ('image/*, application/xml', 'application/xml'), + ] + + for accept, get in tests: + req = Request.blank('/', accept=accept) + self.assertEqual(req.accept.best_match(offered), get) + + def test_headers(self): + from webtest import TestApp + app = TestApp(simpleapp) + headers = { + 'If-Modified-Since': 'Sat, 29 Oct 1994 19:43:31 GMT', + 'Cookie': 'var1=value1', + 'User-Agent': 'Mozilla 4.0 (compatible; MSIE)', + 'If-None-Match': '"etag001", "etag002"', + 'X-Requested-With': 'XMLHttpRequest', + } + res = app.get('/?foo=bar&baz', headers=headers) + res.mustcontain( + 'if_modified_since: ' + + 'datetime.datetime(1994, 10, 29, 19, 43, 31, tzinfo=UTC)', + "user_agent: 'Mozilla", + 'is_xhr: True', + "cookies is {'var1': 'value1'}", + "params is NestedMultiDict([('foo', 'bar'), ('baz', '')])", + "if_none_match: <ETag etag001 or etag002>", + ) + + def test_bad_cookie(self): + from webob import Request + req = Request.blank('/') + req.headers['Cookie'] = '070-it-:><?0' + self.assertEqual(req.cookies, {}) + req.headers['Cookie'] = 'foo=bar' + self.assertEqual(req.cookies, {'foo': 'bar'}) + req.headers['Cookie'] = '...' + self.assertEqual(req.cookies, {}) + req.headers['Cookie'] = '=foo' + self.assertEqual(req.cookies, {}) + req.headers['Cookie'] = ('dismiss-top=6; CP=null*; ' + 'PHPSESSID=0a539d42abc001cdc762809248d4beed; a=42') + self.assertEqual(req.cookies, { + 'CP': u'null*', + 'PHPSESSID': u'0a539d42abc001cdc762809248d4beed', + 'a': u'42', + 'dismiss-top': u'6' + }) + req.headers['Cookie'] = 'fo234{=bar blub=Blah' + self.assertEqual(req.cookies, {'blub': 'Blah'}) + + def test_cookie_quoting(self): + from webob import Request + req = Request.blank('/') + req.headers['Cookie'] = 'foo="?foo"; Path=/' + self.assertEqual(req.cookies, {'foo': '?foo'}) + + def test_path_quoting(self): + from webob import Request + path = '/:@&+$,/bar' + req = Request.blank(path) + self.assertEqual(req.path, path) + self.assert_(req.url.endswith(path)) + + def test_params(self): + from webob import Request + req = Request.blank('/?a=1&b=2') + req.method = 'POST' + req.body = 'b=3' + self.assertEqual(req.params.items(), + [('a', '1'), ('b', '2'), ('b', '3')]) + new_params = req.params.copy() + self.assertEqual(new_params.items(), + [('a', '1'), ('b', '2'), ('b', '3')]) + new_params['b'] = '4' + self.assertEqual(new_params.items(), [('a', '1'), ('b', '4')]) + # The key name is \u1000: + req = Request.blank('/?%E1%80%80=x', + decode_param_names=True, charset='UTF-8') + self.assert_(req.decode_param_names) + self.assert_(u'\u1000' in req.GET.keys()) + self.assertEqual(req.GET[u'\u1000'], 'x') + + def test_copy_body(self): + from webob import Request + req = Request.blank('/', method='POST', body='some text', + request_body_tempfile_limit=1) + old_body_file = req.body_file_raw + req.copy_body() + self.assert_(req.body_file_raw is not old_body_file) + req = Request.blank('/', method='POST', + body_file=UnseekableInput('0123456789'), content_length=10) + self.assert_(not hasattr(req.body_file_raw, 'seek')) + old_body_file = req.body_file_raw + req.make_body_seekable() + self.assert_(req.body_file_raw is not old_body_file) + self.assertEqual(req.body, '0123456789') + old_body_file = req.body_file + req.make_body_seekable() + self.assert_(req.body_file_raw is old_body_file) + self.assert_(req.body_file is old_body_file) + + def test_broken_seek(self): + # copy() should work even when the input has a broken seek method + from webob import Request + req = Request.blank('/', method='POST', + body_file=UnseekableInputWithSeek('0123456789'), + content_length=10) + self.assert_(hasattr(req.body_file_raw, 'seek')) + self.assertRaises(IOError, req.body_file_raw.seek, 0) + old_body_file = req.body_file + req2 = req.copy() + self.assert_(req2.body_file_raw is req2.body_file is not old_body_file) + self.assertEqual(req2.body, '0123456789') + + def test_set_body(self): + from webob import BaseRequest + req = BaseRequest.blank('/', body='foo') + self.assert_(req.is_body_seekable) + self.assertEqual(req.body, 'foo') + self.assertEqual(req.content_length, 3) + del req.body + self.assertEqual(req.body, '') + self.assertEqual(req.content_length, 0) + + def test_broken_clen_header(self): + # if the UA sends "content_length: ..' header (the name is wrong) + # it should not break the req.headers.items() + from webob import Request + req = Request.blank('/') + req.environ['HTTP_CONTENT_LENGTH'] = '0' + req.headers.items() + + def test_nonstr_keys(self): + # non-string env keys shouldn't break req.headers + from webob import Request + req = Request.blank('/') + req.environ[1] = 1 + req.headers.items() + + + def test_authorization(self): + from webob import Request + req = Request.blank('/') + req.authorization = 'Digest uri="/?a=b"' + self.assertEqual(req.authorization, ('Digest', {'uri': '/?a=b'})) + + def test_authorization2(self): + from webob.descriptors import parse_auth_params + for s, d in [ + ('x=y', {'x': 'y'}), + ('x="y"', {'x': 'y'}), + ('x=y,z=z', {'x': 'y', 'z': 'z'}), + ('x=y, z=z', {'x': 'y', 'z': 'z'}), + ('x="y",z=z', {'x': 'y', 'z': 'z'}), + ('x="y", z=z', {'x': 'y', 'z': 'z'}), + ('x="y,x", z=z', {'x': 'y,x', 'z': 'z'}), + ]: + self.assertEqual(parse_auth_params(s), d) + + + def test_from_file(self): + from webob import Request + req = Request.blank('http://example.com:8000/test.html?params') + self.equal_req(req) + + req = Request.blank('http://example.com/test2') + req.method = 'POST' + req.body = 'test=example' + self.equal_req(req) + + def test_req_kw_none_val(self): + from webob import Request + request = Request({}, content_length=None) + self.assert_('content-length' not in request.headers) + self.assert_('content-type' not in request.headers) + + def test_env_keys(self): + from webob import Request + req = Request.blank('/') + # SCRIPT_NAME can be missing + del req.environ['SCRIPT_NAME'] + self.assertEqual(req.script_name, '') + self.assertEqual(req.uscript_name, u'') + + def test_repr_nodefault(self): + from webob.request import NoDefault + nd = NoDefault + self.assertEqual(repr(nd), '(No Default)') + + def test_request_noenviron_param(self): + # Environ is a a mandatory not null param in Request. + from webob import Request + self.assertRaises(TypeError, Request, environ=None) + + def test_environ_getter(self): + # Parameter environ_getter in Request is no longer valid and + # should raise an error in case it's used + from webob import Request + class env(object): + def __init__(self, env): + self.env = env + def env_getter(self): + return self.env + self.assertRaises(ValueError, + Request, environ_getter=env({'a':1}).env_getter) + + def test_unicode_errors(self): + # Passing unicode_errors != NoDefault should assign value to + # dictionary['unicode_errors'], else not + from webob.request import NoDefault + from webob import Request + r = Request({'a':1}, unicode_errors='strict') + self.assert_('unicode_errors' in r.__dict__) + r = Request({'a':1}, unicode_errors=NoDefault) + self.assert_('unicode_errors' not in r.__dict__) + + def test_charset_deprecation(self): + # Any class that inherits from BaseRequest cannot define a + # default_charset attribute. + # Any class that inherits from BaseRequest cannot define a + # charset attr that is instance of str + from webob import BaseRequest + from webob.request import AdhocAttrMixin + class NewRequest(BaseRequest): + default_charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + class NewRequest(BaseRequest): + charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + class NewRequest(AdhocAttrMixin, BaseRequest): + default_charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + class NewRequest(AdhocAttrMixin, BaseRequest): + charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + + def test_unexpected_kw(self): + # Passed an attr in kw that does not exist in the class, should + # raise an error + # Passed an attr in kw that does exist in the class, should be ok + from webob import Request + self.assertRaises(TypeError, + Request, {'a':1}, this_does_not_exist=1) + r = Request({'a':1}, **{'charset':'utf-8', 'server_name':'127.0.0.1'}) + self.assertEqual(getattr(r, 'charset', None), 'utf-8') + self.assertEqual(getattr(r, 'server_name', None), '127.0.0.1') + + def test_body_file_setter(self): + # If body_file is passed and it's instance of str, we define + # environ['wsgi.input'] and content_length. Plus, while deleting the + # attribute, we should get '' and 0 respectively + from webob import Request + r = Request({'a':1}, **{'body_file':'hello world'}) + self.assertEqual(r.environ['wsgi.input'].getvalue(), 'hello world') + self.assertEqual(int(r.environ['CONTENT_LENGTH']), len('hello world')) + del r.body_file + self.assertEqual(r.environ['wsgi.input'].getvalue(), '') + self.assertEqual(int(r.environ['CONTENT_LENGTH']), 0) + + def test_conttype_set_del(self): + # Deleting content_type attr from a request should update the + # environ dict + # Assigning content_type should replace first option of the environ + # dict + from webob import Request + r = Request({'a':1}, **{'content_type':'text/html'}) + self.assert_('CONTENT_TYPE' in r.environ) + self.assert_(hasattr(r, 'content_type')) + del r.content_type + self.assert_('CONTENT_TYPE' not in r.environ) + a = Request({'a':1}, + content_type='charset=utf-8;application/atom+xml;type=entry') + self.assert_(a.environ['CONTENT_TYPE']== + 'charset=utf-8;application/atom+xml;type=entry') + a.content_type = 'charset=utf-8' + self.assert_(a.environ['CONTENT_TYPE']== + 'charset=utf-8;application/atom+xml;type=entry') + + def test_headers2(self): + # Setting headers in init and later with a property, should update + # the info + from webob import Request + headers = {'Host': 'www.example.com', + 'Accept-Language': 'en-us,en;q=0.5', + 'Accept-Encoding': 'gzip,deflate', + 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', + 'Keep-Alive': '115', + 'Connection': 'keep-alive', + 'Cache-Control': 'max-age=0'} + r = Request({'a':1}, headers=headers) + for i in headers.keys(): + self.assert_(i in r.headers and + 'HTTP_'+i.upper().replace('-', '_') in r.environ) + r.headers = {'Server':'Apache'} + self.assertEqual(r.environ.keys(), ['a', 'HTTP_SERVER']) + + def test_host_url(self): + # Request has a read only property host_url that combines several + # keys to create a host_url + from webob import Request + a = Request({'wsgi.url_scheme':'http'}, **{'host':'www.example.com'}) + self.assertEqual(a.host_url, 'http://www.example.com') + a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost', + 'server_port':5000}) + self.assertEqual(a.host_url, 'http://localhost:5000') + a = Request({'wsgi.url_scheme':'https'}, **{'server_name':'localhost', + 'server_port':443}) + self.assertEqual(a.host_url, 'https://localhost') + + def test_path_info_p(self): + # Peek path_info to see what's coming + # Pop path_info until there's nothing remaining + from webob import Request + a = Request({'a':1}, **{'path_info':'/foo/bar','script_name':''}) + self.assertEqual(a.path_info_peek(), 'foo') + self.assertEqual(a.path_info_pop(), 'foo') + self.assertEqual(a.path_info_peek(), 'bar') + self.assertEqual(a.path_info_pop(), 'bar') + self.assertEqual(a.path_info_peek(), None) + self.assertEqual(a.path_info_pop(), None) + + def test_urlvars_property(self): + # Testing urlvars setter/getter/deleter + from webob import Request + a = Request({'wsgiorg.routing_args':((),{'x':'y'}), + 'paste.urlvars':{'test':'value'}}) + a.urlvars = {'hello':'world'} + self.assert_('paste.urlvars' not in a.environ) + self.assertEqual(a.environ['wsgiorg.routing_args'], + ((), {'hello':'world'})) + del a.urlvars + self.assert_('wsgiorg.routing_args' not in a.environ) + a = Request({'paste.urlvars':{'test':'value'}}) + self.assertEqual(a.urlvars, {'test':'value'}) + a.urlvars = {'hello':'world'} + self.assertEqual(a.environ['paste.urlvars'], {'hello':'world'}) + del a.urlvars + self.assert_('paste.urlvars' not in a.environ) + + def test_urlargs_property(self): + # Testing urlargs setter/getter/deleter + from webob import Request + a = Request({'paste.urlvars':{'test':'value'}}) + self.assertEqual(a.urlargs, ()) + a.urlargs = {'hello':'world'} + self.assertEqual(a.environ['wsgiorg.routing_args'], + ({'hello':'world'}, {'test':'value'})) + a = Request({'a':1}) + a.urlargs = {'hello':'world'} + self.assertEqual(a.environ['wsgiorg.routing_args'], + ({'hello':'world'}, {})) + del a.urlargs + self.assert_('wsgiorg.routing_args' not in a.environ) + + def test_host_property(self): + # Testing host setter/getter/deleter + from webob import Request + a = Request({'wsgi.url_scheme':'http'}, server_name='localhost', + server_port=5000) + self.assertEqual(a.host, "localhost:5000") + a.host = "localhost:5000" + self.assert_('HTTP_HOST' in a.environ) + del a.host + self.assert_('HTTP_HOST' not in a.environ) + + def test_body_property(self): + # Testing body setter/getter/deleter plus making sure body has a + # seek method + #a = Request({'a':1}, **{'CONTENT_LENGTH':'?'}) + # I cannot think of a case where somebody would put anything else + # than a # numerical value in CONTENT_LENGTH, Google didn't help + # either + #self.assertEqual(a.body, '') + # I need to implement a not seekable stringio like object. + import string + from webob import Request + from webob import BaseRequest + from cStringIO import StringIO + class DummyIO(object): + def __init__(self, txt): + self.txt = txt + def read(self, n=-1): + return self.txt[0:n] + limit = BaseRequest.request_body_tempfile_limit + len_strl = limit // len(string.letters) + 1 + r = Request({'a':1}, body_file=DummyIO(string.letters * len_strl)) + self.assertEqual(len(r.body), len(string.letters*len_strl)-1) + self.assertRaises(TypeError, + setattr, r, 'body', unicode('hello world')) + r.body = None + self.assertEqual(r.body, '') + r = Request({'a':1}, body_file=DummyIO(string.letters)) + self.assert_(not hasattr(r.body_file_raw, 'seek')) + r.make_body_seekable() + self.assert_(hasattr(r.body_file_raw, 'seek')) + r = Request({'a':1}, body_file=StringIO(string.letters)) + self.assert_(hasattr(r.body_file_raw, 'seek')) + r.make_body_seekable() + self.assert_(hasattr(r.body_file_raw, 'seek')) + + def test_repr_invalid(self): + # If we have an invalid WSGI environ, the repr should tell us. + from webob import BaseRequest + req = BaseRequest({'CONTENT_LENGTH':'0', 'body':''}) + self.assert_(repr(req).endswith('(invalid WSGI environ)>')) + + def test_from_garbage_file(self): + # If we pass a file with garbage to from_file method it should + # raise an error plus missing bits in from_file method + from cStringIO import StringIO + from webob import BaseRequest + self.assertRaises(ValueError, + BaseRequest.from_file, StringIO('hello world')) + val_file = StringIO( + "GET /webob/ HTTP/1.1\n" + "Host: pythonpaste.org\n" + "User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13)" + "Gecko/20101206 Ubuntu/10.04 (lucid) Firefox/3.6.13\n" + "Accept: " + "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;" + "q=0.8\n" + "Accept-Language: en-us,en;q=0.5\n" + "Accept-Encoding: gzip,deflate\n" + "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" + # duplicate on purpose + "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" + "Keep-Alive: 115\n" + "Connection: keep-alive\n" + ) + req = BaseRequest.from_file(val_file) + self.assert_(isinstance(req, BaseRequest)) + self.assert_(not repr(req).endswith('(invalid WSGI environ)>')) + val_file = StringIO( + "GET /webob/ HTTP/1.1\n" + "Host pythonpaste.org\n" + ) + self.assertRaises(ValueError, BaseRequest.from_file, val_file) + + def test_from_string(self): + # A valid request without a Content-Length header should still read + # the full body. + # Also test parity between as_string and from_string / from_file. + import cgi + from webob import BaseRequest + req = BaseRequest.from_string(_test_req) + self.assert_(isinstance(req, BaseRequest)) + self.assert_(not repr(req).endswith('(invalid WSGI environ)>')) + self.assert_('\n' not in req.http_version or '\r' in req.http_version) + self.assert_(',' not in req.host) + self.assert_(req.content_length is not None) + self.assertEqual(req.content_length, 337) + self.assert_('foo' in req.body) + bar_contents = "these are the contents of the file 'bar.txt'\r\n" + self.assert_(bar_contents in req.body) + self.assertEqual(req.params['foo'], 'foo') + bar = req.params['bar'] + self.assert_(isinstance(bar, cgi.FieldStorage)) + self.assertEqual(bar.type, 'application/octet-stream') + bar.file.seek(0) + self.assertEqual(bar.file.read(), bar_contents) + # out should equal contents, except for the Content-Length header, + # so insert that. + _test_req_copy = _test_req.replace('Content-Type', + 'Content-Length: 337\r\nContent-Type') + self.assertEqual(str(req), _test_req_copy) + + req2 = BaseRequest.from_string(_test_req2) + self.assert_('host' not in req2.headers) + self.assertEqual(str(req2), _test_req2.rstrip()) + self.assertRaises(ValueError, + BaseRequest.from_string, _test_req2 + 'xx') + + def test_blank(self): + # BaseRequest.blank class method + from webob import BaseRequest + self.assertRaises(ValueError, BaseRequest.blank, + 'www.example.com/foo?hello=world', None, + 'www.example.com/foo?hello=world') + self.assertRaises(ValueError, BaseRequest.blank, + 'gopher.example.com/foo?hello=world', None, + 'gopher://gopher.example.com') + req = BaseRequest.blank('www.example.com/foo?hello=world', None, + 'http://www.example.com') + self.assertEqual(req.environ.get('HTTP_HOST', None), + 'www.example.com:80') + self.assertEqual(req.environ.get('PATH_INFO', None), + 'www.example.com/foo') + self.assertEqual(req.environ.get('QUERY_STRING', None), + 'hello=world') + self.assertEqual(req.environ.get('REQUEST_METHOD', None), 'GET') + req = BaseRequest.blank('www.example.com/secure?hello=world', None, + 'https://www.example.com/secure') + self.assertEqual(req.environ.get('HTTP_HOST', None), + 'www.example.com:443') + self.assertEqual(req.environ.get('PATH_INFO', None), + 'www.example.com/secure') + self.assertEqual(req.environ.get('QUERY_STRING', None), 'hello=world') + self.assertEqual(req.environ.get('REQUEST_METHOD', None), 'GET') + self.assertEqual(req.environ.get('SCRIPT_NAME', None), '/secure') + self.assertEqual(req.environ.get('SERVER_NAME', None), + 'www.example.com') + self.assertEqual(req.environ.get('SERVER_PORT', None), '443') + + def test_environ_from_url(self): + # Generating an environ just from an url plus testing environ_add_POST + from webob.request import environ_add_POST + from webob.request import environ_from_url + self.assertRaises(TypeError, environ_from_url, + 'http://www.example.com/foo?bar=baz#qux') + self.assertRaises(TypeError, environ_from_url, + 'gopher://gopher.example.com') + req = environ_from_url('http://www.example.com/foo?bar=baz') + self.assertEqual(req.get('HTTP_HOST', None), 'www.example.com:80') + self.assertEqual(req.get('PATH_INFO', None), '/foo') + self.assertEqual(req.get('QUERY_STRING', None), 'bar=baz') + self.assertEqual(req.get('REQUEST_METHOD', None), 'GET') + self.assertEqual(req.get('SCRIPT_NAME', None), '') + self.assertEqual(req.get('SERVER_NAME', None), 'www.example.com') + self.assertEqual(req.get('SERVER_PORT', None), '80') + req = environ_from_url('https://www.example.com/foo?bar=baz') + self.assertEqual(req.get('HTTP_HOST', None), 'www.example.com:443') + self.assertEqual(req.get('PATH_INFO', None), '/foo') + self.assertEqual(req.get('QUERY_STRING', None), 'bar=baz') + self.assertEqual(req.get('REQUEST_METHOD', None), 'GET') + self.assertEqual(req.get('SCRIPT_NAME', None), '') + self.assertEqual(req.get('SERVER_NAME', None), 'www.example.com') + self.assertEqual(req.get('SERVER_PORT', None), '443') + environ_add_POST(req, None) + self.assert_('CONTENT_TYPE' not in req) + self.assert_('CONTENT_LENGTH' not in req) + environ_add_POST(req, {'hello':'world'}) + self.assert_(req.get('HTTP_HOST', None), 'www.example.com:443') + self.assertEqual(req.get('PATH_INFO', None), '/foo') + self.assertEqual(req.get('QUERY_STRING', None), 'bar=baz') + self.assertEqual(req.get('REQUEST_METHOD', None), 'POST') + self.assertEqual(req.get('SCRIPT_NAME', None), '') + self.assertEqual(req.get('SERVER_NAME', None), 'www.example.com') + self.assertEqual(req.get('SERVER_PORT', None), '443') + self.assertEqual(req.get('CONTENT_LENGTH', None),'11') + self.assertEqual(req.get('CONTENT_TYPE', None), + 'application/x-www-form-urlencoded') + self.assertEqual(req['wsgi.input'].read(), 'hello=world') + + + def test_post_does_not_reparse(self): + # test that there's no repetitive parsing is happening on every + # req.POST access + from webob import Request + req = Request.blank('/', + content_type='multipart/form-data; boundary=boundary', + POST=_cgi_escaping_body + ) + f0 = req.body_file_raw + post1 = req.str_POST + f1 = req.body_file_raw + self.assert_(f1 is not f0) + post2 = req.str_POST + f2 = req.body_file_raw + self.assert_(post1 is post2) + self.assert_(f1 is f2) + + + def test_middleware_body(self): + from webob import Request + def app(env, sr): + sr('200 OK', []) + return [env['wsgi.input'].read()] + + def mw(env, sr): + req = Request(env) + data = req.body_file.read() + resp = req.get_response(app) + resp.headers['x-data'] = data + return resp(env, sr) + + req = Request.blank('/', method='PUT', body='abc') + resp = req.get_response(mw) + self.assertEqual(resp.body, 'abc') + self.assertEqual(resp.headers['x-data'], 'abc') + + def test_body_file_noseek(self): + from webob import Request + req = Request.blank('/', method='PUT', body='abc') + lst = [req.body_file.read(1) for i in range(3)] + self.assertEqual(lst, ['a','b','c']) + + def test_cgi_escaping_fix(self): + from webob import Request + req = Request.blank('/', + content_type='multipart/form-data; boundary=boundary', + POST=_cgi_escaping_body + ) + self.assertEqual(req.POST.keys(), ['%20%22"']) + req.body_file.read() + self.assertEqual(req.POST.keys(), ['%20%22"']) + + def test_content_type_none(self): + from webob import Request + r = Request.blank('/', content_type='text/html') + self.assertEqual(r.content_type, 'text/html') + r.content_type = None + + def test_charset_in_content_type(self): + from webob import Request + r = Request({'CONTENT_TYPE':'text/html;charset=ascii'}) + r.charset = 'shift-jis' + self.assertEqual(r.charset, 'shift-jis') + + def test_body_file_seekable(self): + from cStringIO import StringIO + from webob import Request + r = Request.blank('/') + r.body_file = StringIO('body') + self.assertEqual(r.body_file_seekable.read(), 'body') + + def test_request_init(self): + # port from doctest (docs/reference.txt) + from webob import Request + req = Request.blank('/article?id=1') + self.assertEqual(req.environ['HTTP_HOST'], 'localhost:80') + self.assertEqual(req.environ['PATH_INFO'], '/article') + self.assertEqual(req.environ['QUERY_STRING'], 'id=1') + self.assertEqual(req.environ['REQUEST_METHOD'], 'GET') + self.assertEqual(req.environ['SCRIPT_NAME'], '') + self.assertEqual(req.environ['SERVER_NAME'], 'localhost') + self.assertEqual(req.environ['SERVER_PORT'], '80') + self.assertEqual(req.environ['SERVER_PROTOCOL'], 'HTTP/1.0') + self.assert_(hasattr(req.environ['wsgi.errors'], 'write') and + hasattr(req.environ['wsgi.errors'], 'flush')) + self.assert_(hasattr(req.environ['wsgi.input'], 'next')) + self.assertEqual(req.environ['wsgi.multiprocess'], False) + self.assertEqual(req.environ['wsgi.multithread'], False) + self.assertEqual(req.environ['wsgi.run_once'], False) + self.assertEqual(req.environ['wsgi.url_scheme'], 'http') + self.assertEqual(req.environ['wsgi.version'], (1, 0)) + + # Test body + self.assert_(hasattr(req.body_file, 'read')) + self.assertEqual(req.body, '') + req.body = 'test' + self.assert_(hasattr(req.body_file, 'read')) + self.assertEqual(req.body, 'test') + + # Test method & URL + self.assertEqual(req.method, 'GET') + self.assertEqual(req.scheme, 'http') + self.assertEqual(req.script_name, '') # The base of the URL + req.script_name = '/blog' # make it more interesting + self.assertEqual(req.path_info, '/article') + # Content-Type of the request body + self.assertEqual(req.content_type, '') + # The auth'ed user (there is none set) + self.assert_(req.remote_user is None) + self.assert_(req.remote_addr is None) + self.assertEqual(req.host, 'localhost:80') + self.assertEqual(req.host_url, 'http://localhost') + self.assertEqual(req.application_url, 'http://localhost/blog') + self.assertEqual(req.path_url, 'http://localhost/blog/article') + self.assertEqual(req.url, 'http://localhost/blog/article?id=1') + self.assertEqual(req.path, '/blog/article') + self.assertEqual(req.path_qs, '/blog/article?id=1') + self.assertEqual(req.query_string, 'id=1') + self.assertEqual(req.relative_url('archive'), + 'http://localhost/blog/archive') + + # Doesn't change request + self.assertEqual(req.path_info_peek(), 'article') + # Does change request! + self.assertEqual(req.path_info_pop(), 'article') + self.assertEqual(req.script_name, '/blog/article') + self.assertEqual(req.path_info, '') + + # Headers + req.headers['Content-Type'] = 'application/x-www-urlencoded' + self.assertEqual(sorted(req.headers.items()), + [('Content-Length', '4'), + ('Content-Type', 'application/x-www-urlencoded'), + ('Host', 'localhost:80')]) + self.assertEqual(req.environ['CONTENT_TYPE'], + 'application/x-www-urlencoded') + + def test_request_query_and_POST_vars(self): + # port from doctest (docs/reference.txt) + + # Query & POST variables + from webob import Request + from webob.multidict import MultiDict + from webob.multidict import NestedMultiDict + from webob.multidict import NoVars + from webob.multidict import TrackableMultiDict + req = Request.blank('/test?check=a&check=b&name=Bob') + GET = TrackableMultiDict([('check', 'a'), + ('check', 'b'), + ('name', 'Bob')]) + self.assertEqual(req.str_GET, GET) + self.assertEqual(req.str_GET['check'], 'b') + self.assertEqual(req.str_GET.getall('check'), ['a', 'b']) + self.assertEqual(req.str_GET.items(), + [('check', 'a'), ('check', 'b'), ('name', 'Bob')]) + + self.assert_(isinstance(req.str_POST, NoVars)) + # NoVars can be read like a dict, but not written + self.assertEqual(req.str_POST.items(), []) + req.method = 'POST' + req.body = 'name=Joe&email=joe@example.com' + self.assertEqual(req.str_POST, + MultiDict([('name', 'Joe'), + ('email', 'joe@example.com')])) + self.assertEqual(req.str_POST['name'], 'Joe') + + self.assert_(isinstance(req.str_params, NestedMultiDict)) + self.assertEqual(req.str_params.items(), + [('check', 'a'), + ('check', 'b'), + ('name', 'Bob'), + ('name', 'Joe'), + ('email', 'joe@example.com')]) + self.assertEqual(req.str_params['name'], 'Bob') + self.assertEqual(req.str_params.getall('name'), ['Bob', 'Joe']) + + def test_request_put(self): + from datetime import datetime + from webob import Request + from webob import Response + from webob import UTC + from webob.acceptparse import MIMEAccept + from webob.byterange import Range + from webob.etag import ETagMatcher + from webob.etag import _NoIfRange + from webob.multidict import MultiDict + from webob.multidict import TrackableMultiDict + from webob.multidict import UnicodeMultiDict + req = Request.blank('/test?check=a&check=b&name=Bob') + req.method = 'PUT' + req.body = 'var1=value1&var2=value2&rep=1&rep=2' + req.environ['CONTENT_LENGTH'] = str(len(req.body)) + req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded' + GET = TrackableMultiDict([('check', 'a'), + ('check', 'b'), + ('name', 'Bob')]) + self.assertEqual(req.str_GET, GET) + self.assertEqual(req.str_POST, MultiDict( + [('var1', 'value1'), + ('var2', 'value2'), + ('rep', '1'), + ('rep', '2')])) + + # Unicode + req.charset = 'utf8' + self.assert_(isinstance(req.GET, UnicodeMultiDict)) + self.assertEqual(req.GET.items(), + [('check', u'a'), ('check', u'b'), ('name', u'Bob')]) + + # Cookies + req.headers['Cookie'] = 'test=value' + self.assert_(isinstance(req.cookies, UnicodeMultiDict)) + self.assertEqual(req.cookies.items(), [('test', u'value')]) + req.charset = None + self.assertEqual(req.str_cookies, {'test': 'value'}) + + # Accept-* headers + self.assert_('text/html' in req.accept) + req.accept = 'text/html;q=0.5, application/xhtml+xml;q=1' + self.assert_(isinstance(req.accept, MIMEAccept)) + self.assert_('text/html' in req.accept) + + self.assertEqual(req.accept.first_match(['text/html', + 'application/xhtml+xml']), 'text/html') + self.assertEqual(req.accept.best_match(['text/html', + 'application/xhtml+xml']), + 'application/xhtml+xml') + self.assertEqual(req.accept.best_matches(), + ['application/xhtml+xml', 'text/html']) + + req.accept_language = 'es, pt-BR' + self.assertEqual(req.accept_language.best_matches('en-US'), + ['es', 'pt-BR', 'en-US']) + self.assertEqual(req.accept_language.best_matches('es'), ['es']) + + # Conditional Requests + server_token = 'opaque-token' + # shouldn't return 304 + self.assert_(not server_token in req.if_none_match) + req.if_none_match = server_token + self.assert_(isinstance(req.if_none_match, ETagMatcher)) + # You *should* return 304 + self.assert_(server_token in req.if_none_match) + + req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) + self.assertEqual(req.headers['If-Modified-Since'], + 'Sun, 01 Jan 2006 12:00:00 GMT') + server_modified = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) + self.assert_(req.if_modified_since) + self.assert_(req.if_modified_since >= server_modified) + + self.assert_(isinstance(req.if_range, _NoIfRange)) + self.assert_(req.if_range.match(etag='some-etag', + last_modified=datetime(2005, 1, 1, 12, 0))) + req.if_range = 'opaque-etag' + self.assert_(not req.if_range.match(etag='other-etag')) + self.assert_(req.if_range.match(etag='opaque-etag')) + + res = Response(etag='opaque-etag') + self.assert_(req.if_range.match_response(res)) + + req.range = 'bytes=0-100' + self.assert_(isinstance(req.range, Range)) + self.assertEqual(req.range.ranges, [(0, 101)]) + cr = req.range.content_range(length=1000) + self.assertEqual((cr.start, cr.stop, cr.length), (0, 101, 1000)) + + self.assert_(server_token in req.if_match) + # No If-Match means everything is ok + req.if_match = server_token + self.assert_(server_token in req.if_match) + # Still OK + req.if_match = 'other-token' + # Not OK, should return 412 Precondition Failed: + self.assert_(not server_token in req.if_match) + + def test_call_WSGI_app(self): + from webob import Request + req = Request.blank('/') + def wsgi_app(environ, start_response): + start_response('200 OK', [('Content-type', 'text/plain')]) + return ['Hi!'] + self.assertEqual(req.call_application(wsgi_app), + ('200 OK', [('Content-type', 'text/plain')], ['Hi!'])) + + res = req.get_response(wsgi_app) + from webob.response import Response + self.assert_(isinstance(res, Response)) + self.assertEqual(res.status, '200 OK') + from webob.headers import ResponseHeaders + self.assert_(isinstance(res.headers, ResponseHeaders)) + self.assertEqual(res.headers.items(), [('Content-type', 'text/plain')]) + self.assertEqual(res.body, 'Hi!') + + def equal_req(self, req): + from cStringIO import StringIO + from webob import Request + input = StringIO(str(req)) + req2 = Request.from_file(input) + self.assertEqual(req.url, req2.url) + headers1 = dict(req.headers) + headers2 = dict(req2.headers) + self.assertEqual(int(headers1.get('Content-Length', '0')), + int(headers2.get('Content-Length', '0'))) + if 'Content-Length' in headers1: + del headers1['Content-Length'] + if 'Content-Length' in headers2: + del headers2['Content-Length'] + self.assertEqual(headers1, headers2) + self.assertEqual(req.body, req2.body) + def simpleapp(environ, start_response): + from webob import Request status = '200 OK' response_headers = [('Content-type','text/plain')] start_response(status, response_headers) @@ -18,15 +2275,21 @@ def simpleapp(environ, start_response): 'Hello world!\n', 'The get is %r' % request.str_GET, ' and Val is %s\n' % request.str_GET.get('name'), - 'The languages are: %s\n' % request.accept_language.best_matches('en-US'), - 'The accepttypes is: %s\n' % request.accept.best_match(['application/xml', 'text/html']), + 'The languages are: %s\n' % + request.accept_language.best_matches('en-US'), + 'The accepttypes is: %s\n' % + request.accept.best_match(['application/xml', 'text/html']), 'post is %r\n' % request.str_POST, 'params is %r\n' % request.str_params, 'cookies is %r\n' % request.str_cookies, 'body: %r\n' % request.body, 'method: %s\n' % request.method, 'remote_user: %r\n' % request.environ['REMOTE_USER'], - 'host_url: %r; application_url: %r; path_url: %r; url: %r\n' % (request.host_url, request.application_url, request.path_url, request.url), + 'host_url: %r; application_url: %r; path_url: %r; url: %r\n' % + (request.host_url, + request.application_url, + request.path_url, + request.url), 'urlvars: %r\n' % request.urlvars, 'urlargs: %r\n' % (request.urlargs, ), 'is_xhr: %r\n' % request.is_xhr, @@ -35,537 +2298,13 @@ def simpleapp(environ, start_response): 'if_none_match: %r\n' % request.if_none_match, ] -def test_gets(): - app = TestApp(simpleapp) - res = app.get('/') - assert 'Hello' in res - assert "get is GET([])" in res - assert "post is <NoVars: Not a form request>" in res - - res = app.get('/?name=george') - res.mustcontain("get is GET([('name', 'george')])") - res.mustcontain("Val is george") - -def test_language_parsing(): - app = TestApp(simpleapp) - res = app.get('/') - assert "The languages are: ['en-US']" in res - - res = app.get('/', headers={'Accept-Language':'da, en-gb;q=0.8, en;q=0.7'}) - assert "languages are: ['da', 'en-gb', 'en-US']" in res - - res = app.get('/', headers={'Accept-Language':'en-gb;q=0.8, da, en;q=0.7'}) - assert "languages are: ['da', 'en-gb', 'en-US']" in res - -def test_mime_parsing(): - app = TestApp(simpleapp) - res = app.get('/', headers={'Accept':'text/html'}) - assert "accepttypes is: text/html" in res - - res = app.get('/', headers={'Accept':'application/xml'}) - assert "accepttypes is: application/xml" in res - - res = app.get('/', headers={'Accept':'application/xml,*/*'}) - assert "accepttypes is: application/xml" in res, res - - -def test_accept_best_match(): - assert not Request.blank('/').accept - assert not Request.blank('/', headers={'Accept': ''}).accept - req = Request.blank('/', headers={'Accept':'text/plain'}) - ok_(req.accept) - assert_raises(ValueError, req.accept.best_match, ['*/*']) - req = Request.blank('/', accept=['*/*','text/*']) - eq_(req.accept.best_match(['application/x-foo', 'text/plain']), 'text/plain') - eq_(req.accept.best_match(['text/plain', 'application/x-foo']), 'text/plain') - req = Request.blank('/', accept=['text/plain', 'message/*']) - eq_(req.accept.best_match(['message/x-foo', 'text/plain']), 'text/plain') - eq_(req.accept.best_match(['text/plain', 'message/x-foo']), 'text/plain') - -def test_from_mimeparse(): - # http://mimeparse.googlecode.com/svn/trunk/mimeparse.py - supported = ['application/xbel+xml', 'application/xml'] - tests = [('application/xbel+xml', 'application/xbel+xml'), - ('application/xbel+xml; q=1', 'application/xbel+xml'), - ('application/xml; q=1', 'application/xml'), - ('application/*; q=1', 'application/xbel+xml'), - ('*/*', 'application/xbel+xml')] - - for accept, get in tests: - req = Request.blank('/', headers={'Accept':accept}) - assert req.accept.best_match(supported) == get, ( - '%r generated %r instead of %r for %r' % (accept, req.accept.best_match(supported), get, supported)) - - supported = ['application/xbel+xml', 'text/xml'] - tests = [('text/*;q=0.5,*/*; q=0.1', 'text/xml'), - ('text/html,application/atom+xml; q=0.9', None)] - - for accept, get in tests: - req = Request.blank('/', headers={'Accept':accept}) - assert req.accept.best_match(supported) == get, ( - 'Got %r instead of %r for %r' % (req.accept.best_match(supported), get, supported)) - - supported = ['application/json', 'text/html'] - tests = [('application/json, text/javascript, */*', 'application/json'), - ('application/json, text/html;q=0.9', 'application/json')] - - for accept, get in tests: - req = Request.blank('/', headers={'Accept':accept}) - assert req.accept.best_match(supported) == get, ( - '%r generated %r instead of %r for %r' % (accept, req.accept.best_match(supported), get, supported)) - - offered = ['image/png', 'application/xml'] - tests = [ - ('image/png', 'image/png'), - ('image/*', 'image/png'), - ('image/*, application/xml', 'application/xml'), - ] - - for accept, get in tests: - req = Request.blank('/', accept=accept) - eq_(req.accept.best_match(offered), get) - -def test_headers(): - app = TestApp(simpleapp) - headers = { - 'If-Modified-Since': 'Sat, 29 Oct 1994 19:43:31 GMT', - 'Cookie': 'var1=value1', - 'User-Agent': 'Mozilla 4.0 (compatible; MSIE)', - 'If-None-Match': '"etag001", "etag002"', - 'X-Requested-With': 'XMLHttpRequest', - } - res = app.get('/?foo=bar&baz', headers=headers) - res.mustcontain( - 'if_modified_since: datetime.datetime(1994, 10, 29, 19, 43, 31, tzinfo=UTC)', - "user_agent: 'Mozilla", - 'is_xhr: True', - "cookies is {'var1': 'value1'}", - "params is NestedMultiDict([('foo', 'bar'), ('baz', '')])", - "if_none_match: <ETag etag001 or etag002>", - ) -def test_bad_cookie(): - req = Request.blank('/') - req.headers['Cookie'] = '070-it-:><?0' - assert req.cookies == {} - req.headers['Cookie'] = 'foo=bar' - assert req.cookies == {'foo': 'bar'} - req.headers['Cookie'] = '...' - assert req.cookies == {} - req.headers['Cookie'] = '=foo' - assert req.cookies == {} - req.headers['Cookie'] = 'dismiss-top=6; CP=null*; PHPSESSID=0a539d42abc001cdc762809248d4beed; a=42' - eq_(req.cookies, { - 'CP': u'null*', - 'PHPSESSID': u'0a539d42abc001cdc762809248d4beed', - 'a': u'42', - 'dismiss-top': u'6' - }) - req.headers['Cookie'] = 'fo234{=bar blub=Blah' - assert req.cookies == {'blub': 'Blah'} - -def test_cookie_quoting(): - req = Request.blank('/') - req.headers['Cookie'] = 'foo="?foo"; Path=/' - assert req.cookies == {'foo': '?foo'} - -def test_path_quoting(): - path = '/:@&+$,/bar' - req = Request.blank(path) - assert req.path == path - assert req.url.endswith(path) - -def test_params(): - req = Request.blank('/?a=1&b=2') - req.method = 'POST' - req.body = 'b=3' - assert req.params.items() == [('a', '1'), ('b', '2'), ('b', '3')] - new_params = req.params.copy() - assert new_params.items() == [('a', '1'), ('b', '2'), ('b', '3')] - new_params['b'] = '4' - assert new_params.items() == [('a', '1'), ('b', '4')] - # The key name is \u1000: - req = Request.blank('/?%E1%80%80=x', decode_param_names=True, charset='UTF-8') - assert req.decode_param_names - assert u'\u1000' in req.GET.keys() - assert req.GET[u'\u1000'] == 'x' - -class UnseekableInput(object): - def __init__(self, data): - self.data = data - self.pos = 0 - def read(self, size=-1): - if size == -1: - t = self.data[self.pos:] - self.pos = len(self.data) - return t - else: - assert self.pos + size <= len(self.data), ( - "Attempt to read past end (length=%s, position=%s, reading %s bytes)" - % (len(self.data), self.pos, size)) - t = self.data[self.pos:self.pos+size] - self.pos += size - return t -def test_copy_body(): - req = Request.blank('/', method='POST', body='some text', request_body_tempfile_limit=1) - old_body_file = req.body_file_raw - req.copy_body() - assert req.body_file_raw is not old_body_file - req = Request.blank('/', method='POST', body_file=UnseekableInput('0123456789'), content_length=10) - assert not hasattr(req.body_file_raw, 'seek') - old_body_file = req.body_file_raw - req.make_body_seekable() - assert req.body_file_raw is not old_body_file - assert req.body == '0123456789' - old_body_file = req.body_file - req.make_body_seekable() - assert req.body_file_raw is old_body_file - assert req.body_file is old_body_file +_cgi_escaping_body = '''--boundary +Content-Disposition: form-data; name="%20%22"" -class UnseekableInputWithSeek(UnseekableInput): - def seek(self, pos, rel=0): - raise IOError("Invalid seek!") -def test_broken_seek(): - # copy() should work even when the input has a broken seek method - req = Request.blank('/', method='POST', body_file=UnseekableInputWithSeek('0123456789'), content_length=10) - assert hasattr(req.body_file_raw, 'seek') - assert_raises(IOError, req.body_file_raw.seek, 0) - old_body_file = req.body_file - req2 = req.copy() - assert req2.body_file_raw is req2.body_file is not old_body_file - assert req2.body == '0123456789' - -def test_set_body(): - req = BaseRequest.blank('/', body='foo') - assert req.is_body_seekable - eq_(req.body, 'foo') - eq_(req.content_length, 3) - del req.body - eq_(req.body, '') - eq_(req.content_length, 0) - - - -def test_broken_clen_header(): - # if the UA sends "content_length: ..' header (the name is wrong) - # it should not break the req.headers.items() - req = Request.blank('/') - req.environ['HTTP_CONTENT_LENGTH'] = '0' - req.headers.items() - - -def test_nonstr_keys(): - # non-string env keys shouldn't break req.headers - req = Request.blank('/') - req.environ[1] = 1 - req.headers.items() - - -def test_authorization(): - req = Request.blank('/') - req.authorization = 'Digest uri="/?a=b"' - assert req.authorization == ('Digest', {'uri': '/?a=b'}) - -def test_authorization2(): - from webob.descriptors import parse_auth_params - for s, d in [ - ('x=y', {'x': 'y'}), - ('x="y"', {'x': 'y'}), - ('x=y,z=z', {'x': 'y', 'z': 'z'}), - ('x=y, z=z', {'x': 'y', 'z': 'z'}), - ('x="y",z=z', {'x': 'y', 'z': 'z'}), - ('x="y", z=z', {'x': 'y', 'z': 'z'}), - ('x="y,x", z=z', {'x': 'y,x', 'z': 'z'}), - ]: - eq_(parse_auth_params(s), d) - - -def test_from_file(): - req = Request.blank('http://example.com:8000/test.html?params') - equal_req(req) - - req = Request.blank('http://example.com/test2') - req.method = 'POST' - req.body = 'test=example' - equal_req(req) - -def equal_req(req): - input = StringIO(str(req)) - req2 = Request.from_file(input) - eq_(req.url, req2.url) - headers1 = dict(req.headers) - headers2 = dict(req2.headers) - eq_(int(headers1.get('Content-Length', '0')), - int(headers2.get('Content-Length', '0'))) - if 'Content-Length' in headers1: - del headers1['Content-Length'] - if 'Content-Length' in headers2: - del headers2['Content-Length'] - eq_(headers1, headers2) - eq_(req.body, req2.body) - -def test_req_kw_none_val(): - assert 'content-length' not in Request({}, content_length=None).headers - assert 'content-type' not in Request({}, content_type=None).headers - -def test_env_keys(): - req = Request.blank('/') - # SCRIPT_NAME can be missing - del req.environ['SCRIPT_NAME'] - eq_(req.script_name, '') - eq_(req.uscript_name, u'') - -def test_repr_nodefault(): - nd = NoDefault - eq_(repr(nd), '(No Default)') - -def test_request_noenviron_param(): - """Environ is a a mandatory not null param in Request""" - assert_raises(TypeError, Request, environ=None) - -def test_environ_getter(): - """ - Parameter environ_getter in Request is no longer valid and should raise - an error in case it's used - """ - class env(object): - def __init__(self, env): - self.env = env - def env_getter(self): - return self.env - assert_raises(ValueError, Request, environ_getter=env({'a':1}).env_getter) - -def test_unicode_errors(): - """ - Passing unicode_errors != NoDefault should assign value to - dictionary['unicode_errors'], else not - """ - r = Request({'a':1}, unicode_errors='strict') - ok_('unicode_errors' in r.__dict__) - r = Request({'a':1}, unicode_errors=NoDefault) - ok_('unicode_errors' not in r.__dict__) - -def test_charset_deprecation(): - """ - Any class that inherits from BaseRequest cannot define a default_charset - attribute. - Any class that inherits from BaseRequest cannot define a charset attr - that is instance of str - """ - class NewRequest(BaseRequest): - default_charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - class NewRequest(BaseRequest): - charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - class NewRequest(AdhocAttrMixin, BaseRequest): - default_charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - class NewRequest(AdhocAttrMixin, BaseRequest): - charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - -def test_unexpected_kw(): - """ - Passed an attr in kw that does not exist in the class, should raise an - error - Passed an attr in kw that does exist in the class, should be ok - """ - assert_raises(TypeError, Request, {'a':1}, **{'this_does_not_exist':1}) - r = Request({'a':1}, **{'charset':'utf-8', 'server_name':'127.0.0.1'}) - eq_(getattr(r, 'charset', None), 'utf-8') - eq_(getattr(r, 'server_name', None), '127.0.0.1') - -def test_body_file_setter(): - """" - If body_file is passed and it's instance of str, we define - environ['wsgi.input'] and content_length. Plus, while deleting the - attribute, we should get '' and 0 respectively - """ - r = Request({'a':1}, **{'body_file':'hello world'}) - eq_(r.environ['wsgi.input'].getvalue(), 'hello world') - eq_(int(r.environ['CONTENT_LENGTH']), len('hello world')) - del r.body_file - eq_(r.environ['wsgi.input'].getvalue(), '') - eq_(int(r.environ['CONTENT_LENGTH']), 0) - -def test_conttype_set_del(): - """ - Deleting content_type attr from a request should update the environ dict - Assigning content_type should replace first option of the environ dict - """ - r = Request({'a':1}, **{'content_type':'text/html'}) - ok_('CONTENT_TYPE' in r.environ) - ok_(hasattr(r, 'content_type')) - del r.content_type - ok_('CONTENT_TYPE' not in r.environ) - a = Request({'a':1},**{'content_type':'charset=utf-8;application/atom+xml;type=entry'}) - ok_(a.environ['CONTENT_TYPE']=='charset=utf-8;application/atom+xml;type=entry') - a.content_type = 'charset=utf-8' - ok_(a.environ['CONTENT_TYPE']=='charset=utf-8;application/atom+xml;type=entry') - -def test_headers(): - """ - Setting headers in init and later with a property, should update the info - """ - headers = {'Host': 'www.example.com', - 'Accept-Language': 'en-us,en;q=0.5', - 'Accept-Encoding': 'gzip,deflate', - 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', - 'Keep-Alive': '115', - 'Connection': 'keep-alive', - 'Cache-Control': 'max-age=0'} - r = Request({'a':1}, headers=headers) - for i in headers.keys(): - ok_(i in r.headers and - 'HTTP_'+i.upper().replace('-', '_') in r.environ) - r.headers = {'Server':'Apache'} - eq_(r.environ.keys(), ['a', 'HTTP_SERVER']) - -def test_host_url(): - """ - Request has a read only property host_url that combines several keys to - create a host_url - """ - a = Request({'wsgi.url_scheme':'http'}, **{'host':'www.example.com'}) - eq_(a.host_url, 'http://www.example.com') - a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost', - 'server_port':5000}) - eq_(a.host_url, 'http://localhost:5000') - a = Request({'wsgi.url_scheme':'https'}, **{'server_name':'localhost', - 'server_port':443}) - eq_(a.host_url, 'https://localhost') - -def test_path_info_p(): - """ - Peek path_info to see what's coming - Pop path_info until there's nothing remaining - """ - a = Request({'a':1}, **{'path_info':'/foo/bar','script_name':''}) - eq_(a.path_info_peek(), 'foo') - eq_(a.path_info_pop(), 'foo') - eq_(a.path_info_peek(), 'bar') - eq_(a.path_info_pop(), 'bar') - eq_(a.path_info_peek(), None) - eq_(a.path_info_pop(), None) - -def test_urlvars_property(): - """ - Testing urlvars setter/getter/deleter - """ - a = Request({'wsgiorg.routing_args':((),{'x':'y'}), - 'paste.urlvars':{'test':'value'}}) - a.urlvars = {'hello':'world'} - ok_('paste.urlvars' not in a.environ) - eq_(a.environ['wsgiorg.routing_args'], ((), {'hello':'world'})) - del a.urlvars - ok_('wsgiorg.routing_args' not in a.environ) - a = Request({'paste.urlvars':{'test':'value'}}) - eq_(a.urlvars, {'test':'value'}) - a.urlvars = {'hello':'world'} - eq_(a.environ['paste.urlvars'], {'hello':'world'}) - del a.urlvars - ok_('paste.urlvars' not in a.environ) - -def test_urlargs_property(): - """ - Testing urlargs setter/getter/deleter - """ - a = Request({'paste.urlvars':{'test':'value'}}) - eq_(a.urlargs, ()) - a.urlargs = {'hello':'world'} - eq_(a.environ['wsgiorg.routing_args'], ({'hello':'world'}, - {'test':'value'})) - a = Request({'a':1}) - a.urlargs = {'hello':'world'} - eq_(a.environ['wsgiorg.routing_args'], ({'hello':'world'}, {})) - del a.urlargs - ok_('wsgiorg.routing_args' not in a.environ) - -def test_host_property(): - """ - Testing host setter/getter/deleter - """ - a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost', - 'server_port':5000}) - eq_(a.host, "localhost:5000") - a.host = "localhost:5000" - ok_('HTTP_HOST' in a.environ) - del a.host - ok_('HTTP_HOST' not in a.environ) - -def test_body_property(): - """ - Testing body setter/getter/deleter plus making sure body has a seek method - """ - #a = Request({'a':1}, **{'CONTENT_LENGTH':'?'}) - # I cannot think of a case where somebody would put anything else than a - # numerical value in CONTENT_LENGTH, Google didn't help either - #eq_(a.body, '') - # I need to implement a not seekable stringio like object. - class DummyIO(object): - def __init__(self, txt): - self.txt = txt - def read(self, n=-1): - return self.txt[0:n] - len_strl = BaseRequest.request_body_tempfile_limit/len(string.letters)+1 - r = Request({'a':1}, **{'body_file':DummyIO(string.letters*len_strl)}) - eq_(len(r.body), len(string.letters*len_strl)-1) - assert_raises(TypeError, setattr, r, 'body', unicode('hello world')) - r.body = None - eq_(r.body, '') - r = Request({'a':1}, **{'body_file':DummyIO(string.letters)}) - assert not hasattr(r.body_file_raw, 'seek') - r.make_body_seekable() - assert hasattr(r.body_file_raw, 'seek') - r = Request({'a':1}, **{'body_file':StringIO(string.letters)}) - assert hasattr(r.body_file_raw, 'seek') - r.make_body_seekable() - assert hasattr(r.body_file_raw, 'seek') - -def test_repr_invalid(): - """If we have an invalid WSGI environ, the repr should tell us""" - req = BaseRequest({'CONTENT_LENGTH':'0', 'body':''}) - ok_(repr(req).endswith('(invalid WSGI environ)>')) - -def test_from_file(): - """If we pass a file with garbage to from_file method it should raise an - error plus missing bits in from_file method - """ - assert_raises(ValueError, BaseRequest.from_file, StringIO('hello world')) - val_file = StringIO( - "GET /webob/ HTTP/1.1\n" - "Host: pythonpaste.org\n" - "User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13)" - "Gecko/20101206 Ubuntu/10.04 (lucid) Firefox/3.6.13\n" - "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;" - "q=0.8\n" - "Accept-Language: en-us,en;q=0.5\n" - "Accept-Encoding: gzip,deflate\n" - "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" - # duplicate on porpouse - "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" - "Keep-Alive: 115\n" - "Connection: keep-alive\n" - ) - req = BaseRequest.from_file(val_file) - assert isinstance(req, BaseRequest) - assert_false(repr(req).endswith('(invalid WSGI environ)>')) - val_file = StringIO( - "GET /webob/ HTTP/1.1\n" - "Host pythonpaste.org\n" - ) - assert_raises(ValueError, BaseRequest.from_file, val_file) +--boundary--''' def _norm_req(s): return '\r\n'.join(s.strip().replace('\r','').split('\n')) @@ -590,6 +2329,7 @@ these are the contents of the file 'bar.txt' ------------------------------deb95b63e42a-- """ + _test_req2 = """ POST / HTTP/1.0 Content-Length: 0 @@ -599,149 +2339,128 @@ Content-Length: 0 _test_req = _norm_req(_test_req) _test_req2 = _norm_req(_test_req2) + '\r\n' -def test_from_string(): - """A valid request without a Content-Length header should still read the full body. - Also test parity between as_string and from_string / from_file. - """ - req = BaseRequest.from_string(_test_req) - assert isinstance(req, BaseRequest) - assert_false(repr(req).endswith('(invalid WSGI environ)>')) - assert_false('\n' in req.http_version or '\r' in req.http_version) - assert ',' not in req.host - assert req.content_length is not None - assert req.content_length == 337 - assert 'foo' in req.body - bar_contents = "these are the contents of the file 'bar.txt'\r\n" - assert bar_contents in req.body - assert req.params['foo'] == 'foo' - bar = req.params['bar'] - assert isinstance(bar, cgi.FieldStorage) - assert bar.type == 'application/octet-stream' - bar.file.seek(0) - assert bar.file.read() == bar_contents - # out should equal contents, except for the Content-Length header, - # so insert that. - _test_req_copy = _test_req.replace('Content-Type', 'Content-Length: 337\r\nContent-Type') - assert str(req) == _test_req_copy - - req2 = BaseRequest.from_string(_test_req2) - assert 'host' not in req2.headers - eq_(str(req2), _test_req2.rstrip()) - assert_raises(ValueError, BaseRequest.from_string, _test_req2+'xx') - - -def test_blank(): - """BaseRequest.blank class method""" - assert_raises(ValueError, BaseRequest.blank, - 'www.example.com/foo?hello=world', None, - 'www.example.com/foo?hello=world') - assert_raises(ValueError, BaseRequest.blank, - 'gopher.example.com/foo?hello=world', None, - 'gopher://gopher.example.com') - req = BaseRequest.blank('www.example.com/foo?hello=world', None, - 'http://www.example.com') - ok_(req.environ.get('HTTP_HOST', None)== 'www.example.com:80' and - req.environ.get('PATH_INFO', None)== 'www.example.com/foo' and - req.environ.get('QUERY_STRING', None)== 'hello=world' and - req.environ.get('REQUEST_METHOD', None)== 'GET') - req = BaseRequest.blank('www.example.com/secure?hello=world', None, - 'https://www.example.com/secure') - ok_(req.environ.get('HTTP_HOST', None)== 'www.example.com:443' and - req.environ.get('PATH_INFO', None)== 'www.example.com/secure' and - req.environ.get('QUERY_STRING', None)== 'hello=world' and - req.environ.get('REQUEST_METHOD', None)== 'GET' and - req.environ.get('SCRIPT_NAME', None)== '/secure' and - req.environ.get('SERVER_NAME', None)== 'www.example.com' and - req.environ.get('SERVER_PORT', None)== '443') - -def test_environ_from_url(): - """Generating an environ just from an url plus testing environ_add_POST""" - assert_raises(TypeError, environ_from_url, - 'http://www.example.com/foo?bar=baz#qux') - assert_raises(TypeError, environ_from_url, - 'gopher://gopher.example.com') - req = environ_from_url('http://www.example.com/foo?bar=baz') - ok_(req.get('HTTP_HOST', None)== 'www.example.com:80' and - req.get('PATH_INFO', None)== '/foo' and - req.get('QUERY_STRING', None)== 'bar=baz' and - req.get('REQUEST_METHOD', None)== 'GET' and - req.get('SCRIPT_NAME', None)== '' and - req.get('SERVER_NAME', None)== 'www.example.com' and - req.get('SERVER_PORT', None)== '80') - req = environ_from_url('https://www.example.com/foo?bar=baz') - ok_(req.get('HTTP_HOST', None)== 'www.example.com:443' and - req.get('PATH_INFO', None)== '/foo' and - req.get('QUERY_STRING', None)== 'bar=baz' and - req.get('REQUEST_METHOD', None)== 'GET' and - req.get('SCRIPT_NAME', None)== '' and - req.get('SERVER_NAME', None)== 'www.example.com' and - req.get('SERVER_PORT', None)== '443') - environ_add_POST(req, None) - assert_false('CONTENT_TYPE' in req and 'CONTENT_LENGTH' in req) - environ_add_POST(req, {'hello':'world'}) - ok_(req.get('HTTP_HOST', None)== 'www.example.com:443' and - req.get('PATH_INFO', None)== '/foo' and - req.get('QUERY_STRING', None)== 'bar=baz' and - req.get('REQUEST_METHOD', None)== 'POST' and - req.get('SCRIPT_NAME', None)== '' and - req.get('SERVER_NAME', None)== 'www.example.com' and - req.get('SERVER_PORT', None)== '443' and - req.get('CONTENT_LENGTH', None)=='11' and - req.get('CONTENT_TYPE', None)=='application/x-www-form-urlencoded') - eq_(req['wsgi.input'].read(), 'hello=world') - - -def test_post_does_not_reparse(): - # test that there's no repetitive parsing is happening on every req.POST access - req = Request.blank('/', - content_type='multipart/form-data; boundary=boundary', - POST=_cgi_escaping_body - ) - f0 = req.body_file_raw - post1 = req.str_POST - f1 = req.body_file_raw - assert f1 is not f0 - post2 = req.str_POST - f2 = req.body_file_raw - assert post1 is post2 - assert f1 is f2 - - -def test_middleware_body(): - def app(env, sr): - sr('200 OK', []) - return [env['wsgi.input'].read()] - - def mw(env, sr): - req = Request(env) - data = req.body_file.read() - resp = req.get_response(app) - resp.headers['x-data'] = data - return resp(env, sr) - - req = Request.blank('/', method='PUT', body='abc') - resp = req.get_response(mw) - eq_(resp.body, 'abc') - eq_(resp.headers['x-data'], 'abc') - -def test_body_file_noseek(): - req = Request.blank('/', method='PUT', body='abc') - lst = [req.body_file.read(1) for i in range(3)] - eq_(lst, ['a','b','c']) - -def test_cgi_escaping_fix(): - req = Request.blank('/', - content_type='multipart/form-data; boundary=boundary', - POST=_cgi_escaping_body - ) - eq_(req.POST.keys(), ['%20%22"']) - req.body_file.read() - eq_(req.POST.keys(), ['%20%22"']) - -_cgi_escaping_body = '''--boundary -Content-Disposition: form-data; name="%20%22"" +class UnseekableInput(object): + def __init__(self, data): + self.data = data + self.pos = 0 + def read(self, size=-1): + if size == -1: + t = self.data[self.pos:] + self.pos = len(self.data) + return t + else: + assert(self.pos + size <= len(self.data)) + t = self.data[self.pos:self.pos+size] + self.pos += size + return t +class UnseekableInputWithSeek(UnseekableInput): + def seek(self, pos, rel=0): + raise IOError("Invalid seek!") ---boundary--''' +class FakeCGIBodyTests(unittest.TestCase): + + def test_encode_multipart_value_type_options(self): + from StringIO import StringIO + from cgi import FieldStorage + from webob.request import BaseRequest, FakeCGIBody + from webob.multidict import MultiDict + multipart_type = 'multipart/form-data; boundary=foobar' + multipart_body = StringIO( + '--foobar\r\n' + 'Content-Disposition: form-data; name="bananas"; filename="bananas.txt"\r\n' + 'Content-type: text/plain; charset="utf-9"\r\n' + '\r\n' + "these are the contents of the file 'bananas.txt'\r\n" + '\r\n' + '--foobar--' + ) + environ = BaseRequest.blank('/').environ + environ.update(CONTENT_TYPE=multipart_type) + environ.update(REQUEST_METHOD='POST') + fs = FieldStorage(multipart_body, environ=environ) + vars = MultiDict.from_fieldstorage(fs) + self.assertEqual(vars['bananas'].__class__, FieldStorage) + body = FakeCGIBody(vars, multipart_type) + self.assertEqual(body.read(), multipart_body.getvalue()) + + def test_encode_multipart_no_boundary(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({}, 'multipart/form-data') + self.assertRaises(ValueError, body.read) + + def test_repr(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + body.read(1) + import re + self.assertEqual( + re.sub(r'\b0x[0-9a-f]+\b', '<whereitsat>', repr(body)), + "<FakeCGIBody at <whereitsat> viewing {'bananas': 'ba...nas'} at position 1>", + ) + def test_seek_tell(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + self.assertEqual(body.tell(), 0) + body.seek(1) + self.assertEqual(body.tell(), 1) + self.assertRaises(IOError, body.seek, 0, 2) + + def test_iter(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + self.assertEqual(list(body), [ + '--foobar\r\n', + 'Content-Disposition: form-data; name="bananas"\r\n', + '\r\n', + 'bananas\r\n', + '--foobar--', + ]) + + def test_readline(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + self.assertEqual(body.readline(), '--foobar\r\n') + self.assertEqual(body.readline(), 'Content-Disposition: form-data; name="bananas"\r\n') + self.assertEqual(body.readline(), '\r\n') + self.assertEqual(body.readline(), 'bananas\r\n') + self.assertEqual(body.readline(), '--foobar--') + # subsequent calls to readline will return '' + + def test_read_bad_content_type(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'application/jibberjabber') + self.assertRaises(AssertionError, body.read) + + def test_read_urlencoded(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'application/x-www-form-urlencoded') + self.assertEqual(body.read(), 'bananas=bananas') + + +class Test_cgi_FieldStorage__repr__patch(unittest.TestCase): + def _callFUT(self, fake): + from webob.request import _cgi_FieldStorage__repr__patch + return _cgi_FieldStorage__repr__patch(fake) + + def test_with_file(self): + class Fake(object): + name = 'name' + file = 'file' + filename = 'filename' + value = 'value' + fake = Fake() + result = self._callFUT(fake) + self.assertEqual(result, "FieldStorage('name', 'filename')") + + def test_without_file(self): + class Fake(object): + name = 'name' + file = None + filename = 'filename' + value = 'value' + fake = Fake() + result = self._callFUT(fake) + self.assertEqual(result, "FieldStorage('name', 'filename', 'value')") |