diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/performance_test.py | 3 | ||||
-rw-r--r-- | tests/test_acceptparse.py | 387 | ||||
-rw-r--r-- | tests/test_byterange.py | 231 | ||||
-rw-r--r-- | tests/test_cachecontrol.py | 266 | ||||
-rw-r--r-- | tests/test_cookies.py | 85 | ||||
-rw-r--r-- | tests/test_datetime_utils.py | 61 | ||||
-rw-r--r-- | tests/test_dec.py | 267 | ||||
-rw-r--r-- | tests/test_dec.txt | 103 | ||||
-rw-r--r-- | tests/test_descriptors.py | 797 | ||||
-rw-r--r-- | tests/test_etag.py | 451 | ||||
-rw-r--r-- | tests/test_exc.py | 342 | ||||
-rw-r--r-- | tests/test_headers.py | 100 | ||||
-rw-r--r-- | tests/test_multidict.py | 330 | ||||
-rw-r--r-- | tests/test_request.py | 3081 | ||||
-rw-r--r-- | tests/test_request.txt | 553 | ||||
-rw-r--r-- | tests/test_response.py | 820 | ||||
-rw-r--r-- | tests/test_response.txt | 446 |
17 files changed, 6449 insertions, 1874 deletions
diff --git a/tests/performance_test.py b/tests/performance_test.py index 02fba96..79f01af 100644 --- a/tests/performance_test.py +++ b/tests/performance_test.py @@ -1,8 +1,8 @@ #!/usr/bin/env python import webob -from repoze.profile.profiler import AccumulatingProfileMiddleware def make_middleware(app): + from repoze.profile.profiler import AccumulatingProfileMiddleware return AccumulatingProfileMiddleware( app, log_filename='/tmp/profile.log', @@ -11,7 +11,6 @@ def make_middleware(app): path='/__profile__') def simple_app(environ, start_response): - req = webob.Request(environ) resp = webob.Response('Hello world!') return resp(environ, start_response) diff --git a/tests/test_acceptparse.py b/tests/test_acceptparse.py new file mode 100644 index 0000000..07ae07a --- /dev/null +++ b/tests/test_acceptparse.py @@ -0,0 +1,387 @@ +from unittest import TestCase + +class Test_parse_accept_badq(TestCase): + from webob.acceptparse import parse_accept + assert parse_accept("value1; q=0.1.2") == [('value1', 1)] + + +class TestAccept(TestCase): + def Accept(self, *args, **kwargs): + from webob.acceptparse import Accept + return Accept(*args, **kwargs) + + def test_init_accept_content_type(self): + name, value = ('Content-Type', 'text/html') + accept = self.Accept(name, value) + assert accept.header_name == name + assert accept.header_value == value + assert accept._parsed == [('text/html', 1)] + + def test_init_accept_accept_charset(self): + name, value = ('Accept-Charset', 'iso-8859-5, unicode-1-1;q=0.8') + accept = self.Accept(name, value) + assert accept.header_name == name + assert accept.header_value == value + assert accept._parsed == [('iso-8859-5', 1), + ('unicode-1-1', 0.80000000000000004), + ('iso-8859-1', 1)] + + def test_init_accept_accept_charset_with_iso_8859_1(self): + name, value = ('Accept-Charset', 'iso-8859-1') + accept = self.Accept(name, value) + assert accept.header_name == name + assert accept.header_value == value + assert accept._parsed == [('iso-8859-1', 1)] + + def test_init_accept_accept_charset_wildcard(self): + name, value = ('Accept-Charset', '*') + accept = self.Accept(name, value) + assert accept.header_name == name + assert accept.header_value == value + assert accept._parsed == [('*', 1)] + + def test_init_accept_accept_language(self): + name, value = ('Accept-Language', 'da, en-gb;q=0.8, en;q=0.7') + accept = self.Accept(name, value) + assert accept.header_name == name + assert accept.header_value == value + assert accept._parsed == [('da', 1), + ('en-gb', 0.80000000000000004), + ('en', 0.69999999999999996)] + + def test_init_accept_invalid_value(self): + name, value = ('Accept-Language', 'da, q, en-gb;q=0.8') + accept = self.Accept(name, value) + # The "q" value should not be there. + assert accept._parsed == [('da', 1), + ('en-gb', 0.80000000000000004)] + + def test_init_accept_invalid_q_value(self): + name, value = ('Accept-Language', 'da, en-gb;q=foo') + accept = self.Accept(name, value) + # I can't get to cover line 40-41 (webob.acceptparse) as the regex + # will prevent from hitting these lines (aconrad) + assert accept._parsed == [('da', 1), ('en-gb', 1)] + + def test_accept_repr(self): + name, value = ('Content-Type', 'text/html') + accept = self.Accept(name, value) + assert repr(accept) == '<%s at 0x%x %s: %s>' % ('Accept', + abs(id(accept)), + name, + str(accept)) + + def test_accept_str(self): + name, value = ('Content-Type', 'text/html') + accept = self.Accept(name, value) + assert str(accept) == value + + def test_accept_str_with_q_not_1(self): + name, value = ('Content-Type', 'text/html;q=0.5') + accept = self.Accept(name, value) + assert str(accept) == value + + def test_accept_str_with_q_not_1_multiple(self): + name, value = ('Content-Type', 'text/html;q=0.5, foo/bar') + accept = self.Accept(name, value) + assert str(accept) == value + + def test_accept_add_other_accept(self): + accept = self.Accept('Content-Type', 'text/html') + \ + self.Accept('Content-Type', 'foo/bar') + assert str(accept) == 'text/html, foo/bar' + accept += self.Accept('Content-Type', 'bar/baz;q=0.5') + assert str(accept) == 'text/html, foo/bar, bar/baz;q=0.5' + + def test_accept_add_other_list_of_tuples(self): + accept = self.Accept('Content-Type', 'text/html') + accept += [('foo/bar', 1)] + assert str(accept) == 'text/html, foo/bar' + accept += [('bar/baz', 0.5)] + assert str(accept) == 'text/html, foo/bar, bar/baz;q=0.5' + accept += ['she/bangs', 'the/house'] + assert str(accept) == ('text/html, foo/bar, bar/baz;q=0.5, ' + 'she/bangs, the/house') + + def test_accept_add_other_dict(self): + accept = self.Accept('Content-Type', 'text/html') + accept += {'foo/bar': 1} + assert str(accept) == 'text/html, foo/bar' + accept += {'bar/baz': 0.5} + assert str(accept) == 'text/html, foo/bar, bar/baz;q=0.5' + + def test_accept_add_other_empty_str(self): + accept = self.Accept('Content-Type', 'text/html') + accept += '' + assert str(accept) == 'text/html' + + def test_accept_with_no_value_add_other_str(self): + accept = self.Accept('Content-Type', '') + accept += 'text/html' + assert str(accept) == 'text/html' + + def test_contains(self): + accept = self.Accept('Content-Type', 'text/html') + assert 'text/html' in accept + + def test_contains_not(self): + accept = self.Accept('Content-Type', 'text/html') + assert not 'foo/bar' in accept + + def test_quality(self): + accept = self.Accept('Content-Type', 'text/html') + assert accept.quality('text/html') == 1 + accept = self.Accept('Content-Type', 'text/html;q=0.5') + assert accept.quality('text/html') == 0.5 + + def test_quality_not_found(self): + accept = self.Accept('Content-Type', 'text/html') + assert accept.quality('foo/bar') is None + + def test_first_match(self): + accept = self.Accept('Content-Type', 'text/html, foo/bar') + assert accept.first_match(['text/html', 'foo/bar']) == 'text/html' + assert accept.first_match(['foo/bar', 'text/html']) == 'foo/bar' + assert accept.first_match(['xxx/xxx', 'text/html']) == 'text/html' + assert accept.first_match(['xxx/xxx']) == 'xxx/xxx' + assert accept.first_match([None, 'text/html']) is None + assert accept.first_match(['text/html', None]) == 'text/html' + assert accept.first_match(['foo/bar', None]) == 'foo/bar' + self.assertRaises(ValueError, accept.first_match, []) + + def test_best_match(self): + accept = self.Accept('Content-Type', 'text/html, foo/bar') + assert accept.best_match(['text/html', 'foo/bar']) == 'text/html' + assert accept.best_match(['foo/bar', 'text/html']) == 'foo/bar' + assert accept.best_match([('foo/bar', 0.5), + 'text/html']) == 'text/html' + assert accept.best_match([('foo/bar', 0.5), + ('text/html', 0.4)]) == 'foo/bar' + self.assertRaises(ValueError, accept.best_match, ['text/*']) + + def test_best_match_with_one_lower_q(self): + accept = self.Accept('Content-Type', 'text/html, foo/bar;q=0.5') + assert accept.best_match(['text/html', 'foo/bar']) == 'text/html' + accept = self.Accept('Content-Type', 'text/html;q=0.5, foo/bar') + assert accept.best_match(['text/html', 'foo/bar']) == 'foo/bar' + + def test_best_matches(self): + accept = self.Accept('Content-Type', 'text/html, foo/bar') + assert accept.best_matches() == ['text/html', 'foo/bar'] + accept = self.Accept('Content-Type', 'text/html, foo/bar;q=0.5') + assert accept.best_matches() == ['text/html', 'foo/bar'] + accept = self.Accept('Content-Type', 'text/html;q=0.5, foo/bar') + assert accept.best_matches() == ['foo/bar', 'text/html'] + + def test_best_matches_with_fallback(self): + accept = self.Accept('Content-Type', 'text/html, foo/bar') + assert accept.best_matches('xxx/yyy') == ['text/html', + 'foo/bar', + 'xxx/yyy'] + accept = self.Accept('Content-Type', 'text/html;q=0.5, foo/bar') + assert accept.best_matches('xxx/yyy') == ['foo/bar', + 'text/html', + 'xxx/yyy'] + assert accept.best_matches('foo/bar') == ['foo/bar'] + assert accept.best_matches('text/html') == ['foo/bar', 'text/html'] + + def test_accept_match(self): + accept = self.Accept('Content-Type', 'text/html') + #FIXME: Accept._match should be standalone function _match that is + # attached as Accept._match during Accept.__init__. + assert accept._match('*', 'text/html') + assert accept._match('text/html', 'text/html') + assert accept._match('TEXT/HTML', 'text/html') + assert not accept._match('foo/bar', 'text/html') + + def test_accept_match_lang(self): + accept = self.Accept('Accept-Language', 'da, en-gb;q=0.8, en;q=0.7') + #FIXME: Accept._match_lang should be standalone function _match_lang + # that is attached as Accept._match during Accept.__init__. + assert accept._match('*', 'da') + assert accept._match('da', 'DA') + assert accept._match('en', 'en-gb') + assert accept._match('en-gb', 'en-gb') + assert not accept._match('en-gb', 'fr-fr') + + +class TestNilAccept(TestCase): + def NilAccept(self, *args, **kwargs): + from webob.acceptparse import NilAccept + return NilAccept(*args, **kwargs) + + def Accept(self, *args, **kwargs): + from webob.acceptparse import Accept + return Accept(*args, **kwargs) + + def test_init(self): + nilaccept = self.NilAccept('Connection-Close') + assert nilaccept.header_name == 'Connection-Close' + + def test_repr(self): + nilaccept = self.NilAccept('Connection-Close') + assert repr(nilaccept) == ("<NilAccept for Connection-Close: <class " + "'webob.acceptparse.Accept'>>") + + def test_str(self): + nilaccept = self.NilAccept('Connection-Close') + assert str(nilaccept) == '' + + def test_nonzero(self): + nilaccept = self.NilAccept('Connection-Close') + assert not nilaccept + + def test_add(self): + nilaccept = self.NilAccept('Connection-Close') + accept = self.Accept('Content-Type', 'text/html') + assert nilaccept + accept is accept + new_accept = nilaccept + nilaccept + assert isinstance(new_accept, accept.__class__) + assert new_accept.header_name == 'Connection-Close' + assert new_accept.header_value == '' + new_accept = nilaccept + 'foo' + assert isinstance(new_accept, accept.__class__) + assert new_accept.header_name == 'Connection-Close' + assert new_accept.header_value == 'foo' + + def test_radd(self): + nilaccept = self.NilAccept('Connection-Close') + accept = self.Accept('Content-Type', 'text/html') + assert isinstance('foo' + nilaccept, accept.__class__) + assert ('foo' + nilaccept).header_value == 'foo' + # How to test ``if isinstance(item, self.MasterClass): return item`` + # under NilAccept.__radd__ ?? + + def test_radd_masterclass(self): + # Is this "reaching into" __radd__ legit? + nilaccept = self.NilAccept('Connection-Close') + accept = self.Accept('Content-Type', 'text/html') + assert nilaccept.__radd__(accept) is accept + + def test_contains(self): + nilaccept = self.NilAccept('Connection-Close') + # NilAccept.__contains__ always returns True + assert '' in nilaccept + assert 'dummy' in nilaccept + assert nilaccept in nilaccept + + def test_quality(self): + nilaccept = self.NilAccept('Connection-Close') + # NilAccept.quality always returns 0 + assert nilaccept.quality('dummy') == 0 + + def test_first_match(self): + nilaccept = self.NilAccept('Connection-Close') + # NilAccept.first_match always returns element 0 of the list + assert nilaccept.first_match(['dummy', '']) == 'dummy' + assert nilaccept.first_match(['', 'dummy']) == '' + + def test_best_match(self): + nilaccept = self.NilAccept('Connection-Close') + assert nilaccept.best_match(['foo', 'bar']) == 'foo' + assert nilaccept.best_match([('foo', 1), ('bar', 0.5)]) == 'foo' + assert nilaccept.best_match([('foo', 0.5), ('bar', 1)]) == 'bar' + assert nilaccept.best_match([('foo', 0.5), 'bar']) == 'bar' + # default_match has no effect on NilAccept class + assert nilaccept.best_match([('foo', 0.5), 'bar'], + default_match=True) == 'bar' + assert nilaccept.best_match([('foo', 0.5), 'bar'], + default_match=False) == 'bar' + + def test_best_matches(self): + nilaccept = self.NilAccept('Connection-Close') + assert nilaccept.best_matches() == [] + assert nilaccept.best_matches('foo') == ['foo'] + + +class TestNoAccept(TestCase): + def NoAccept(self, *args, **kwargs): + from webob.acceptparse import NoAccept + return NoAccept(*args, **kwargs) + + def test_contains(self): + noaccept = self.NoAccept('Connection-Close') + # NoAccept.__contains__ always returns False + assert not '' in noaccept + assert not True in noaccept + assert not False in noaccept + assert not noaccept in noaccept + + +class TestMIMEAccept(TestCase): + def MIMEAccept(self, *args, **kwargs): + from webob.acceptparse import MIMEAccept + return MIMEAccept(*args, **kwargs) + + def test_init(self): + mimeaccept = self.MIMEAccept('Content-Type', 'image/jpg') + assert mimeaccept._parsed == [('image/jpg', 1)] + mimeaccept = self.MIMEAccept('Content-Type', 'image/png, image/jpg;q=0.5') + assert mimeaccept._parsed == [('image/png', 1), ('image/jpg', 0.5)] + mimeaccept = self.MIMEAccept('Content-Type', 'image, image/jpg;q=0.5') + assert mimeaccept._parsed == [('image/jpg', 0.5)] + mimeaccept = self.MIMEAccept('Content-Type', '*/*') + assert mimeaccept._parsed == [('*/*', 1)] + mimeaccept = self.MIMEAccept('Content-Type', '*/png') + assert mimeaccept._parsed == [] + mimeaccept = self.MIMEAccept('Content-Type', 'image/*') + assert mimeaccept._parsed == [('image/*', 1)] + + def test_accept_html(self): + mimeaccept = self.MIMEAccept('Content-Type', 'image/jpg') + assert not mimeaccept.accept_html() + mimeaccept = self.MIMEAccept('Content-Type', 'image/jpg, text/html') + assert mimeaccept.accept_html() + + def test_match(self): + mimeaccept = self.MIMEAccept('Content-Type', 'image/jpg') + assert mimeaccept._match('image/jpg', 'image/jpg') + assert mimeaccept._match('image/*', 'image/jpg') + assert mimeaccept._match('*/*', 'image/jpg') + assert not mimeaccept._match('text/html', 'image/jpg') + self.assertRaises(AssertionError, mimeaccept._match, 'image/jpg', '*/*') + + +class TestAcceptProperty(TestCase): + def test_accept_property_fget(self): + from webob.acceptparse import accept_property + from webob import Request + desc = accept_property('Accept-Charset', '14.2') + req = Request.blank('/', environ={'envkey': 'envval'}) + desc.fset(req, 'val') + self.assertEqual(desc.fget(req).header_value, 'val') + + def test_accept_property_fget_nil(self): + from webob.acceptparse import NilAccept + from webob.acceptparse import accept_property + from webob import Request + desc = accept_property('Accept-Charset', '14.2') + req = Request.blank('/') + self.assertEqual(type(desc.fget(req)), NilAccept) + + def test_accept_property_fset(self): + from webob.acceptparse import accept_property + from webob import Request + desc = accept_property('Accept-Charset', '14.2') + req = Request.blank('/', environ={'envkey': 'envval'}) + desc.fset(req, 'baz') + self.assertEqual(desc.fget(req).header_value, 'baz') + + def test_accept_property_fset_acceptclass(self): + from webob.acceptparse import accept_property + from webob import Request + desc = accept_property('Accept-Charset', '14.2') + req = Request.blank('/', environ={'envkey': 'envval'}) + desc.fset(req, ['utf-8', 'latin-11']) + self.assertEqual(desc.fget(req).header_value, 'utf-8, latin-11, iso-8859-1') + + def test_accept_property_fdel(self): + from webob.acceptparse import NilAccept + from webob.acceptparse import accept_property + from webob import Request + desc = accept_property('Accept-Charset', '14.2') + req = Request.blank('/', environ={'envkey': 'envval'}) + desc.fset(req, 'val') + assert desc.fget(req).header_value == 'val' + desc.fdel(req) + self.assertEqual(type(desc.fget(req)), NilAccept) diff --git a/tests/test_byterange.py b/tests/test_byterange.py new file mode 100644 index 0000000..daad184 --- /dev/null +++ b/tests/test_byterange.py @@ -0,0 +1,231 @@ +from webob.byterange import Range, ContentRange, _is_content_range_valid + +from nose.tools import assert_true, assert_false, assert_equal, assert_raises + +# Range class + +def test_satisfiable(): + range = Range( ((0,99),) ) + assert_true( range.satisfiable(100) ) + assert_false( range.satisfiable(99) ) + +def test_range_for_length(): + range = Range( ((0,99), (100,199) ) ) + assert_equal( range.range_for_length( 'None'), None ) + +def test_range_content_range_length_none(): + range = Range( ((0, 100),) ) + assert_equal( range.content_range( None ), None ) + +def test_range_content_range_length_ok(): + range = Range( ((0, 100),) ) + assert_true( range.content_range( 100 ).__class__, ContentRange ) + +def test_range_for_length_more_than_one_range(): + # More than one range + range = Range( ((0,99), (100,199) ) ) + assert_equal( range.range_for_length(100), None ) + +def test_range_for_length_one_range_and_length_none(): + # One range and length is None + range = Range( ((0,99), ) ) + assert_equal( range.range_for_length( None ), None ) + +def test_range_for_length_end_is_none(): + # End is None + range = Range( ((0, None), ) ) + assert_equal( range.range_for_length(100), (0,100) ) + +def test_range_for_length_end_is_none_negative_start(): + # End is None and start is negative + range = Range( ((-5, None), ) ) + assert_equal( range.range_for_length(100), (95,100) ) + +def test_range_start_none(): + # Start is None + range = Range( ((None, 99), ) ) + assert_equal( range.range_for_length(100), None ) + +def test_range_str_end_none(): + range = Range( ((0, 100), ) ) + # Manually set test values + range.ranges = ( (0, None), ) + assert_equal( range.__str__(), 'bytes=0-' ) + +def test_range_str_end_none_negative_start(): + range = Range( ((0, 100), ) ) + # Manually set test values + range.ranges = ( (-5, None), ) + assert_equal( range.__str__(), 'bytes=-5' ) + +def test_range_str_1(): + # Single range + range = Range( ((0, 100), ) ) + assert_equal( str(range), 'bytes=0-99' ) + +def test_range_str_2(): + # Two ranges + range = Range( ((0, 100), (101, 200) ) ) + assert_equal( str(range), 'bytes=0-99,101-199' ) + +def test_range_str_3(): + # Negative start + range = Range( ((-1, 100),) ) + assert_raises( ValueError, range.__str__ ) + +def test_range_str_4(): + # Negative end + range = Range( ((0, 100),) ) + # Manually set test values + range.ranges = ( (0, -100), ) + assert_raises( ValueError, range.__str__ ) + +def test_range_repr(): + range = Range( ((0, 99),) ) + assert_true( range.__repr__(), '<Range bytes 0-98>' ) + +def test_parse_valid_input(): + range = Range( ((0, 100),) ) + assert_equal( range.parse( 'bytes=0-99' ).__class__, Range ) + +def test_parse_missing_equals_sign(): + range = Range( ((0, 100),) ) + assert_equal( range.parse( 'bytes 0-99' ), None ) + +def test_parse_invalid_units(): + range = Range( ((0, 100),) ) + assert_equal( range.parse( 'words=0-99' ), None ) + +def test_parse_bytes_valid_input(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=0-99' ), ('bytes', [(0,100)] ) ) + +def test_parse_bytes_missing_equals_sign(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes 0-99'), None ) + +def test_parse_bytes_missing_dash(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=0 99'), None ) + +def test_parse_bytes_null_input(): + range = Range( ((0, 100),) ) + assert_raises( TypeError, range.parse_bytes, '' ) + +def test_parse_bytes_two_many_ranges(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=-100,-100' ), None ) + +def test_parse_bytes_negative_start(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=-0-99' ), None ) + +def test_parse_bytes_start_greater_than_end(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=99-0'), None ) + +def test_parse_bytes_start_greater_than_last_end(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=0-99,0-199'), None ) + +def test_parse_bytes_only_start(): + range = Range( ((0, 100),) ) + assert_equal( range.parse_bytes( 'bytes=0-'), ('bytes', [(0, None)]) ) + +# ContentRange class + +def test_contentrange_bad_input(): + assert_raises( ValueError, ContentRange, None, 99, None ) + +def test_contentrange_repr(): + contentrange = ContentRange( 0, 99, 100 ) + assert_true( contentrange.__repr__(), '<ContentRange bytes 0-98/100>' ) + +def test_contentrange_str_length_none(): + contentrange = ContentRange( 0, 99, 100 ) + contentrange.length = None + assert_equal( contentrange.__str__(), 'bytes 0-98/*' ) + +def test_contentrange_str_start_none(): + contentrange = ContentRange( 0, 99, 100 ) + contentrange.start = None + contentrange.stop = None + assert_equal( contentrange.__str__(), 'bytes */100' ) + +def test_contentrange_iter(): + contentrange = ContentRange( 0, 99, 100 ) + assert_true( type(contentrange.__iter__()), iter ) + +def test_cr_parse_ok(): + contentrange = ContentRange( 0, 99, 100 ) + assert_true( contentrange.parse( 'bytes 0-99/100' ).__class__, ContentRange ) + +def test_cr_parse_none(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( None ), None ) + +def test_cr_parse_no_bytes(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( '0-99 100' ), None ) + +def test_cr_parse_missing_slash(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes 0-99 100' ), None ) + +def test_cr_parse_invalid_length(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes 0-99/xxx' ), None ) + +def test_cr_parse_no_range(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes 0 99/100' ), None ) + +def test_cr_parse_range_star(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes */100' ).__class__, ContentRange ) + +def test_cr_parse_parse_problem_1(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes A-99/100' ), None ) + +def test_cr_parse_parse_problem_2(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes 0-B/100' ), None ) + +def test_cr_parse_content_invalid(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse( 'bytes 99-0/100' ), None ) + +def test_contentrange_str_length_start(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( contentrange.parse('bytes 0 99/*'), None ) + +# _is_content_range_valid function + +def test_is_content_range_valid_start_none(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( None, 99, 90), False ) + +def test_is_content_range_valid_stop_none(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( 99, None, 90), False ) + +def test_is_content_range_valid_start_stop_none(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( None, None, 90), True ) + +def test_is_content_range_valid_start_none(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( None, 99, 90), False ) + +def test_is_content_range_valid_length_none(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( 0, 99, None), True ) + +def test_is_content_range_valid_stop_greater_than_length_response(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( 0, 99, 90, response=True), False ) + +def test_is_content_range_valid_stop_greater_than_length(): + contentrange = ContentRange( 0, 99, 100 ) + assert_equal( _is_content_range_valid( 0, 99, 90), True ) diff --git a/tests/test_cachecontrol.py b/tests/test_cachecontrol.py new file mode 100644 index 0000000..2388020 --- /dev/null +++ b/tests/test_cachecontrol.py @@ -0,0 +1,266 @@ +from nose.tools import eq_ +from nose.tools import raises +import unittest + + +def test_cache_control_object_max_age_None(): + from webob.cachecontrol import CacheControl + cc = CacheControl({}, 'a') + cc.properties['max-age'] = None + eq_(cc.max_age, -1) + + +class TestUpdateDict(unittest.TestCase): + + def setUp(self): + self.call_queue = [] + def callback(args): + self.call_queue.append("Called with: %s" % repr(args)) + self.callback = callback + + def make_one(self, callback): + from webob.cachecontrol import UpdateDict + ud = UpdateDict() + ud.updated = callback + return ud + + def test_clear(self): + newone = self.make_one(self.callback) + newone['first'] = 1 + assert len(newone) == 1 + newone.clear() + assert len(newone) == 0 + + def test_update(self): + newone = self.make_one(self.callback) + d = {'one' : 1 } + newone.update(d) + assert newone == d + + def test_set_delete(self): + newone = self.make_one(self.callback) + newone['first'] = 1 + assert len(self.call_queue) == 1 + assert self.call_queue[-1] == "Called with: {'first': 1}" + + del newone['first'] + assert len(self.call_queue) == 2 + assert self.call_queue[-1] == 'Called with: {}' + + def test_setdefault(self): + newone = self.make_one(self.callback) + assert newone.setdefault('haters', 'gonna-hate') == 'gonna-hate' + assert len(self.call_queue) == 1 + assert self.call_queue[-1] == "Called with: {'haters': 'gonna-hate'}", self.call_queue[-1] + + # no effect if failobj is not set + assert newone.setdefault('haters', 'gonna-love') == 'gonna-hate' + assert len(self.call_queue) == 1 + + def test_pop(self): + newone = self.make_one(self.callback) + newone['first'] = 1 + newone.pop('first') + assert len(self.call_queue) == 2 + assert self.call_queue[-1] == 'Called with: {}', self.call_queue[-1] + + def test_popitem(self): + newone = self.make_one(self.callback) + newone['first'] = 1 + assert newone.popitem() == ('first', 1) + assert len(self.call_queue) == 2 + assert self.call_queue[-1] == 'Called with: {}', self.call_queue[-1] + + def test_callback_args(self): + assert True + #assert False + + +class TestExistProp(unittest.TestCase): + """ + Test webob.cachecontrol.exists_property + """ + + def setUp(self): + pass + + def make_one(self): + from webob.cachecontrol import exists_property + + class Dummy(object): + properties = dict(prop=1) + type = 'dummy' + prop = exists_property('prop', 'dummy') + badprop = exists_property('badprop', 'big_dummy') + + return Dummy + + def test_get_on_class(self): + from webob.cachecontrol import exists_property + Dummy = self.make_one() + assert isinstance(Dummy.prop, exists_property), Dummy.prop + + def test_get_on_instance(self): + obj = self.make_one()() + assert obj.prop is True + + @raises(AttributeError) + def test_type_mismatch_raise(self): + obj = self.make_one()() + obj.badprop = True + + def test_set_w_value(self): + obj = self.make_one()() + obj.prop = True + assert obj.prop is True + assert obj.properties['prop'] is None + + def test_del_value(self): + obj = self.make_one()() + del obj.prop + assert not 'prop' in obj.properties + + +class TestValueProp(unittest.TestCase): + """ + Test webob.cachecontrol.exists_property + """ + + def setUp(self): + pass + + def make_one(self): + from webob.cachecontrol import value_property + + class Dummy(object): + properties = dict(prop=1) + type = 'dummy' + prop = value_property('prop', 'dummy') + badprop = value_property('badprop', 'big_dummy') + + return Dummy + + def test_get_on_class(self): + from webob.cachecontrol import value_property + Dummy = self.make_one() + assert isinstance(Dummy.prop, value_property), Dummy.prop + + def test_get_on_instance(self): + dummy = self.make_one()() + assert dummy.prop, dummy.prop + #assert isinstance(Dummy.prop, value_property), Dummy.prop + + def test_set_on_instance(self): + dummy = self.make_one()() + dummy.prop = "new" + assert dummy.prop == "new", dummy.prop + assert dummy.properties['prop'] == "new", dict(dummy.properties) + + def test_set_on_instance_bad_attribute(self): + dummy = self.make_one()() + dummy.prop = "new" + assert dummy.prop == "new", dummy.prop + assert dummy.properties['prop'] == "new", dict(dummy.properties) + + def test_set_wrong_type(self): + from webob.cachecontrol import value_property + class Dummy(object): + properties = dict(prop=1, type='fail') + type = 'dummy' + prop = value_property('prop', 'dummy', type='failingtype') + dummy = Dummy() + def assign(): + dummy.prop = 'foo' + self.assertRaises(AttributeError, assign) + + def test_set_type_true(self): + dummy = self.make_one()() + dummy.prop = True + self.assertEquals(dummy.prop, None) + + def test_set_on_instance_w_default(self): + dummy = self.make_one()() + dummy.prop = "dummy" + assert dummy.prop == "dummy", dummy.prop + #@@ this probably needs more tests + + def test_del(self): + dummy = self.make_one()() + dummy.prop = 'Ian Bicking likes to skip' + del dummy.prop + assert dummy.prop == "dummy", dummy.prop + + +def test_copy_cc(): + from webob.cachecontrol import CacheControl + cc = CacheControl({'header':'%', "msg":'arewerichyet?'}, 'request') + cc2 = cc.copy() + assert cc.properties is not cc2.properties + assert cc.type is cc2.type + +# 212 + +def test_serialize_cache_control_emptydict(): + from webob.cachecontrol import serialize_cache_control + result = serialize_cache_control(dict()) + assert result == '' + +def test_serialize_cache_control_cache_control_object(): + from webob.cachecontrol import serialize_cache_control, CacheControl + result = serialize_cache_control(CacheControl({}, 'request')) + assert result == '' + +def test_serialize_cache_control_object_with_headers(): + from webob.cachecontrol import serialize_cache_control, CacheControl + result = serialize_cache_control(CacheControl({'header':'a'}, 'request')) + assert result == 'header=a' + +def test_serialize_cache_control_value_is_None(): + from webob.cachecontrol import serialize_cache_control, CacheControl + result = serialize_cache_control(CacheControl({'header':None}, 'request')) + assert result == 'header' + +def test_serialize_cache_control_value_needs_quote(): + from webob.cachecontrol import serialize_cache_control, CacheControl + result = serialize_cache_control(CacheControl({'header':'""'}, 'request')) + assert result == 'header=""""' + +class TestCacheControl(unittest.TestCase): + def make_one(self, props, typ): + from webob.cachecontrol import CacheControl + return CacheControl(props, typ) + + def test_ctor(self): + cc = self.make_one({'a':1}, 'typ') + self.assertEquals(cc.properties, {'a':1}) + self.assertEquals(cc.type, 'typ') + + def test_parse(self): + from webob.cachecontrol import CacheControl + cc = CacheControl.parse("public, max-age=315360000") + self.assertEquals(type(cc), CacheControl) + self.assertEquals(cc.max_age, 315360000) + self.assertEquals(cc.public, True) + + def test_parse_updates_to(self): + from webob.cachecontrol import CacheControl + def foo(arg): return { 'a' : 1 } + cc = CacheControl.parse("public, max-age=315360000", updates_to=foo) + self.assertEquals(type(cc), CacheControl) + self.assertEquals(cc.max_age, 315360000) + + def test_parse_valueerror_int(self): + from webob.cachecontrol import CacheControl + def foo(arg): return { 'a' : 1 } + cc = CacheControl.parse("public, max-age=abc") + self.assertEquals(type(cc), CacheControl) + self.assertEquals(cc.max_age, 'abc') + + def test_repr(self): + cc = self.make_one({'a':'1'}, 'typ') + result = repr(cc) + self.assertEqual(result, "<CacheControl 'a=1'>") + + + + diff --git a/tests/test_cookies.py b/tests/test_cookies.py index 9a66101..b985fce 100644 --- a/tests/test_cookies.py +++ b/tests/test_cookies.py @@ -1,20 +1,21 @@ -# -*- coding: UTF-8 -*- +# -*- coding: utf-8 -*- from datetime import timedelta from webob import cookies -from nose.tools import ok_, assert_raises, eq_ +from nose.tools import eq_ -def test_cookie(): - """ - Test cookie parsing, serialization and `repr` - """ +def test_cookie_empty(): c = cookies.Cookie() # empty cookie eq_(repr(c), '<Cookie: []>') - # a cookie with one value + +def test_cookie_one_value(): c = cookies.Cookie('dismiss-top=6') eq_(repr(c), "<Cookie: [<Morsel: dismiss-top='6'>]>") + +def test_cookie_one_value_with_trailing_semi(): c = cookies.Cookie('dismiss-top=6;') eq_(repr(c), "<Cookie: [<Morsel: dismiss-top='6'>]>") - # more complex cookie, (also mixing commas and semicolons) + +def test_cookie_complex(): c = cookies.Cookie('dismiss-top=6; CP=null*, '\ 'PHPSESSID=0a539d42abc001cdc762809248d4beed, a="42,"') c_dict = dict((k,v.value) for k,v in c.items()) @@ -23,10 +24,32 @@ def test_cookie(): 'PHPSESSID': '0a539d42abc001cdc762809248d4beed', 'dismiss-top': '6' }) + +def test_cookie_complex_serialize(): + c = cookies.Cookie('dismiss-top=6; CP=null*, '\ + 'PHPSESSID=0a539d42abc001cdc762809248d4beed, a="42,"') eq_(c.serialize(), 'CP=null*, PHPSESSID=0a539d42abc001cdc762809248d4beed, a="42,", ' 'dismiss-top=6') - # reserved keys ($xx) + +def test_cookie_load_multiple(): + c = cookies.Cookie('a=1; Secure=true') + eq_(repr(c), "<Cookie: [<Morsel: a='1'>]>") + eq_(c['a']['secure'], 'true') + +def test_cookie_secure(): + c = cookies.Cookie() + c['foo'] = 'bar' + c['foo'].secure = True + eq_(c.serialize(), 'foo=bar; secure') + +def test_cookie_httponly(): + c = cookies.Cookie() + c['foo'] = 'bar' + c['foo'].httponly = True + eq_(c.serialize(), 'foo=bar; HttpOnly') + +def test_cookie_reserved_keys(): c = cookies.Cookie('dismiss-top=6; CP=null*; $version=42; a=42') assert '$version' not in c c = cookies.Cookie('$reserved=42; a=$42') @@ -37,20 +60,15 @@ def test_serialize_cookie_date(): Testing webob.cookies.serialize_cookie_date. Missing scenarios: * input value is an str, should be returned verbatim - * input value is an int, should be converted to timedelta and we should - continue the rest of the process + * input value is an int, should be converted to timedelta and we + should continue the rest of the process """ - ok_(cookies.serialize_cookie_date('Tue, 04-Jan-2011 13:43:50 GMT')==\ - 'Tue, 04-Jan-2011 13:43:50 GMT', 'We passed a string, should get the ' - 'same one') - ok_(cookies.serialize_cookie_date(None) is None, 'We passed None, should ' - 'get None') - + eq_(cookies.serialize_cookie_date('Tue, 04-Jan-2011 13:43:50 GMT'), + 'Tue, 04-Jan-2011 13:43:50 GMT') + eq_(cookies.serialize_cookie_date(None), None) cdate_delta = cookies.serialize_cookie_date(timedelta(seconds=10)) cdate_int = cookies.serialize_cookie_date(10) - eq_(cdate_delta, cdate_int, - 'Passing a int to method should return the same result as passing a timedelta' - ) + eq_(cdate_delta, cdate_int) def test_ch_unquote(): eq_(cookies._unquote(u'"hello world'), u'"hello world') @@ -66,3 +84,30 @@ def test_ch_unquote(): ]: eq_(cookies._unquote(q), unq) eq_(cookies._quote(unq), q) + +def test_cookie_setitem_needs_quoting(): + c = cookies.Cookie() + c['La Pe\xc3\xb1a'] = '1' + eq_(len(c), 0) + +def test_morsel_serialize_with_expires(): + morsel = cookies.Morsel('bleh', 'blah') + morsel.expires = 'Tue, 04-Jan-2011 13:43:50 GMT' + result = morsel.serialize() + eq_(result, 'bleh=blah; expires="Tue, 04-Jan-2011 13:43:50 GMT"') + +def test_serialize_max_age_timedelta(): + import datetime + val = datetime.timedelta(86400) + result = cookies.serialize_max_age(val) + eq_(result, '7464960000') + +def test_serialize_max_age_int(): + val = 86400 + result = cookies.serialize_max_age(val) + eq_(result, '86400') + +def test_serialize_max_age_str(): + val = '86400' + result = cookies.serialize_max_age(val) + eq_(result, '86400') diff --git a/tests/test_datetime_utils.py b/tests/test_datetime_utils.py index 3b91808..bcaf541 100644 --- a/tests/test_datetime_utils.py +++ b/tests/test_datetime_utils.py @@ -1,11 +1,19 @@ -# -*- coding: UTF-8 -*- +# -*- coding: utf-8 -*- +import datetime import calendar -from datetime import * from rfc822 import formatdate from webob import datetime_utils from nose.tools import ok_, eq_, assert_raises +def test_UTC(): + """Test missing function in _UTC""" + x = datetime_utils.UTC + ok_(x.tzname(datetime.datetime.now())=='UTC') + eq_(x.dst(datetime.datetime.now()), datetime.timedelta(0)) + eq_(x.utcoffset(datetime.datetime.now()), datetime.timedelta(0)) + eq_(repr(x), 'UTC') + def test_parse_date(): """Testing datetime_utils.parse_date. We need to verify the following scenarios: @@ -30,9 +38,11 @@ def test_parse_date(): "to parse_date. We should get None but instead we got %s" %\ ret) ret = datetime_utils.parse_date('Mon, 20 Nov 1995 19:12:08 -0500') - eq_(ret, datetime(1995, 11, 21, 0, 12, 8, tzinfo=datetime_utils.UTC)) + eq_(ret, datetime.datetime( + 1995, 11, 21, 0, 12, 8, tzinfo=datetime_utils.UTC)) ret = datetime_utils.parse_date('Mon, 20 Nov 1995 19:12:08') - eq_(ret, datetime(1995, 11, 20, 19, 12, 8, tzinfo=datetime_utils.UTC)) + eq_(ret, + datetime.datetime(1995, 11, 20, 19, 12, 8, tzinfo=datetime_utils.UTC)) def test_serialize_date(): """Testing datetime_utils.serialize_date @@ -44,8 +54,10 @@ def test_serialize_date(): ret = datetime_utils.serialize_date(u'Mon, 20 Nov 1995 19:12:08 GMT') assert type(ret) is (str) eq_(ret, 'Mon, 20 Nov 1995 19:12:08 GMT') - dt = formatdate(calendar.timegm((datetime.now()+timedelta(1)).timetuple())) - eq_(dt, datetime_utils.serialize_date(timedelta(1))) + dt = formatdate( + calendar.timegm( + (datetime.datetime.now()+datetime.timedelta(1)).timetuple())) + eq_(dt, datetime_utils.serialize_date(datetime.timedelta(1))) assert_raises(ValueError, datetime_utils.serialize_date, None) def test_parse_date_delta(): @@ -58,7 +70,18 @@ def test_parse_date_delta(): ok_(datetime_utils.parse_date_delta(None) is None, 'Passing none value, ' 'should return None') ret = datetime_utils.parse_date_delta('Mon, 20 Nov 1995 19:12:08 -0500') - eq_(ret, datetime(1995, 11, 21, 0, 12, 8, tzinfo=datetime_utils.UTC)) + eq_(ret, datetime.datetime( + 1995, 11, 21, 0, 12, 8, tzinfo=datetime_utils.UTC)) + WHEN = datetime.datetime(2011, 3, 16, 10, 10, 37, tzinfo=datetime_utils.UTC) + #with _NowRestorer(WHEN): Dammit, only Python 2.5 w/ __future__ + nr = _NowRestorer(WHEN) + nr.__enter__() + try: + ret = datetime_utils.parse_date_delta(1) + eq_(ret, WHEN + datetime.timedelta(0, 1)) + finally: + nr.__exit__(None, None, None) + def test_serialize_date_delta(): """Testing datetime_utils.serialize_date_delta @@ -66,13 +89,29 @@ def test_serialize_date_delta(): * if we pass something that's not an int or float, it should delegate the task to serialize_date """ + eq_(datetime_utils.serialize_date_delta(1), '1') + eq_(datetime_utils.serialize_date_delta(1.5), '1') ret = datetime_utils.serialize_date_delta(u'Mon, 20 Nov 1995 19:12:08 GMT') assert type(ret) is (str) eq_(ret, 'Mon, 20 Nov 1995 19:12:08 GMT') -def test_UTC(): - """Test missing function in _UTC""" - x = datetime_utils.UTC - ok_(x.tzname(datetime.now())=='UTC') +def test_timedelta_to_seconds(): + val = datetime.timedelta(86400) + result = datetime_utils.timedelta_to_seconds(val) + eq_(result, 7464960000) + + +class _NowRestorer(object): + + def __init__(self, new_NOW): + self._new_NOW = new_NOW + self._old_NOW = None + def __enter__(self): + import webob.datetime_utils + self._old_NOW = webob.datetime_utils._NOW + webob.datetime_utils._NOW = self._new_NOW + def __exit__(self, exc_type, exc_value, traceback): + import webob.datetime_utils + webob.datetime_utils._NOW = self._old_NOW diff --git a/tests/test_dec.py b/tests/test_dec.py new file mode 100644 index 0000000..0f65590 --- /dev/null +++ b/tests/test_dec.py @@ -0,0 +1,267 @@ +import unittest +from webob import Request +from webob import Response +from webob.dec import _format_args +from webob.dec import _func_name +from webob.dec import wsgify + +class DecoratorTests(unittest.TestCase): + def _testit(self, app, req): + if isinstance(req, basestring): + req = Request.blank(req) + resp = req.get_response(app) + return resp + + def test_wsgify(self): + resp_str = 'hey, this is a test: %s' + @wsgify + def test_app(req): + return resp_str % req.url + resp = self._testit(test_app, '/a url') + self.assertEqual(resp.body, resp_str % 'http://localhost/a%20url') + self.assertEqual(resp.content_length, 45) + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual('%r' % (test_app,), 'wsgify(tests.test_dec.test_app)') + + def test_wsgify_empty_repr(self): + self.assertEqual('%r' % (wsgify(),), 'wsgify()') + + def test_wsgify_args(self): + resp_str = 'hey hey my my' + @wsgify(args=(resp_str,)) + def test_app(req, strarg): + return strarg + resp = self._testit(test_app, '/a url') + self.assertEqual(resp.body, resp_str) + self.assertEqual(resp.content_length, 13) + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual('%r' % (test_app,), + "wsgify(tests.test_dec.test_app, args=('%s',))" % resp_str) + + def test_wsgify_kwargs(self): + resp_str = 'hey hey my my' + @wsgify(kwargs=dict(strarg=resp_str)) + def test_app(req, strarg=''): + return strarg + resp = self._testit(test_app, '/a url') + self.assertEqual(resp.body, resp_str) + self.assertEqual(resp.content_length, 13) + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual('%r' % test_app, + "wsgify(tests.test_dec.test_app, " + "kwargs={'strarg': '%s'})" % resp_str) + + def test_wsgify_raise_httpexception(self): + from webob.exc import HTTPBadRequest + @wsgify + def test_app(req): + import sys + if sys.version_info < (2,5): + raise HTTPBadRequest(None).exception + raise HTTPBadRequest + resp = self._testit(test_app, '/a url') + self.assert_(resp.body.startswith('400 Bad Request')) + self.assertEqual(resp.content_type, 'text/plain') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual('%r' % test_app, + "wsgify(tests.test_dec.test_app)") + + def test_wsgify_no___get__(self): + # use a class instance instead of a fn so we wrap something w/ + # no __get__ + class TestApp(object): + def __call__(self, req): + return 'nothing to see here' + test_app = wsgify(TestApp()) + resp = self._testit(test_app, '/a url') + self.assertEqual(resp.body, 'nothing to see here') + self.assert_(test_app.__get__(test_app) is test_app) + + def test_wsgify_args_no_func(self): + test_app = wsgify(None, args=(1,)) + self.assertRaises(TypeError, self._testit, test_app, '/a url') + + def test_wsgify_wrong_sig(self): + @wsgify + def test_app(req): + return 'What have you done for me lately?' + req = dict() + self.assertRaises(TypeError, test_app, req, 1, 2) + self.assertRaises(TypeError, test_app, req, 1, key='word') + + def test_wsgify_none_response(self): + @wsgify + def test_app(req): + return + resp = self._testit(test_app, '/a url') + self.assertEqual(resp.body, '') + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.content_length, 0) + + def test_wsgify_get(self): + resp_str = "What'choo talkin' about, Willis?" + @wsgify + def test_app(req): + return Response(resp_str) + resp = test_app.get('/url/path') + self.assertEqual(resp.body, resp_str) + + def test_wsgify_post(self): + post_dict = dict(speaker='Robin', + words='Holy test coverage, Batman!') + @wsgify + def test_app(req): + return Response('%s: %s' % (req.POST['speaker'], + req.POST['words'])) + resp = test_app.post('/url/path', post_dict) + self.assertEqual(resp.body, '%s: %s' % (post_dict['speaker'], + post_dict['words'])) + + def test_wsgify_request_method(self): + resp_str = 'Nice body!' + @wsgify + def test_app(req): + self.assertEqual(req.method, 'PUT') + return Response(req.body) + resp = test_app.request('/url/path', method='PUT', + body=resp_str) + self.assertEqual(resp.body, resp_str) + self.assertEqual(resp.content_length, 10) + self.assertEqual(resp.content_type, 'text/html') + + def test_wsgify_undecorated(self): + def test_app(req): + return Response('whoa') + wrapped_test_app = wsgify(test_app) + self.assert_(wrapped_test_app.undecorated is test_app) + + def test_wsgify_custom_request(self): + resp_str = 'hey, this is a test: %s' + class MyRequest(Request): + pass + @wsgify(RequestClass=MyRequest) + def test_app(req): + return resp_str % req.url + resp = self._testit(test_app, '/a url') + self.assertEqual(resp.body, resp_str % 'http://localhost/a%20url') + self.assertEqual(resp.content_length, 45) + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual('%r' % (test_app,), "wsgify(tests.test_dec.test_app, " + "RequestClass=<class 'tests.test_dec.MyRequest'>)") + + def test_middleware(self): + resp_str = "These are the vars: %s" + @wsgify.middleware + def set_urlvar(req, app, **vars): + req.urlvars.update(vars) + return app(req) + from webob.dec import _MiddlewareFactory + self.assert_(set_urlvar.__class__ is _MiddlewareFactory) + repr = '%r' % (set_urlvar,) + self.assert_(repr.startswith('wsgify.middleware(<function set_urlvar at ')) + @wsgify + def show_vars(req): + return resp_str % (sorted(req.urlvars.items())) + show_vars2 = set_urlvar(show_vars, a=1, b=2) + self.assertEqual('%r' % (show_vars2,), + 'wsgify.middleware(tests.test_dec.set_urlvar)' + '(wsgify(tests.test_dec.show_vars), a=1, b=2)') + resp = self._testit(show_vars2, '/path') + self.assertEqual(resp.body, resp_str % "[('a', 1), ('b', 2)]") + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual(resp.content_length, 40) + + def test_unbound_middleware(self): + @wsgify + def test_app(req): + return Response('Say wha!?') + unbound = wsgify.middleware(None, test_app, some='thing') + from webob.dec import _UnboundMiddleware + self.assert_(unbound.__class__ is _UnboundMiddleware) + self.assertEqual(unbound.kw, dict(some='thing')) + self.assertEqual('%r' % (unbound,), + "wsgify.middleware(wsgify(tests.test_dec.test_app), " + "some='thing')") + @unbound + def middle(req, app, **kw): + return app(req) + self.assert_(middle.__class__ is wsgify) + self.assertEqual('%r' % (middle,), + "wsgify.middleware(tests.test_dec.middle)" + "(wsgify(tests.test_dec.test_app), some='thing')") + + def test_unbound_middleware_no_app(self): + unbound = wsgify.middleware(None, None) + from webob.dec import _UnboundMiddleware + self.assert_(unbound.__class__ is _UnboundMiddleware) + self.assertEqual(unbound.kw, dict()) + self.assertEqual('%r' % (unbound,), + "wsgify.middleware()") + + def test_classapp(self): + class HostMap(dict): + @wsgify + def __call__(self, req): + return self[req.host.split(':')[0]] + app = HostMap() + app['example.com'] = Response('1') + app['other.com'] = Response('2') + resp = Request.blank('http://example.com/').get_response(wsgify(app)) + self.assertEqual(resp.content_type, 'text/html') + self.assertEqual(resp.charset, 'UTF-8') + self.assertEqual(resp.content_length, 1) + self.assertEqual(resp.body, '1') + + def test__func_name(self): + def func(): + pass + name = _func_name(func) + self.assertEqual(name, 'tests.test_dec.func') + name = _func_name('a') + self.assertEqual(name, "'a'") + class Klass(object): + @classmethod + def classmeth(cls): + pass + def meth(self): + pass + name = _func_name(Klass) + self.assertEqual(name, 'tests.test_dec.Klass') + k = Klass() + kname = _func_name(k) + self.assert_(kname.startswith('<tests.test_dec.Klass object at 0x')) + name = _func_name(k.meth) + self.assert_(name.startswith('tests.test_dec.%s' % kname)) + self.assert_(name.endswith('>.meth')) + name = _func_name(Klass.meth) + self.assertEqual(name, 'tests.test_dec.Klass.meth') + name = _func_name(Klass.classmeth) + self.assertEqual(name, "tests.test_dec.<class " + "'tests.test_dec.Klass'>.classmeth") + + def test__format_args(self): + args_rep = _format_args() + self.assertEqual(args_rep, '') + kw = dict(a=4, b=5, c=6) + args_rep = _format_args(args=(1, 2, 3), kw=kw) + self.assertEqual(args_rep, '1, 2, 3, a=4, b=5, c=6') + args_rep = _format_args(args=(1, 2, 3), kw=kw, leading_comma=True) + self.assertEqual(args_rep, ', 1, 2, 3, a=4, b=5, c=6') + class Klass(object): + a = 1 + b = 2 + c = 3 + names = ['a', 'b', 'c'] + obj = Klass() + self.assertRaises(AssertionError, _format_args, names=names) + args_rep = _format_args(obj=obj, names='a') + self.assertEqual(args_rep, 'a=1') + args_rep = _format_args(obj=obj, names=names) + self.assertEqual(args_rep, 'a=1, b=2, c=3') + args_rep = _format_args(kw=kw, defaults=dict(a=4, b=5)) + self.assertEqual(args_rep, 'c=6') diff --git a/tests/test_dec.txt b/tests/test_dec.txt deleted file mode 100644 index 40a0fc8..0000000 --- a/tests/test_dec.txt +++ /dev/null @@ -1,103 +0,0 @@ -A test of the decorator module:: - - >>> from dtopt import ELLIPSIS - >>> from webob.dec import wsgify - >>> from webob import Response, Request - >>> from webob import exc - >>> @wsgify - ... def test_app(req): - ... return 'hey, this is a test: %s' % req.url - >>> def testit(app, req): - ... if isinstance(req, basestring): - ... req = Request.blank(req) - ... resp = req.get_response(app) - ... print resp - >>> testit(test_app, '/a url') - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 45 - <BLANKLINE> - hey, this is a test: http://localhost/a%20url - >>> test_app - wsgify(test_app) - -Now some middleware testing:: - - >>> @wsgify.middleware - ... def set_urlvar(req, app, **vars): - ... req.urlvars.update(vars) - ... return app(req) - >>> @wsgify - ... def show_vars(req): - ... return 'These are the vars: %r' % (sorted(req.urlvars.items())) - >>> show_vars2 = set_urlvar(show_vars, a=1, b=2) - >>> show_vars2 - wsgify.middleware(set_urlvar)(wsgify(show_vars), a=1, b=2) - >>> testit(show_vars2, '/path') - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 40 - <BLANKLINE> - These are the vars: [('a', 1), ('b', 2)] - -Some examples from Sergey:: - - >>> class HostMap(dict): - ... @wsgify - ... def __call__(self, req): - ... return self[req.host.split(':')[0]] - >>> app = HostMap() - >>> app['example.com'] = Response('1') - >>> app['other.com'] = Response('2') - >>> print Request.blank('http://example.com/').get_response(wsgify(app)) - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 1 - <BLANKLINE> - 1 - - >>> @wsgify.middleware - ... def override_https(req, normal_app, secure_app): - ... if req.scheme == 'https': - ... return secure_app - ... else: - ... return normal_app - >>> app = override_https(Response('http'), secure_app=Response('https')) - >>> print Request.blank('http://x.com/').get_response(app) - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 4 - <BLANKLINE> - http - -A status checking middleware:: - - >>> @wsgify.middleware - ... def catch(req, app, catchers): - ... resp = req.get_response(app) - ... return catchers.get(resp.status_int, resp) - >>> @wsgify - ... def simple(req): - ... return other_app # Just to mess around - >>> @wsgify - ... def other_app(req): - ... return Response('hey', status_int=int(req.path_info.strip('/'))) - >>> app = catch(simple, catchers={500: Response('error!'), 404: Response('nothing')}) - >>> print Request.blank('/200').get_response(app) - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 3 - <BLANKLINE> - hey - >>> print Request.blank('/500').get_response(app) - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 6 - <BLANKLINE> - error! - >>> print Request.blank('/404').get_response(app) - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 7 - <BLANKLINE> - nothing diff --git a/tests/test_descriptors.py b/tests/test_descriptors.py new file mode 100644 index 0000000..9329068 --- /dev/null +++ b/tests/test_descriptors.py @@ -0,0 +1,797 @@ +# -*- coding: utf-8 -*- + +from datetime import tzinfo +from datetime import timedelta + +from nose.tools import eq_ +from nose.tools import ok_ +from nose.tools import assert_raises + +from webob import Request + + +class GMT(tzinfo): + """UTC""" + ZERO = timedelta(0) + def utcoffset(self, dt): + return self.ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return self.ZERO + + +class MockDescriptor: + _val = 'avalue' + def __get__(self, obj, type=None): + return self._val + def __set__(self, obj, val): + self._val = val + def __delete__(self, obj): + self._val = None + + +def test_environ_getter_docstring(): + from webob.descriptors import environ_getter + desc = environ_getter('akey') + eq_(desc.__doc__, "Gets and sets the 'akey' key in the environment.") + +def test_environ_getter_nodefault_keyerror(): + from webob.descriptors import environ_getter + req = Request.blank('/') + desc = environ_getter('akey') + assert_raises(KeyError, desc.fget, req) + +def test_environ_getter_nodefault_fget(): + from webob.descriptors import environ_getter + req = Request.blank('/') + desc = environ_getter('akey') + desc.fset(req, 'bar') + eq_(req.environ['akey'], 'bar') + +def test_environ_getter_nodefault_fdel(): + from webob.descriptors import environ_getter + desc = environ_getter('akey') + eq_(desc.fdel, None) + +def test_environ_getter_default_fget(): + from webob.descriptors import environ_getter + req = Request.blank('/') + desc = environ_getter('akey', default='the_default') + eq_(desc.fget(req), 'the_default') + +def test_environ_getter_default_fset(): + from webob.descriptors import environ_getter + req = Request.blank('/') + desc = environ_getter('akey', default='the_default') + desc.fset(req, 'bar') + eq_(req.environ['akey'], 'bar') + +def test_environ_getter_default_fset_none(): + from webob.descriptors import environ_getter + req = Request.blank('/') + desc = environ_getter('akey', default='the_default') + desc.fset(req, 'baz') + desc.fset(req, None) + ok_('akey' not in req.environ) + +def test_environ_getter_default_fdel(): + from webob.descriptors import environ_getter + req = Request.blank('/') + desc = environ_getter('akey', default='the_default') + desc.fset(req, 'baz') + assert 'akey' in req.environ + desc.fdel(req) + ok_('akey' not in req.environ) + +def test_environ_getter_rfc_section(): + from webob.descriptors import environ_getter + desc = environ_getter('akey', rfc_section='14.3') + eq_(desc.__doc__, "Gets and sets the 'akey' key in the environment. For " + "more information on akey see `section 14.3 " + "<http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.3>`_.") + +def test_upath_property_fget(): + from webob.descriptors import upath_property + req = Request.blank('/') + desc = upath_property('akey') + eq_(desc.fget(req), '') + +def test_upath_property_fset(): + from webob.descriptors import upath_property + req = Request.blank('/') + desc = upath_property('akey') + desc.fset(req, 'avalue') + eq_(desc.fget(req), 'avalue') + +def test_header_getter_doc(): + from webob.descriptors import header_getter + desc = header_getter('AHEADER', '14.3') + eq_(desc.__doc__, "Gets and sets and deletes the AHEADER header. For " + "more information on AHEADER see `section 14.3 " + "<http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.3>`_.") + +def test_header_getter_fget(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + eq_(desc.fget(resp), None) + +def test_header_getter_fset(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, 'avalue') + eq_(desc.fget(resp), 'avalue') + +def test_header_getter_fset_none(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, 'avalue') + desc.fset(resp, None) + eq_(desc.fget(resp), None) + +def test_header_getter_fdel(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, 'avalue2') + desc.fdel(resp) + eq_(desc.fget(resp), None) + +def test_header_getter_unicode(): + from webob.descriptors import header_getter + desc = header_getter('AHEADER', '14.3') + eq_(desc.__doc__, "Gets and sets and deletes the AHEADER header. For " + "more information on AHEADER see `section 14.3 " + "<http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.3>`_.") + +def test_header_getter_unicode_fget_none(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + eq_(desc.fget(resp), None) + +def test_header_getter_unicode_fget(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, u'avalue') + eq_(desc.fget(resp), u'avalue') + +def test_header_getter_unicode_fset_none(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, None) + eq_(desc.fget(resp), None) + +def test_header_getter_unicode_fset(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, u'avalue2') + eq_(desc.fget(resp), u'avalue2') + +def test_header_getter_unicode_fdel(): + from webob.descriptors import header_getter + from webob import Response + resp = Response('aresp') + desc = header_getter('AHEADER', '14.3') + desc.fset(resp, u'avalue3') + desc.fdel(resp) + eq_(desc.fget(resp), None) + +def test_converter_not_prop(): + from webob.descriptors import converter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + assert_raises(AssertionError,converter, + ('CONTENT_LENGTH', None, '14.13'), + parse_int_safe, serialize_int, 'int') + +def test_converter_with_name_docstring(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + desc = converter( + environ_getter('CONTENT_LENGTH', '666', '14.13'), + parse_int_safe, serialize_int, 'int') + eq_(desc.__doc__, "Gets and sets the 'CONTENT_LENGTH' key in the " + "environment. For more information on CONTENT_LENGTH see `section 14.13 " + "<http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.13>`_. " + "Converts it using int.") + +def test_converter_with_name_fget(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + req = Request.blank('/') + desc = converter( + environ_getter('CONTENT_LENGTH', '666', '14.13'), + parse_int_safe, serialize_int, 'int') + eq_(desc.fget(req), 666) + +def test_converter_with_name_fset(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + req = Request.blank('/') + desc = converter( + environ_getter('CONTENT_LENGTH', '666', '14.13'), + parse_int_safe, serialize_int, 'int') + desc.fset(req, '999') + eq_(desc.fget(req), 999) + +def test_converter_without_name_fget(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + req = Request.blank('/') + desc = converter( + environ_getter('CONTENT_LENGTH', '666', '14.13'), + parse_int_safe, serialize_int) + eq_(desc.fget(req), 666) + +def test_converter_without_name_fset(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + req = Request.blank('/') + desc = converter( + environ_getter('CONTENT_LENGTH', '666', '14.13'), + parse_int_safe, serialize_int) + desc.fset(req, '999') + eq_(desc.fget(req), 999) + +def test_converter_none_for_wrong_type(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + req = Request.blank('/') + desc = converter( + ## XXX: Should this fail if the type is wrong? + environ_getter('CONTENT_LENGTH', 'sixsixsix', '14.13'), + parse_int_safe, serialize_int, 'int') + eq_(desc.fget(req), None) + +def test_converter_delete(): + from webob.descriptors import converter + from webob.descriptors import environ_getter + from webob.descriptors import parse_int_safe + from webob.descriptors import serialize_int + req = Request.blank('/') + desc = converter( + ## XXX: Should this fail if the type is wrong? + environ_getter('CONTENT_LENGTH', '666', '14.13'), + parse_int_safe, serialize_int, 'int') + assert_raises(KeyError, desc.fdel, req) + +def test_list_header(): + from webob.descriptors import list_header + desc = list_header('CONTENT_LENGTH', '14.13') + eq_(type(desc), property) + +def test_parse_list_single(): + from webob.descriptors import parse_list + result = parse_list('avalue') + eq_(result, ('avalue',)) + +def test_parse_list_multiple(): + from webob.descriptors import parse_list + result = parse_list('avalue,avalue2') + eq_(result, ('avalue', 'avalue2')) + +def test_parse_list_none(): + from webob.descriptors import parse_list + result = parse_list(None) + eq_(result, None) + +def test_parse_list_unicode_single(): + from webob.descriptors import parse_list + result = parse_list(u'avalue') + eq_(result, ('avalue',)) + +def test_parse_list_unicode_multiple(): + from webob.descriptors import parse_list + result = parse_list(u'avalue,avalue2') + eq_(result, ('avalue', 'avalue2')) + +def test_serialize_list(): + from webob.descriptors import serialize_list + result = serialize_list(('avalue', 'avalue2')) + eq_(result, 'avalue, avalue2') + +def test_serialize_list_string(): + from webob.descriptors import serialize_list + result = serialize_list('avalue') + eq_(result, 'avalue') + +def test_serialize_list_unicode(): + from webob.descriptors import serialize_list + result = serialize_list(u'avalue') + eq_(result, u'avalue') + +def test_converter_date(): + import datetime + from webob.descriptors import converter_date + from webob.descriptors import environ_getter + req = Request.blank('/') + UTC = GMT() + desc = converter_date(environ_getter( + "HTTP_DATE", "Tue, 15 Nov 1994 08:12:31 GMT", "14.8")) + eq_(desc.fget(req), + datetime.datetime(1994, 11, 15, 8, 12, 31, tzinfo=UTC)) + +def test_converter_date_docstring(): + from webob.descriptors import converter_date + from webob.descriptors import environ_getter + desc = converter_date(environ_getter( + "HTTP_DATE", "Tue, 15 Nov 1994 08:12:31 GMT", "14.8")) + eq_(desc.__doc__, "Gets and sets the 'HTTP_DATE' key in the environment. " + "For more information on Date see `section 14.8 " + "<http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.8>`_. " + "Converts it using HTTP date.") + +def test_date_header_fget_none(): + from webob import Response + from webob.descriptors import date_header + resp = Response('aresponse') + desc = date_header('HTTP_DATE', "14.8") + eq_(desc.fget(resp), None) + +def test_date_header_fset_fget(): + import datetime + from webob import Response + from webob.descriptors import date_header + resp = Response('aresponse') + UTC = GMT() + desc = date_header('HTTP_DATE', "14.8") + desc.fset(resp, "Tue, 15 Nov 1994 08:12:31 GMT") + eq_(desc.fget(resp), datetime.datetime(1994, 11, 15, 8, 12, 31, tzinfo=UTC)) + +def test_date_header_fdel(): + from webob import Response + from webob.descriptors import date_header + resp = Response('aresponse') + desc = date_header('HTTP_DATE', "14.8") + desc.fset(resp, "Tue, 15 Nov 1994 08:12:31 GMT") + desc.fdel(resp) + eq_(desc.fget(resp), None) + +def test_deprecated_property_ctor_prop(): + from webob.descriptors import deprecated_property + prop = property() + dep = deprecated_property(prop, + 'deprecated_property', + "Don't use it", + warning=False) + eq_(dep.descriptor, prop) + +def test_deprecated_property_ctor_attr(): + from webob.descriptors import deprecated_property + prop = property() + dep = deprecated_property(prop, + 'deprecated_property', + "Don't use it", + warning=False) + eq_(dep.attr, 'deprecated_property') + +def test_deprecated_property_ctor_message(): + from webob.descriptors import deprecated_property + prop = property() + dep = deprecated_property(prop, + 'deprecated_property', + "Don't use it", + warning=False) + eq_(dep.message, "Don't use it") + +def test_deprecated_property_ctor_raises(): + from webob.descriptors import deprecated_property + prop = property() + dep = deprecated_property(prop, + 'deprecated_property', + "Don't use it", + warning=False) + assert_raises(DeprecationWarning, dep.warn) + +def test_deprecated_property_get(): + from webob.descriptors import deprecated_property + dep = deprecated_property(deprecated_property, + 'deprecated_property', + 'DEPRECATED', + warning=False) + assert_raises(DeprecationWarning, dep.__get__, dep) + +def test_deprecated_property_get_none(): + from webob.descriptors import deprecated_property + dep = deprecated_property(None, + 'deprecated_property', + 'DEPRECATED', + warning=False) + eq_(dep.__get__(None), dep) + +def test_deprecated_property_set(): + from webob.descriptors import deprecated_property + dep = deprecated_property(deprecated_property, + 'deprecated_property', + 'DEPRECATED', + warning=False) + assert_raises(DeprecationWarning, dep.__set__, dep, 'avalue') + +def test_deprecated_property_delete(): + from webob.descriptors import deprecated_property + dep = deprecated_property(deprecated_property, + 'deprecated_property', + 'DEPRECATED', + warning=False) + assert_raises(DeprecationWarning, dep.__delete__, dep) + +def test_deprecated_property_repr(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + try: + warnings.simplefilter('ignore') + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + assert dep.__repr__().startswith( + "<Deprecated attribute mock_property: " + "<tests.test_descriptors.MockDescriptor instance at") + finally: + warnings.resetwarnings() + +def test_deprecated_property_warn_get(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + try: + warnings.simplefilter('error') + assert_raises(DeprecationWarning, dep.__get__, mock) + finally: + warnings.resetwarnings() + +def test_deprecated_property_warn_set(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + try: + warnings.simplefilter('error') + assert_raises(DeprecationWarning, dep.__set__, mock, 'avalue') + finally: + warnings.resetwarnings() + +def test_deprecated_property_warn_delete(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + try: + warnings.simplefilter('error') + assert_raises(DeprecationWarning, dep.__delete__, mock) + finally: + warnings.resetwarnings() + +def test_deprecated_property_warn_get_call(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + try: + warnings.simplefilter('ignore') + eq_(dep.__get__(mock), 'avalue') + finally: + warnings.resetwarnings() + +def test_deprecated_property_warn_set_call(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + try: + warnings.simplefilter('ignore') + dep.__set__(mock, 'avalue2') + eq_(dep.__get__(mock), 'avalue2') + finally: + warnings.resetwarnings() + +def test_deprecated_property_warn_delete_call(): + import warnings + from webob.descriptors import deprecated_property + mock = MockDescriptor() + dep = deprecated_property(mock, + 'mock_property', + 'DEPRECATED') + try: + warnings.simplefilter('ignore') + dep.__delete__(mock) + eq_(dep.__get__(mock), None) + finally: + warnings.resetwarnings() + +def test_parse_etag_response(): + from webob.descriptors import parse_etag_response + etresp = parse_etag_response("etag") + eq_(etresp, "etag") + +def test_parse_etag_response_quoted(): + from webob.descriptors import parse_etag_response + etresp = parse_etag_response('"etag"') + eq_(etresp, "etag") + +def test_parse_etag_response_is_none(): + from webob.descriptors import parse_etag_response + etresp = parse_etag_response(None) + eq_(etresp, None) + +def test_serialize_etag_response(): + from webob.descriptors import serialize_etag_response + etresp = serialize_etag_response("etag") + eq_(etresp, '"etag"') + +def test_parse_if_range_is_None(): + from webob.descriptors import parse_if_range + from webob.descriptors import NoIfRange + eq_(NoIfRange, parse_if_range(None)) + +def test_parse_if_range_date_ifr(): + from webob.descriptors import parse_if_range + from webob.descriptors import IfRange + ifr = parse_if_range("2011-03-15 01:24:43.272409") + eq_(type(ifr), IfRange) + +def test_parse_if_range_date_etagmatcher(): + from webob.descriptors import parse_if_range + from webob.etag import ETagMatcher + ifr = parse_if_range("2011-03-15 01:24:43.272409") + eq_(type(ifr.etag), ETagMatcher) + +def test_serialize_if_range_string(): + from webob.descriptors import serialize_if_range + val = serialize_if_range("avalue") + eq_(val, "avalue") + +def test_serialize_if_range_unicode(): + from webob.descriptors import serialize_if_range + val = serialize_if_range(u"avalue") + eq_(val, u"avalue") + +def test_serialize_if_range_datetime(): + import datetime + from webob.descriptors import serialize_if_range + UTC = GMT() + val = serialize_if_range(datetime.datetime(1994, 11, 15, 8, 12, 31, tzinfo=UTC)) + eq_(val, "Tue, 15 Nov 1994 08:12:31 GMT") + +def test_serialize_if_range_other(): + from webob.descriptors import serialize_if_range + val = serialize_if_range(123456) + eq_(val, '123456') + +def test_parse_range_none(): + from webob.descriptors import parse_range + val = parse_range(None) + eq_(val, None) + +def test_parse_range_type(): + from webob.byterange import Range + from webob.descriptors import parse_range + val = parse_range("bytes=1-500") + eq_(type(val), type(Range.parse("bytes=1-500"))) + +def test_parse_range_values(): + from webob.byterange import Range + from webob.descriptors import parse_range + val = parse_range("bytes=1-500") + eq_(val.ranges, Range.parse("bytes=1-500").ranges) + +def test_serialize_range_none(): + from webob.descriptors import serialize_range + val = serialize_range(None) + eq_(val, None) + +def test_serialize_range(): + from webob.descriptors import serialize_range + val = serialize_range((1,500)) + eq_(val, 'bytes=1-499') + +def test_serialize_invalid_len(): + from webob.descriptors import serialize_range + assert_raises(ValueError, serialize_range, (1,)) + +def test_parse_int_none(): + from webob.descriptors import parse_int + val = parse_int(None) + eq_(val, None) + +def test_parse_int_emptystr(): + from webob.descriptors import parse_int + val = parse_int('') + eq_(val, None) + +def test_parse_int(): + from webob.descriptors import parse_int + val = parse_int('123') + eq_(val, 123) + +def test_parse_int_invalid(): + from webob.descriptors import parse_int + assert_raises(ValueError, parse_int, 'abc') + +def test_parse_int_safe_none(): + from webob.descriptors import parse_int_safe + eq_(parse_int_safe(None), None) + +def test_parse_int_safe_emptystr(): + from webob.descriptors import parse_int_safe + eq_(parse_int_safe(''), None) + +def test_parse_int_safe(): + from webob.descriptors import parse_int_safe + eq_(parse_int_safe('123'), 123) + +def test_parse_int_safe_invalid(): + from webob.descriptors import parse_int_safe + eq_(parse_int_safe('abc'), None) + +def test_serialize_int(): + from webob.descriptors import serialize_int + assert serialize_int is str + +def test_parse_content_range_none(): + from webob.descriptors import parse_content_range + eq_(parse_content_range(None), None) + +def test_parse_content_range_emptystr(): + from webob.descriptors import parse_content_range + eq_(parse_content_range(' '), None) + +def test_parse_content_range_length(): + from webob.byterange import ContentRange + from webob.descriptors import parse_content_range + val = parse_content_range("bytes 0-499/1234") + eq_(val.length, ContentRange.parse("bytes 0-499/1234").length) + +def test_parse_content_range_start(): + from webob.byterange import ContentRange + from webob.descriptors import parse_content_range + val = parse_content_range("bytes 0-499/1234") + eq_(val.start, ContentRange.parse("bytes 0-499/1234").start) + +def test_parse_content_range_stop(): + from webob.byterange import ContentRange + from webob.descriptors import parse_content_range + val = parse_content_range("bytes 0-499/1234") + eq_(val.stop, ContentRange.parse("bytes 0-499/1234").stop) + +def test_serialize_content_range_none(): + from webob.descriptors import serialize_content_range + eq_(serialize_content_range(None), 'None') ### XXX: Seems wrong + +def test_serialize_content_range_emptystr(): + from webob.descriptors import serialize_content_range + eq_(serialize_content_range(''), None) + +def test_serialize_content_range_invalid(): + from webob.descriptors import serialize_content_range + assert_raises(ValueError, serialize_content_range, (1,)) + +def test_serialize_content_range_asterisk(): + from webob.descriptors import serialize_content_range + eq_(serialize_content_range((0, 500)), 'bytes 0-499/*') + +def test_serialize_content_range_defined(): + from webob.descriptors import serialize_content_range + eq_(serialize_content_range((0, 500, 1234)), 'bytes 0-499/1234') + +def test_parse_auth_params_leading_capital_letter(): + from webob.descriptors import parse_auth_params + val = parse_auth_params('Basic Realm=WebOb') + eq_(val, {'ealm': 'WebOb'}) + +def test_parse_auth_params_trailing_capital_letter(): + from webob.descriptors import parse_auth_params + val = parse_auth_params('Basic realM=WebOb') + eq_(val, {}) + +def test_parse_auth_params_doublequotes(): + from webob.descriptors import parse_auth_params + val = parse_auth_params('Basic realm="Web Object"') + eq_(val, {'realm': 'Web Object'}) + +def test_parse_auth_params_multiple_values(): + from webob.descriptors import parse_auth_params + val = parse_auth_params("foo='blah &&234', qop=foo, nonce='qwerty1234'") + eq_(val, {'nonce': "'qwerty1234'", 'foo': "'blah &&234'", 'qop': 'foo'}) + +def test_parse_auth_params_truncate_on_comma(): + from webob.descriptors import parse_auth_params + val = parse_auth_params("Basic realm=WebOb,this_will_truncate") + eq_(val, {'realm': 'WebOb'}) + +def test_parse_auth_params_emptystr(): + from webob.descriptors import parse_auth_params + eq_(parse_auth_params(''), {}) + +def test_parse_auth_none(): + from webob.descriptors import parse_auth + eq_(parse_auth(None), None) + +def test_parse_auth_emptystr(): + from webob.descriptors import parse_auth + assert_raises(ValueError, parse_auth, '') + +def test_parse_auth_basic(): + from webob.descriptors import parse_auth + eq_(parse_auth("Basic realm=WebOb"), ('Basic', 'realm=WebOb')) + +def test_parse_auth_basic_quoted(): + from webob.descriptors import parse_auth + eq_(parse_auth('Basic realm="Web Ob"'), ('Basic', {'realm': 'Web Ob'})) + +def test_parse_auth_basic_quoted_multiple_unknown(): + from webob.descriptors import parse_auth + eq_(parse_auth("foo='blah &&234', qop=foo, nonce='qwerty1234'"), + ("foo='blah", "&&234', qop=foo, nonce='qwerty1234'")) + +def test_parse_auth_basic_quoted_known_multiple(): + from webob.descriptors import parse_auth + eq_(parse_auth("Basic realm='blah &&234', qop=foo, nonce='qwerty1234'"), + ('Basic', "realm='blah &&234', qop=foo, nonce='qwerty1234'")) + +def test_serialize_auth_none(): + from webob.descriptors import serialize_auth + eq_(serialize_auth(None), None) + +def test_serialize_auth_emptystr(): + from webob.descriptors import serialize_auth + eq_(serialize_auth(''), '') + +def test_serialize_auth_basic_quoted(): + from webob.descriptors import serialize_auth + val = serialize_auth(('Basic', 'realm="WebOb"')) + eq_(val, 'Basic realm="WebOb"') + +def test_serialize_auth_digest_multiple(): + from webob.descriptors import serialize_auth + val = serialize_auth(('Digest', 'realm="WebOb", nonce=abcde12345, qop=foo')) + flags = val[len('Digest'):] + result = sorted([ x.strip() for x in flags.split(',') ]) + eq_(result, ['nonce=abcde12345', 'qop=foo', 'realm="WebOb"']) + +def test_serialize_auth_digest_tuple(): + from webob.descriptors import serialize_auth + val = serialize_auth(('Digest', {'realm':'"WebOb"', 'nonce':'abcde12345', 'qop':'foo'})) + flags = val[len('Digest'):] + result = sorted([ x.strip() for x in flags.split(',') ]) + eq_(result, ['nonce="abcde12345"', 'qop="foo"', 'realm=""WebOb""']) diff --git a/tests/test_etag.py b/tests/test_etag.py new file mode 100644 index 0000000..ab182e4 --- /dev/null +++ b/tests/test_etag.py @@ -0,0 +1,451 @@ +import unittest + +class etag_propertyTests(unittest.TestCase): + def _getTargetClass(self): + from webob.etag import etag_property + return etag_property + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def _makeDummyRequest(self, **kw): + """ + Return a DummyRequest object with attrs from kwargs. + Use like: dr = _makeDummyRequest(environment={'userid': 'johngalt'}) + Then you can: uid = dr.environment.get('userid', 'SomeDefault') + """ + class Dummy(object): + def __init__(self, **kwargs): + self.__dict__.update(**kwargs) + d = Dummy(**kw) + return d + + def test_fget_missing_key(self): + ep = self._makeOne("KEY", "DEFAULT", "RFC_SECTION") + req = self._makeDummyRequest(environ={}) + self.assertEquals(ep.fget(req), "DEFAULT") + + def test_fget_found_key(self): + ep = self._makeOne("KEY", "DEFAULT", "RFC_SECTION") + req = self._makeDummyRequest(environ={'KEY':'VALUE'}) + res = ep.fget(req) + self.assertEquals(res.etags, ['VALUE']) + self.assertEquals(res.weak_etags, []) + + def test_fget_star_key(self): + ep = self._makeOne("KEY", "DEFAULT", "RFC_SECTION") + req = self._makeDummyRequest(environ={'KEY':'*'}) + res = ep.fget(req) + import webob.etag + self.assertEquals(type(res), webob.etag._AnyETag) + self.assertEquals(res.__dict__, {}) + + def test_fset_None(self): + ep = self._makeOne("KEY", "DEFAULT", "RFC_SECTION") + req = self._makeDummyRequest(environ={'KEY':'*'}) + res = ep.fset(req, None) + self.assertEquals(res, None) + + def test_fset_not_None(self): + ep = self._makeOne("KEY", "DEFAULT", "RFC_SECTION") + req = self._makeDummyRequest(environ={'KEY':'OLDVAL'}) + res = ep.fset(req, "NEWVAL") + self.assertEquals(res, None) + self.assertEquals(req.environ['KEY'], 'NEWVAL') + + def test_fedl(self): + ep = self._makeOne("KEY", "DEFAULT", "RFC_SECTION") + req = self._makeDummyRequest(environ={'KEY':'VAL', 'QUAY':'VALYOU'}) + res = ep.fdel(req) + self.assertEquals(res, None) + self.assertFalse('KEY' in req.environ) + self.assertEquals(req.environ['QUAY'], 'VALYOU') + +class AnyETagTests(unittest.TestCase): + def _getTargetClass(self): + from webob.etag import _AnyETag + return _AnyETag + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test___repr__(self): + etag = self._makeOne() + self.assertEqual(etag.__repr__(), '<ETag *>') + + def test___nonzero__(self): + etag = self._makeOne() + self.assertEqual(etag.__nonzero__(), False) + + def test___contains__None(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__(None), True) + + def test___contains__empty_list(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__([]), True) + + def test___contains__empty_string(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__(''), True) + + def test___contains__something(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__('something'), True) + + def test_weak_match_None(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match(None), True) + + def test_weak_match_empty_list(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match([]), True) + + def test_weak_match_empty_string(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match(''), True) + + def test_weak_match_something(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match('something'), True) + + def test___str__(self): + etag = self._makeOne() + self.assertEqual(etag.__str__(), '*') + +class NoETagTests(unittest.TestCase): + def _getTargetClass(self): + from webob.etag import _NoETag + return _NoETag + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test___repr__(self): + etag = self._makeOne() + self.assertEqual(etag.__repr__(), '<No ETag>') + + def test___nonzero__(self): + etag = self._makeOne() + self.assertEqual(etag.__nonzero__(), False) + + def test___contains__None(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__(None), False) + + def test___contains__empty_list(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__([]), False) + + def test___contains__empty_string(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__(''), False) + + def test___contains__something(self): + etag = self._makeOne() + self.assertEqual(etag.__contains__('something'), False) + + def test_weak_match_None(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match(None), False) + + def test_weak_match_empty_list(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match([]), False) + + def test_weak_match_empty_string(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match(''), False) + + def test_weak_match_something(self): + etag = self._makeOne() + self.assertEqual(etag.weak_match('something'), False) + + def test___str__(self): + etag = self._makeOne() + self.assertEqual(etag.__str__(), '') + +class ETagMatcherTests(unittest.TestCase): + def _getTargetClass(self): + from webob.etag import ETagMatcher + return ETagMatcher + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test___init__wo_weak_etags(self): + matcher = self._makeOne(("ETAGS",)) + self.assertEqual(matcher.etags, ("ETAGS",)) + self.assertEqual(matcher.weak_etags, ()) + + def test___init__w_weak_etags(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertEqual(matcher.etags, ("ETAGS",)) + self.assertEqual(matcher.weak_etags, ("WEAK",)) + + def test___contains__tags(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertTrue("ETAGS" in matcher) + + def test___contains__weak_tags(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertTrue("WEAK" in matcher) + + def test___contains__not(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertFalse("BEER" in matcher) + + def test___contains__None(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertFalse(None in matcher) + + def test_weak_match_etags(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertTrue(matcher.weak_match("W/ETAGS")) + + def test_weak_match_weak_etags(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertTrue(matcher.weak_match("W/WEAK")) + + def test_weak_match_weak_not(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertFalse(matcher.weak_match("W/BEER")) + + def test_weak_match_weak_wo_wslash(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertTrue(matcher.weak_match("ETAGS")) + + def test_weak_match_weak_wo_wslash_not(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertFalse(matcher.weak_match("BEER")) + + def test___repr__one(self): + matcher = self._makeOne(("ETAGS",), ("WEAK",)) + self.assertEqual(matcher.__repr__(), '<ETag ETAGS>') + + def test___repr__multi(self): + matcher = self._makeOne(("ETAG1","ETAG2"), ("WEAK",)) + self.assertEqual(matcher.__repr__(), '<ETag ETAG1 or ETAG2>') + + def test_parse_None(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse(None) + self.assertEqual(et.etags, []) + self.assertEqual(et.weak_etags, []) + + def test_parse_anyetag(self): + # these tests smell bad, are they useful? + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('*') + self.assertEqual(et.__dict__, {}) + self.assertEqual(et.__repr__(), '<ETag *>') + + def test_parse_one(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('ONE') + self.assertEqual(et.etags, ['ONE']) + self.assertEqual(et.weak_etags, []) + + # .parse doesn't use the contructor values (etags [ ,weak_etags]) + + def test_parse_commasep(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('ONE, TWO') + self.assertEqual(et.etags, ['ONE', 'TWO']) + self.assertEqual(et.weak_etags, []) + + def test_parse_commasep_w_weak(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('ONE, w/TWO') + self.assertEqual(et.etags, ['ONE']) + self.assertEqual(et.weak_etags, ['TWO']) + + def test_parse_quoted(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('"ONE"') + self.assertEqual(et.etags, ['ONE']) + self.assertEqual(et.weak_etags, []) + + def test_parse_quoted_two(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('"ONE", "TWO"') + self.assertEqual(et.etags, ['ONE', 'TWO']) + self.assertEqual(et.weak_etags, []) + + def test_parse_quoted_two_weak(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('"ONE", W/"TWO"') + self.assertEqual(et.etags, ['ONE']) + self.assertEqual(et.weak_etags, ['TWO']) + + def test_parse_wo_close_quote(self): + # Unsure if this is testing likely input + matcher = self._makeOne(("ETAG",), ("WEAK",)) + et = matcher.parse('"ONE') + self.assertEqual(et.etags, ['ONE']) + self.assertEqual(et.weak_etags, []) + + def test___str__etag(self): + matcher = self._makeOne(("ETAG",)) + self.assertEqual(matcher.__str__(), 'ETAG') + + def test___str__etag_w_weak(self): + matcher = self._makeOne(("ETAG",), ("WEAK",)) + self.assertEqual(matcher.__str__(), 'ETAG, W/WEAK') + +class IfRangeTests(unittest.TestCase): + def _getTargetClass(self): + from webob.etag import IfRange + return IfRange + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test___init__(self): + ir = self._makeOne() + self.assertEqual(ir.etag, None) + self.assertEqual(ir.date, None) + + def test___init__etag(self): + ir = self._makeOne(etag='ETAG') + self.assertEqual(ir.etag, 'ETAG') + self.assertEqual(ir.date, None) + + def test___init__date(self): + ir = self._makeOne(date='DATE') + self.assertEqual(ir.etag, None) + self.assertEqual(ir.date, 'DATE') + + def test___init__etag_date(self): + ir = self._makeOne(etag='ETAG', date='DATE') + self.assertEqual(ir.etag, 'ETAG') + self.assertEqual(ir.date, 'DATE') + + def test___repr__(self): + ir = self._makeOne() + self.assertEqual(ir.__repr__(), '<IfRange etag=*, date=*>') + + def test___repr__etag(self): + ir = self._makeOne(etag='ETAG') + self.assertEqual(ir.__repr__(), '<IfRange etag=ETAG, date=*>') + + def test___repr__date(self): + ir = self._makeOne(date='Fri, 09 Nov 2001 01:08:47 -0000') + self.assertEqual(ir.__repr__(), + '<IfRange etag=*, ' + + 'date=Fri, 09 Nov 2001 01:08:47 -0000>') + + def test___repr__etag_date(self): + ir = self._makeOne(etag='ETAG', date='Fri, 09 Nov 2001 01:08:47 -0000') + self.assertEqual(ir.__repr__(), + '<IfRange etag=ETAG, ' + + 'date=Fri, 09 Nov 2001 01:08:47 -0000>') + + def test___str__(self): + ir = self._makeOne() + self.assertEqual(ir.__str__(), '') + + def test___str__etag(self): + ir = self._makeOne(etag='ETAG', date='Fri, 09 Nov 2001 01:08:47 -0000') + self.assertEqual(ir.__str__(), 'ETAG') + + def test___str__date(self): + ir = self._makeOne(date='Fri, 09 Nov 2001 01:08:47 -0000') + self.assertEqual(ir.__str__(), 'Fri, 09 Nov 2001 01:08:47 -0000') + + def test_match(self): + ir = self._makeOne() + self.assertTrue(ir.match()) + + def test_match_date_none(self): + ir = self._makeOne(date='Fri, 09 Nov 2001 01:08:47 -0000') + self.assertFalse(ir.match()) + + def test_match_date_earlier(self): + ir = self._makeOne(date='Fri, 09 Nov 2001 01:08:47 -0000') + self.assertTrue(ir.match(last_modified= + 'Fri, 09 Nov 2001 01:00:00 -0000')) + + def test_match_etag_none(self): + ir = self._makeOne(etag="ETAG") + self.assertFalse(ir.match()) + + def test_match_etag_different(self): + ir = self._makeOne(etag="ETAG") + self.assertFalse(ir.match("DIFFERENT")) + + def test_match_response_no_date(self): + class DummyResponse(object): + etag = "ETAG" + last_modified = None + ir = self._makeOne(etag="ETAG", date='Fri, 09 Nov 2001 01:08:47 -0000') + response = DummyResponse() + self.assertFalse(ir.match_response(response)) + + def test_match_response_w_date_earlier(self): + class DummyResponse(object): + etag = "ETAG" + last_modified = 'Fri, 09 Nov 2001 01:00:00 -0000' + ir = self._makeOne(etag="ETAG", date='Fri, 09 Nov 2001 01:08:47 -0000') + response = DummyResponse() + self.assertTrue(ir.match_response(response)) + + def test_match_response_etag(self): + class DummyResponse(object): + etag = "ETAG" + last_modified = None + ir = self._makeOne(etag="ETAG") + response = DummyResponse() + self.assertTrue(ir.match_response(response)) + + def test_parse_none(self): + ir = self._makeOne(etag="ETAG") + # I believe this identifies a bug: '_NoETag' object is not callable + self.assertRaises(TypeError, ir.parse, None) + + def test_parse_wo_gmt(self): + ir = self._makeOne(etag="ETAG") + res = ir.parse('INTERPRETED_AS_ETAG') + self.assertEquals(res.etag.etags, ['INTERPRETED_AS_ETAG']) + self.assertEquals(res.date, None) + + def test_parse_with_gmt(self): + import datetime + class UTC(datetime.tzinfo): + def utcoffset(self, dt): + return datetime.timedelta(0) + def tzname(self, dt): + return 'UTC' + ir = self._makeOne(etag="ETAG") + res = ir.parse('Fri, 09 Nov 2001 01:08:47 GMT') + self.assertEquals(res.etag, None) + dt = datetime.datetime(2001,11,9, 1,8,47,0, UTC()) + self.assertEquals(res.date, dt) + +class NoIfRangeTests(unittest.TestCase): + def _getTargetClass(self): + from webob.etag import _NoIfRange + return _NoIfRange + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test___repr__(self): + ir = self._makeOne() + self.assertEquals(ir.__repr__(), '<Empty If-Range>') + + def test___str__(self): + ir = self._makeOne() + self.assertEquals(ir.__str__(), '') + + def test___nonzero__(self): + ir = self._makeOne() + self.assertEquals(ir.__nonzero__(), False) + + def test_match(self): + ir = self._makeOne() + self.assertEquals(ir.match(), True) + + def test_match_response(self): + ir = self._makeOne() + self.assertEquals(ir.match_response("IGNORED"), True) diff --git a/tests/test_exc.py b/tests/test_exc.py index 560cfae..7233168 100644 --- a/tests/test_exc.py +++ b/tests/test_exc.py @@ -1,7 +1,62 @@ -import sys -from webob import * +from webob.request import Request from webob.dec import wsgify -from webob.exc import * +from webob.exc import sys +from webob.exc import no_escape +from webob.exc import strip_tags +from webob.exc import HTTPException +from webob.exc import WSGIHTTPException +from webob.exc import HTTPError +from webob.exc import HTTPRedirection +from webob.exc import HTTPRedirection +from webob.exc import HTTPOk +from webob.exc import HTTPCreated +from webob.exc import HTTPAccepted +from webob.exc import HTTPNonAuthoritativeInformation +from webob.exc import HTTPNoContent +from webob.exc import HTTPResetContent +from webob.exc import HTTPPartialContent +from webob.exc import _HTTPMove +from webob.exc import HTTPMultipleChoices +from webob.exc import HTTPMovedPermanently +from webob.exc import HTTPFound +from webob.exc import HTTPSeeOther +from webob.exc import HTTPNotModified +from webob.exc import HTTPUseProxy +from webob.exc import HTTPTemporaryRedirect +from webob.exc import HTTPClientError +from webob.exc import HTTPBadRequest +from webob.exc import HTTPUnauthorized +from webob.exc import HTTPPaymentRequired +from webob.exc import HTTPForbidden +from webob.exc import HTTPNotFound +from webob.exc import HTTPMethodNotAllowed +from webob.exc import HTTPNotAcceptable +from webob.exc import HTTPProxyAuthenticationRequired +from webob.exc import HTTPRequestTimeout +from webob.exc import HTTPConflict +from webob.exc import HTTPGone +from webob.exc import HTTPLengthRequired +from webob.exc import HTTPPreconditionFailed +from webob.exc import HTTPRequestEntityTooLarge +from webob.exc import HTTPRequestURITooLong +from webob.exc import HTTPUnsupportedMediaType +from webob.exc import HTTPRequestRangeNotSatisfiable +from webob.exc import HTTPExpectationFailed +from webob.exc import HTTPUnprocessableEntity +from webob.exc import HTTPLocked +from webob.exc import HTTPFailedDependency +from webob.exc import HTTPServerError +from webob.exc import HTTPInternalServerError +from webob.exc import HTTPNotImplemented +from webob.exc import HTTPBadGateway +from webob.exc import HTTPServiceUnavailable +from webob.exc import HTTPGatewayTimeout +from webob.exc import HTTPVersionNotSupported +from webob.exc import HTTPInsufficientStorage +from webob.exc import HTTPExceptionMiddleware +from webob import exc + +from nose.tools import eq_, ok_, assert_equal, assert_raises @wsgify def method_not_allowed_app(req): @@ -12,6 +67,52 @@ def method_not_allowed_app(req): raise HTTPMethodNotAllowed().exception return 'hello!' +def test_noescape_null(): + assert_equal(no_escape(None), '') + +def test_noescape_not_basestring(): + assert_equal(no_escape(42), '42') + +def test_noescape_unicode(): + class DummyUnicodeObject(object): + def __unicode__(self): + return u'42' + duo = DummyUnicodeObject() + assert_equal(no_escape(duo), u'42') + +def test_strip_tags_empty(): + assert_equal(strip_tags(''), '') + +def test_strip_tags_newline_to_space(): + assert_equal(strip_tags('a\nb'), 'a b') + +def test_strip_tags_zaps_carriage_return(): + assert_equal(strip_tags('a\rb'), 'ab') + +def test_strip_tags_br_to_newline(): + assert_equal(strip_tags('a<br/>b'), 'a\nb') + +def test_strip_tags_zaps_comments(): + assert_equal(strip_tags('a<!--b-->'), 'ab') + +def test_strip_tags_zaps_tags(): + assert_equal(strip_tags('foo<bar>baz</bar>'), 'foobaz') + +def test_HTTPException(): + _called = [] + _result = object() + def _response(environ, start_response): + _called.append((environ, start_response)) + return _result + environ = {} + start_response = object() + exc = HTTPException('testing', _response) + ok_(exc.wsgi_response is _response) + ok_(exc.exception is exc) + result = exc(environ, start_response) + ok_(result is result) + assert_equal(_called, [(environ, start_response)]) + def test_exception_with_unicode_data(): req = Request.blank('/', method=u'POST') res = req.get_response(method_not_allowed_app) @@ -22,3 +123,238 @@ def test_WSGIHTTPException_headers(): ('Set-Cookie', 'a=2')]) mixed = exc.headers.mixed() assert mixed['set-cookie'] == ['a=1', 'a=2'] + +def test_WSGIHTTPException_w_body_template(): + from string import Template + TEMPLATE = '$foo: $bar' + exc = WSGIHTTPException(body_template = TEMPLATE) + assert_equal(exc.body_template, TEMPLATE) + ok_(isinstance(exc.body_template_obj, Template)) + eq_(exc.body_template_obj.substitute({'foo': 'FOO', 'bar': 'BAR'}), + 'FOO: BAR') + +def test_WSGIHTTPException_w_empty_body(): + class EmptyOnly(WSGIHTTPException): + empty_body = True + exc = EmptyOnly(content_type='text/plain', content_length=234) + ok_('content_type' not in exc.__dict__) + ok_('content_length' not in exc.__dict__) + +def test_WSGIHTTPException___str__(): + exc1 = WSGIHTTPException(detail='Detail') + eq_(str(exc1), 'Detail') + class Explain(WSGIHTTPException): + explanation = 'Explanation' + eq_(str(Explain()), 'Explanation') + +def test_WSGIHTTPException_plain_body_no_comment(): + class Explain(WSGIHTTPException): + code = '999' + title = 'Testing' + explanation = 'Explanation' + exc = Explain(detail='Detail') + eq_(exc.plain_body({}), + '999 Testing\n\nExplanation\n\n Detail ') + +def test_WSGIHTTPException_html_body_w_comment(): + class Explain(WSGIHTTPException): + code = '999' + title = 'Testing' + explanation = 'Explanation' + exc = Explain(detail='Detail', comment='Comment') + eq_(exc.html_body({}), + '<html>\n' + ' <head>\n' + ' <title>999 Testing</title>\n' + ' </head>\n' + ' <body>\n' + ' <h1>999 Testing</h1>\n' + ' Explanation<br /><br />\n' + 'Detail\n' + '<!-- Comment -->\n\n' + ' </body>\n' + '</html>' + ) + +def test_WSGIHTTPException_generate_response(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'PUT', + 'HTTP_ACCEPT': 'text/html' + } + excep = WSGIHTTPException() + assert_equal( excep(environ,start_response), [ + '<html>\n' + ' <head>\n' + ' <title>None None</title>\n' + ' </head>\n' + ' <body>\n' + ' <h1>None None</h1>\n' + ' <br /><br />\n' + '\n' + '\n\n' + ' </body>\n' + '</html>' ] + ) + +def test_WSGIHTTPException_call_w_body(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'PUT' + } + excep = WSGIHTTPException() + excep.body = 'test' + assert_equal( excep(environ,start_response), ['test'] ) + + +def test_WSGIHTTPException_wsgi_response(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + excep = WSGIHTTPException() + assert_equal( excep.wsgi_response(environ,start_response), [] ) + +def test_WSGIHTTPException_exception_newstyle(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + excep = WSGIHTTPException() + exc.newstyle_exceptions = True + assert_equal( excep.exception(environ,start_response), [] ) + +def test_WSGIHTTPException_exception_no_newstyle(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + excep = WSGIHTTPException() + exc.newstyle_exceptions = False + assert_equal( excep.exception(environ,start_response), [] ) + +def test_HTTPMove(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + m = _HTTPMove() + assert_equal( m( environ, start_response ), [] ) + +def test_HTTPMove_location_not_none(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + m = _HTTPMove(location='http://example.com') + assert_equal( m( environ, start_response ), [] ) + +def test_HTTPMove_add_slash_and_location(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + assert_raises( TypeError, _HTTPMove, location='http://example.com', add_slash=True ) + +def test_HTTPMove_call_add_slash(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + m = _HTTPMove() + m.add_slash = True + assert_equal( m( environ, start_response ), [] ) + +def test_HTTPMove_call_query_string(): + def start_response(status, headers, exc_info=None): + pass + environ = { + 'wsgi.url_scheme': 'HTTP', + 'SERVER_NAME': 'localhost', + 'SERVER_PORT': '80', + 'REQUEST_METHOD': 'HEAD' + } + m = _HTTPMove() + m.add_slash = True + environ[ 'QUERY_STRING' ] = 'querystring' + assert_equal( m( environ, start_response ), [] ) + +def test_HTTPExceptionMiddleware_ok(): + def app( environ, start_response ): + return '123' + application = app + m = HTTPExceptionMiddleware(application) + environ = {} + start_response = None + res = m( environ, start_response ) + assert_equal( res, '123' ) + +def test_HTTPExceptionMiddleware_exception(): + def wsgi_response( environ, start_response): + return '123' + def app( environ, start_response ): + raise HTTPException( None, wsgi_response ) + application = app + m = HTTPExceptionMiddleware(application) + environ = {} + start_response = None + res = m( environ, start_response ) + assert_equal( res, '123' ) + +def test_HTTPExceptionMiddleware_exception_exc_info_none(): + class DummySys: + def exc_info(self): + return None + def wsgi_response( environ, start_response): + return start_response('200 OK', [], exc_info=None) + def app( environ, start_response ): + raise HTTPException( None, wsgi_response ) + application = app + m = HTTPExceptionMiddleware(application) + environ = {} + def start_response(status, headers, exc_info): + pass + try: + from webob import exc + old_sys = exc.sys + sys = DummySys() + res = m( environ, start_response ) + assert_equal( res, None ) + finally: + exc.sys = old_sys diff --git a/tests/test_headers.py b/tests/test_headers.py index e7c40dd..8bcf2d6 100644 --- a/tests/test_headers.py +++ b/tests/test_headers.py @@ -1,21 +1,24 @@ -# -*- coding: UTF-8 -*- +# -*- coding: utf-8 -*- from webob import headers -from nose.tools import ok_, assert_raises, eq_ as eq +from nose.tools import ok_, assert_raises, eq_ class TestError(Exception): pass -def test_raise_keyerror(): - """Deleting a missing key from ResponseHeaders should raise a KeyError - Deleting a present key should not raise an error at all - """ +def test_ResponseHeaders_delitem_notpresent(): + """Deleting a missing key from ResponseHeaders should raise a KeyError""" d = headers.ResponseHeaders() assert_raises(KeyError, d.__delitem__, 'b') + +def test_ResponseHeaders_delitem_present(): + """ + Deleting a present key should not raise an error at all + """ d = headers.ResponseHeaders(a=1) del d['a'] ok_('a' not in d) -def test_set_default(): +def test_ResponseHeaders_setdefault(): """Testing set_default for ResponseHeaders""" d = headers.ResponseHeaders(a=1) res = d.setdefault('b', 1) @@ -25,23 +28,86 @@ def test_set_default(): res = d.setdefault('B', 10) assert res == d['b'] == d['B'] == 1 -def test_pop(): +def test_ResponseHeader_pop(): """Testing if pop return TypeError when more than len(*args)>1 plus other assorted tests""" d = headers.ResponseHeaders(a=1, b=2, c=3, d=4) assert_raises(TypeError, d.pop, 'a', 'z', 'y') - assert d.pop('a') == 1 - assert 'a' not in d - assert d.pop('B') == 2 - assert 'b' not in d - assert d.pop('c', 'u') == 3 - assert 'c' not in d - assert d.pop('e', 'u') =='u' - assert 'e' not in d + eq_(d.pop('a'), 1) + ok_('a' not in d) + eq_(d.pop('B'), 2) + ok_('b' not in d) + eq_(d.pop('c', 'u'), 3) + ok_('c' not in d) + eq_(d.pop('e', 'u'), 'u') + ok_('e' not in d) assert_raises(KeyError, d.pop, 'z') -def test_delitem_environheaders(): +def test_ResponseHeaders_getitem_miss(): + d = headers.ResponseHeaders() + assert_raises(KeyError, d.__getitem__, 'a') + +def test_ResponseHeaders_getall(): + d = headers.ResponseHeaders() + d.add('a', 1) + d.add('a', 2) + result = d.getall('a') + eq_(result, [1,2]) + +def test_ResponseHeaders_mixed(): + d = headers.ResponseHeaders() + d.add('a', 1) + d.add('a', 2) + d['b'] = 1 + result = d.mixed() + eq_(result, {'a':[1,2], 'b':1}) + +def test_ResponseHeaders_setitem_scalar_replaces_seq(): + d = headers.ResponseHeaders() + d.add('a', 2) + d['a'] = 1 + result = d.getall('a') + eq_(result, [1]) + +def test_ResponseHeaders_contains(): + d = headers.ResponseHeaders() + d['a'] = 1 + ok_('a' in d) + ok_(not 'b' in d) + +def test_EnvironHeaders_delitem(): d = headers.EnvironHeaders({'CONTENT_LENGTH': '10'}) del d['CONTENT-LENGTH'] assert not d assert_raises(KeyError, d.__delitem__, 'CONTENT-LENGTH') + +def test_EnvironHeaders_getitem(): + d = headers.EnvironHeaders({'CONTENT_LENGTH': '10'}) + eq_(d['CONTENT-LENGTH'], '10') + +def test_EnvironHeaders_setitem(): + d = headers.EnvironHeaders({}) + d['abc'] = '10' + eq_(d['abc'], '10') + +def test_EnvironHeaders_contains(): + d = headers.EnvironHeaders({}) + d['a'] = '10' + ok_('a' in d) + ok_(not 'b' in d) + +def test__trans_key_not_basestring(): + result = headers._trans_key(None) + eq_(result, None) + +def test__trans_key_not_a_header(): + result = headers._trans_key('') + eq_(result, None) + +def test__trans_key_key2header(): + result = headers._trans_key('CONTENT_TYPE') + eq_(result, 'Content-Type') + +def test__trans_key_httpheader(): + result = headers._trans_key('HTTP_FOO_BAR') + eq_(result, 'Foo-Bar') diff --git a/tests/test_multidict.py b/tests/test_multidict.py index b4db015..d3ec57e 100644 --- a/tests/test_multidict.py +++ b/tests/test_multidict.py @@ -3,98 +3,213 @@ import unittest from webob import multidict -class MultiDictTestCase(unittest.TestCase): - klass = multidict.MultiDict - data = multidict.MultiDict([('a', u'\xe9'), ('a', 'e'), ('b', 1)]) - +class BaseDictTests(object): def setUp(self): - self.d = self._get_instance() + self._list = [('a', u'\xe9'), ('a', 'e'), ('a', 'f'), ('b', 1)] + self.data = multidict.MultiDict(self._list) + self.d = self._get_instance() def _get_instance(self): return self.klass(self.data.copy()) def test_len(self): - assert len(self.d) == 3 + self.assertEqual(len(self.d), 4) def test_getone(self): - assert self.d.getone('b') == 1 + self.assertEqual(self.d.getone('b'), 1) + + def test_getone_missing(self): + self.assertRaises(KeyError, self.d.getone, 'z') + + def test_getone_multiple_raises(self): self.assertRaises(KeyError, self.d.getone, 'a') def test_getall(self): - assert self.d.getall('b') == [1] + self.assertEqual(self.d.getall('b'), [1]) def test_dict_of_lists(self): - assert self.d.dict_of_lists() == {'a': [u'\xe9', u'e'], 'b': [1]}, self.d.dict_of_lists() + self.assertEqual( + self.d.dict_of_lists(), + {'a': [u'\xe9', u'e', u'f'], 'b': [1]}) def test_dict_api(self): - assert 'a' in self.d.mixed() - assert 'a' in self.d.keys() - assert 'a' in self.d.iterkeys() - assert ('b', 1) in self.d.items() - assert ('b', 1) in self.d.iteritems() - assert 1 in self.d.values() - assert 1 in self.d.itervalues() - assert len(self.d) == 3 + self.assertTrue('a' in self.d.mixed()) + self.assertTrue('a' in self.d.keys()) + self.assertTrue('a' in self.d.iterkeys()) + self.assertTrue(('b', 1) in self.d.items()) + self.assertTrue(('b', 1) in self.d.iteritems()) + self.assertTrue(1 in self.d.values()) + self.assertTrue(1 in self.d.itervalues()) + self.assertEqual(len(self.d), 4) def test_set_del_item(self): d = self._get_instance() - assert 'a' in d + self.assertTrue('a' in d) del d['a'] - assert 'a' not in d - d['a'] = 1 + self.assertTrue(not 'a' in d) def test_pop(self): d = self._get_instance() d['a'] = 1 - assert d.pop('a') == 1 - assert d.pop('x', 1) == 1 - assert d.popitem() == ('b', 1) + self.assertEqual(d.pop('a'), 1) + self.assertEqual(d.pop('x', 1), 1) + + def test_pop_wrong_args(self): + d = self._get_instance() + self.assertRaises(TypeError, d.pop, 'a', 1, 1) + + def test_pop_missing(self): + d = self._get_instance() + self.assertRaises(KeyError, d.pop, 'z') + + def test_popitem(self): + d = self._get_instance() + self.assertEqual(d.popitem(), ('b', 1)) def test_update(self): d = self._get_instance() d.update(e=1) - assert 'e' in d + self.assertTrue('e' in d) d.update(dict(x=1)) - assert 'x' in d + self.assertTrue('x' in d) d.update([('y', 1)]) - assert 'y' in d + self.assertTrue('y' in d) def test_setdefault(self): d = self._get_instance() d.setdefault('a', 1) - assert d['a'] != 1 + self.assertNotEqual(d['a'], 1) d.setdefault('e', 1) - assert 'e' in d + self.assertTrue('e' in d) def test_add(self): d = self._get_instance() d.add('b', 3) - assert d.getall('b') == [1, 3] + self.assertEqual(d.getall('b'), [1, 3]) def test_copy(self): assert self.d.copy() is not self.d if hasattr(self.d, 'multi'): - assert self.d.copy().multi is not self.d.multi - assert self.d.copy() is not self.d.multi + self.assertFalse(self.d.copy().multi is self.d.multi) + self.assertFalse(self.d.copy() is self.d.multi) def test_clear(self): d = self._get_instance() d.clear() - assert len(d) == 0 + self.assertEqual(len(d), 0) def test_nonzero(self): d = self._get_instance() - assert d + self.assertTrue(d) d.clear() - assert not d + self.assertFalse(d) + + def test_repr(self): + self.assertTrue(repr(self._get_instance())) + def test_too_many_args(self): + from webob.multidict import MultiDict + self.assertRaises(TypeError, MultiDict, 1, 2) -class UnicodeMultiDictTestCase(MultiDictTestCase): + def test_no_args(self): + from webob.multidict import MultiDict + md = MultiDict() + self.assertEqual(md._items, []) + + def test_kwargs(self): + from webob.multidict import MultiDict + md = MultiDict(kw1='val1') + self.assertEqual(md._items, [('kw1','val1')]) + + def test_view_list_not_list(self): + from webob.multidict import MultiDict + d = MultiDict() + self.assertRaises(TypeError, d.view_list, 42) + + def test_view_list(self): + from webob.multidict import MultiDict + d = MultiDict() + self.assertEqual(d.view_list([1,2])._items, [1,2]) + + def test_from_fieldstorage_with_filename(self): + from webob.multidict import MultiDict + d = MultiDict() + fs = DummyFieldStorage('a', '1', 'file') + self.assertEqual(d.from_fieldstorage(fs), MultiDict({'a':fs.list[0]})) + + def test_from_fieldstorage_without_filename(self): + from webob.multidict import MultiDict + d = MultiDict() + fs = DummyFieldStorage('a', '1') + self.assertEqual(d.from_fieldstorage(fs), MultiDict({'a':'1'})) + +class MultiDictTestCase(BaseDictTests, unittest.TestCase): + klass = multidict.MultiDict + + def test_update_behavior_warning(self): + import warnings + class Foo(dict): + def __len__(self): + return 0 + foo = Foo() + foo['a'] = 1 + d = self._get_instance() + try: + warnings.simplefilter('error') + self.assertRaises(UserWarning, d.update, foo) + finally: + warnings.resetwarnings() + +class UnicodeMultiDictTestCase(BaseDictTests, unittest.TestCase): klass = multidict.UnicodeMultiDict -class NestedMultiDictTestCase(MultiDictTestCase): + def test_decode_key(self): + d = self._get_instance() + d.decode_keys = True + + class Key(object): + pass + + key = Key() + self.assertEquals(key, d._decode_key(key)) + + def test_decode_value(self): + import cgi + + d = self._get_instance() + d.decode_keys = True + + fs = cgi.FieldStorage() + fs.name = 'a' + self.assertEqual(d._decode_value(fs).name, 'a') + + def test_encode_key(self): + d = self._get_instance() + value = unicode('a') + d.decode_keys = True + self.assertEquals(d._encode_key(value),'a') + + def test_encode_value(self): + d = self._get_instance() + value = unicode('a') + self.assertEquals(d._encode_value(value),'a') + +class NestedMultiDictTestCase(BaseDictTests, unittest.TestCase): klass = multidict.NestedMultiDict + def test_getitem(self): + d = self.klass({'a':1}) + self.assertEqual(d['a'], 1) + + def test_getitem_raises(self): + d = self._get_instance() + self.assertRaises(KeyError, d.__getitem__, 'z') + + def test_contains(self): + d = self._get_instance() + self.assertEquals(d.__contains__('a'), True) + self.assertEquals(d.__contains__('z'), False) + def test_add(self): d = self._get_instance() self.assertRaises(KeyError, d.add, 'b', 3) @@ -119,17 +234,156 @@ class NestedMultiDictTestCase(MultiDictTestCase): self.assertRaises(KeyError, d.pop, 'a') self.assertRaises(KeyError, d.pop, 'a', 1) + def test_popitem(self): + d = self._get_instance() + self.assertRaises(KeyError, d.popitem, 'a') + + def test_pop_wrong_args(self): + d = self._get_instance() + self.assertRaises(KeyError, d.pop, 'a', 1, 1) + def test_clear(self): d = self._get_instance() self.assertRaises(KeyError, d.clear) def test_nonzero(self): d = self._get_instance() - assert d + self.assertEqual(d.__nonzero__(), True) + d.dicts = [{}] + self.assertEqual(d.__nonzero__(), False) + assert not d -class TrackableMultiDict(MultiDictTestCase): +class TrackableMultiDict(BaseDictTests, unittest.TestCase): klass = multidict.TrackableMultiDict def _get_instance(self): def tracker(*args, **kwargs): pass return self.klass(self.data.copy(), __tracker=tracker, __name='tracker') + + def test_inititems(self): + #The first argument passed into the __init__ method + class Arg: + def items(self): + return [('a', u'\xe9'), ('a', 'e'), ('a', 'f'), ('b', 1)] + + d = self._get_instance() + d._items = None + d.__init__(Arg()) + self.assertEquals(self.d._items, self._list) + + def test_nullextend(self): + d = self._get_instance() + self.assertEqual(d.extend(), None) + d.extend(test = 'a') + self.assertEqual(d['test'], 'a') + + def test_listextend(self): + class Other: + def items(self): + return [u'\xe9', u'e', r'f', 1] + + other = Other() + d = self._get_instance() + d.extend(other) + + _list = [u'\xe9', u'e', r'f', 1] + for v in _list: + self.assertTrue(v in d._items) + + def test_dictextend(self): + class Other: + def __getitem__(self, item): + return {'a':1, 'b':2, 'c':3}.get(item) + + def keys(self): + return ['a', 'b', 'c'] + + other = Other() + d = self._get_instance() + d.extend(other) + + _list = [('a', 1), ('b', 2), ('c', 3)] + for v in _list: + self.assertTrue(v in d._items) + + def test_otherextend(self): + class Other(object): + def __iter__(self): + return iter([('a', 1)]) + + other = Other() + d = self._get_instance() + d.extend(other) + + _list = [('a', 1)] + for v in _list: + self.assertTrue(v in d._items) + +class NoVarsTestCase(unittest.TestCase): + klass = multidict.NoVars + + def _get_instance(self): + return self.klass() + + def test_getitem(self): + d = self._get_instance() + self.assertRaises(KeyError, d.__getitem__, 'a') + + def test_setitem(self): + d = self._get_instance() + self.assertRaises(KeyError, d.__setitem__, 'a') + + def test_delitem(self): + d = self._get_instance() + self.assertRaises(KeyError, d.__delitem__, 'a') + + def test_get(self): + d = self._get_instance() + self.assertEqual(d.get('a', default = 'b'), 'b') + + def test_getall(self): + d = self._get_instance() + self.assertEqual(d.getall('a'), []) + + def test_getone(self): + d = self._get_instance() + self.assertRaises(KeyError, d.getone, 'a') + + def test_mixed(self): + d = self._get_instance() + self.assertEqual(d.mixed(), {}) + + def test_contains(self): + d = self._get_instance() + self.assertEqual(d.__contains__('a'), False) + + def test_copy(self): + d = self._get_instance() + self.assertEqual(d.copy(), d) + + def test_len(self): + d = self._get_instance() + self.assertEqual(len(d), 0) + + def test_repr(self): + d = self._get_instance() + self.assertEqual(repr(d), '<NoVars: N/A>') + + def test_keys(self): + d = self._get_instance() + self.assertEqual(d.keys(), []) + + def test_iterkeys(self): + d = self._get_instance() + self.assertEqual(list(d.iterkeys()), []) + +class DummyField(object): + def __init__(self, name, value, filename=None): + self.name = name + self.value = value + self.filename = filename + +class DummyFieldStorage(object): + def __init__(self, name, value, filename=None): + self.list = [DummyField(name, value, filename)] + diff --git a/tests/test_request.py b/tests/test_request.py index ae7bad8..301480e 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -1,14 +1,2271 @@ -from webob import Request, BaseRequest -from webob.request import ( - NoDefault, AdhocAttrMixin, environ_from_url, environ_add_POST -) -from webtest import TestApp -from nose.tools import eq_, ok_, assert_raises, assert_false -from cStringIO import StringIO -import string -import cgi +import unittest + +_marker = object() + +class BaseRequestTests(unittest.TestCase): + + def _getTargetClass(self): + from webob import BaseRequest + return BaseRequest + + def _makeOne(self, environ, environ_getter=None, charset=_marker, + unicode_errors=_marker, decode_param_names=_marker, **kw): + from webob.request import NoDefault + if charset is _marker: + charset = NoDefault + if unicode_errors is _marker: + unicode_errors = NoDefault + if decode_param_names is _marker: + decode_param_names = NoDefault + return self._getTargetClass()(environ, environ_getter, charset, + unicode_errors, decode_param_names, **kw) + + def _makeStringIO(self, text): + try: + from io import BytesIO + except ImportError: # Python < 2.6 + from StringIO import StringIO as BytesIO + return BytesIO(text) + + def test_ctor_environ_getter_raises_WTF(self): + # This API should be changed. + self.assertRaises(ValueError, + self._makeOne, {}, environ_getter=object()) + + def test_ctor_wo_environ_raises_WTF(self): + # This API should be changed. + self.assertRaises(TypeError, self._makeOne, None) + + def test_ctor_w_environ(self): + environ = {} + req = self._makeOne(environ) + self.assertEqual(req.environ, environ) + + def test_body_file_getter(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT} + req = self._makeOne(environ) + self.assert_(req.body_file is INPUT) + + def test_body_file_setter_w_string(self): + BEFORE = self._makeStringIO('before') + AFTER = str('AFTER') + environ = {'wsgi.input': BEFORE, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + req.body_file = AFTER + self.assertEqual(req.body_file.getvalue(), AFTER) + self.assertEqual(req.content_length, len(AFTER)) + + def test_body_file_setter_non_string(self): + BEFORE = self._makeStringIO('before') + AFTER = self._makeStringIO('after') + environ = {'wsgi.input': BEFORE, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + req.body_file = AFTER + self.assert_(req.body_file is AFTER) + self.assertEqual(req.content_length, None) + + def test_body_file_deleter(self): + INPUT = self._makeStringIO('before') + environ = {'wsgi.input': INPUT, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + del req.body_file + self.assertEqual(req.body_file.getvalue(), '') + self.assertEqual(req.content_length, 0) + + def test_body_file_raw(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + self.assert_(req.body_file_raw is INPUT) + + def test_body_file_seekable_input_not_seekable(self): + INPUT = self._makeStringIO('input') + INPUT.seek(1, 0) # consume + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': False, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + seekable = req.body_file_seekable + self.assert_(seekable is not INPUT) + self.assertEqual(seekable.getvalue(), 'nput') + + def test_body_file_seekable_input_is_seekable(self): + INPUT = self._makeStringIO('input') + INPUT.seek(1, 0) # consume + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + seekable = req.body_file_seekable + self.assert_(seekable is INPUT) + + def test_scheme(self): + environ = {'wsgi.url_scheme': 'something:', + } + req = self._makeOne(environ) + self.assertEqual(req.scheme, 'something:') + + def test_method(self): + environ = {'REQUEST_METHOD': 'OPTIONS', + } + req = self._makeOne(environ) + self.assertEqual(req.method, 'OPTIONS') + + def test_http_version(self): + environ = {'SERVER_PROTOCOL': '1.1', + } + req = self._makeOne(environ) + self.assertEqual(req.http_version, '1.1') + + def test_script_name(self): + environ = {'SCRIPT_NAME': '/script', + } + req = self._makeOne(environ) + self.assertEqual(req.script_name, '/script') + + def test_path_info(self): + environ = {'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path_info, '/path/info') + + def test_content_length_getter(self): + environ = {'CONTENT_LENGTH': '1234', + } + req = self._makeOne(environ) + self.assertEqual(req.content_length, 1234) + + def test_content_length_setter_w_str(self): + environ = {'CONTENT_LENGTH': '1234', + } + req = self._makeOne(environ) + req.content_length = '3456' + self.assertEqual(req.content_length, 3456) + + def test_remote_user(self): + environ = {'REMOTE_USER': 'phred', + } + req = self._makeOne(environ) + self.assertEqual(req.remote_user, 'phred') + + def test_remote_addr(self): + environ = {'REMOTE_ADDR': '1.2.3.4', + } + req = self._makeOne(environ) + self.assertEqual(req.remote_addr, '1.2.3.4') + + def test_query_string(self): + environ = {'QUERY_STRING': 'foo=bar&baz=bam', + } + req = self._makeOne(environ) + self.assertEqual(req.query_string, 'foo=bar&baz=bam') + + def test_server_name(self): + environ = {'SERVER_NAME': 'somehost.tld', + } + req = self._makeOne(environ) + self.assertEqual(req.server_name, 'somehost.tld') + + def test_server_port_getter(self): + environ = {'SERVER_PORT': '6666', + } + req = self._makeOne(environ) + self.assertEqual(req.server_port, 6666) + + def test_server_port_setter_with_string(self): + environ = {'SERVER_PORT': '6666', + } + req = self._makeOne(environ) + req.server_port = '6667' + self.assertEqual(req.server_port, 6667) + + def test_uscript_name(self): + environ = {'SCRIPT_NAME': '/script', + } + req = self._makeOne(environ) + self.assert_(isinstance(req.uscript_name, unicode)) + self.assertEqual(req.uscript_name, '/script') + + def test_upath_info(self): + environ = {'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assert_(isinstance(req.upath_info, unicode)) + self.assertEqual(req.upath_info, '/path/info') + + def test_content_type_getter_no_parameters(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar', + } + req = self._makeOne(environ) + self.assertEqual(req.content_type, 'application/xml+foobar') + + def test_content_type_getter_w_parameters(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + self.assertEqual(req.content_type, 'application/xml+foobar') + + def test_content_type_setter_w_None(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + req.content_type = None + self.assertEqual(req.content_type, '') + self.assert_('CONTENT_TYPE' not in environ) + + def test_content_type_setter_existing_paramter_no_new_paramter(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + req.content_type = 'text/xml' + self.assertEqual(req.content_type, 'text/xml') + self.assertEqual(environ['CONTENT_TYPE'], 'text/xml;charset="utf8"') + + def test_content_type_deleter_clears_environ_value(self): + environ = {'CONTENT_TYPE': 'application/xml+foobar;charset="utf8"', + } + req = self._makeOne(environ) + del req.content_type + self.assertEqual(req.content_type, '') + self.assert_('CONTENT_TYPE' not in environ) + + def test_content_type_deleter_no_environ_value(self): + environ = {} + req = self._makeOne(environ) + del req.content_type + self.assertEqual(req.content_type, '') + self.assert_('CONTENT_TYPE' not in environ) + + def test_charset_getter_cache_hit(self): + CT = 'application/xml+foobar' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req._charset_cache = (CT, 'cp1252') + self.assertEqual(req.charset, 'cp1252') + + def test_charset_getter_cache_miss_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + self.assertEqual(req.charset, 'utf8') + self.assertEqual(req._charset_cache, (CT, 'utf8')) + + def test_charset_getter_cache_miss_wo_parameter(self): + CT = 'application/xml+foobar' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + self.assertEqual(req.charset, 'UTF-8') + self.assertEqual(req._charset_cache, (CT, 'UTF-8')) + + def test_charset_setter_None_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = None + self.assertEqual(environ['CONTENT_TYPE'], 'application/xml+foobar') + self.assertEqual(req.charset, 'UTF-8') + + def test_charset_setter_empty_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = '' + self.assertEqual(environ['CONTENT_TYPE'], 'application/xml+foobar') + self.assertEqual(req.charset, 'UTF-8') + + def test_charset_setter_nonempty_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = 'cp1252' + self.assertEqual(environ['CONTENT_TYPE'], + #'application/xml+foobar; charset="cp1252"') WTF? + 'application/xml+foobar;charset=cp1252', + ) + self.assertEqual(req.charset, 'cp1252') + + def test_charset_setter_nonempty_wo_parameter(self): + CT = 'application/xml+foobar' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + req.charset = 'cp1252' + self.assertEqual(environ['CONTENT_TYPE'], + 'application/xml+foobar; charset="cp1252"', + #'application/xml+foobar;charset=cp1252', WTF? + ) + self.assertEqual(req.charset, 'cp1252') + + def test_charset_deleter_w_parameter(self): + CT = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CT, + } + req = self._makeOne(environ) + del req.charset + self.assertEqual(environ['CONTENT_TYPE'], 'application/xml+foobar') + self.assertEqual(req.charset, 'UTF-8') + + def test_headers_getter_miss(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + headers = req.headers + self.assertEqual(headers, + {'Content-Type': CONTENT_TYPE, + 'Content-Length': '123'}) + self.assertEqual(req._headers, headers) + + def test_headers_getter_hit(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + req._headers = {'Foo': 'Bar'} + self.assertEqual(req.headers, + {'Foo': 'Bar'}) + + def test_headers_setter(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + req._headers = {'Foo': 'Bar'} + req.headers = {'Qux': 'Spam'} + self.assertEqual(req.headers, + {'Qux': 'Spam'}) + + def test_no_headers_deleter(self): + CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' + environ = {'CONTENT_TYPE': CONTENT_TYPE, + 'CONTENT_LENGTH': '123', + } + req = self._makeOne(environ) + def _test(): + del req.headers + self.assertRaises(AttributeError, _test) + + def test_host_url_w_http_host_and_no_port(self): + environ = {'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'example.com', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'http://example.com') + + def test_host_url_w_http_host_and_standard_port(self): + environ = {'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'example.com:80', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'http://example.com') + + def test_host_url_w_http_host_and_oddball_port(self): + environ = {'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'example.com:8888', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'http://example.com:8888') + + def test_host_url_w_http_host_https_and_no_port(self): + environ = {'wsgi.url_scheme': 'https', + 'HTTP_HOST': 'example.com', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com') + + def test_host_url_w_http_host_https_and_standard_port(self): + environ = {'wsgi.url_scheme': 'https', + 'HTTP_HOST': 'example.com:443', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com') + + def test_host_url_w_http_host_https_and_oddball_port(self): + environ = {'wsgi.url_scheme': 'https', + 'HTTP_HOST': 'example.com:4333', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com:4333') + + def test_host_url_wo_http_host(self): + environ = {'wsgi.url_scheme': 'https', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '4333', + } + req = self._makeOne(environ) + self.assertEqual(req.host_url, 'https://example.com:4333') + + def test_application_url(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + } + req = self._makeOne(environ) + self.assertEqual(req.application_url, 'http://example.com/script') + + def test_path_url(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path_url, 'http://example.com/script/path/info') + + def test_path(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path, '/script/path/info') + + def test_path_qs_no_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.path_qs, '/script/path/info') + + def test_path_qs_w_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.path_qs, '/script/path/info?foo=bar&baz=bam') + + def test_url_no_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + self.assertEqual(req.url, 'http://example.com/script/path/info') + + def test_url_w_qs(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.url, + 'http://example.com/script/path/info?foo=bar&baz=bam') + + def test_relative_url_to_app_true_wo_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('other/page', True), + 'http://example.com/script/other/page') + + def test_relative_url_to_app_true_w_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('/other/page', True), + 'http://example.com/other/page') + + def test_relative_url_to_app_false_other_w_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('/other/page', False), + 'http://example.com/other/page') + + def test_relative_url_to_app_false_other_wo_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + 'QUERY_STRING': 'foo=bar&baz=bam' + } + req = self._makeOne(environ) + self.assertEqual(req.relative_url('other/page', False), + 'http://example.com/script/path/other/page') + + def test_path_info_pop_empty(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, None) + self.assertEqual(environ['SCRIPT_NAME'], '/script') + + def test_path_info_pop_just_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, '') + self.assertEqual(environ['SCRIPT_NAME'], '/script/') + self.assertEqual(environ['PATH_INFO'], '') + + def test_path_info_pop_non_empty_no_pattern(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script/path') + self.assertEqual(environ['PATH_INFO'], '/info') + + def test_path_info_pop_non_empty_w_pattern_miss(self): + import re + PATTERN = re.compile('miss') + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop(PATTERN) + self.assertEqual(popped, None) + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '/path/info') + + def test_path_info_pop_non_empty_w_pattern_hit(self): + import re + PATTERN = re.compile('path') + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop(PATTERN) + self.assertEqual(popped, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script/path') + self.assertEqual(environ['PATH_INFO'], '/info') + + def test_path_info_pop_skips_empty_elements(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '//path/info', + } + req = self._makeOne(environ) + popped = req.path_info_pop() + self.assertEqual(popped, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script//path') + self.assertEqual(environ['PATH_INFO'], '/info') + + def test_path_info_peek_empty(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '', + } + req = self._makeOne(environ) + peeked = req.path_info_peek() + self.assertEqual(peeked, None) + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '') + + def test_path_info_peek_just_leading_slash(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/', + } + req = self._makeOne(environ) + peeked = req.path_info_peek() + self.assertEqual(peeked, '') + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '/') + + def test_path_info_peek_non_empty(self): + environ = {'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/script', + 'PATH_INFO': '/path', + } + req = self._makeOne(environ) + peeked = req.path_info_peek() + self.assertEqual(peeked, 'path') + self.assertEqual(environ['SCRIPT_NAME'], '/script') + self.assertEqual(environ['PATH_INFO'], '/path') + + def test_urlvars_getter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + self.assertEqual(req.urlvars, {'foo': 'bar'}) + + def test_urlvars_getter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + } + req = self._makeOne(environ) + self.assertEqual(req.urlvars, {'foo': 'bar'}) + + def test_urlvars_getter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + + def test_urlvars_setter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + req.urlvars = {'baz': 'bam'} + self.assertEqual(req.urlvars, {'baz': 'bam'}) + self.assertEqual(environ['paste.urlvars'], {'baz': 'bam'}) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_urlvars_setter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + 'paste.urlvars': {'qux': 'spam'}, + } + req = self._makeOne(environ) + req.urlvars = {'baz': 'bam'} + self.assertEqual(req.urlvars, {'baz': 'bam'}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {'baz': 'bam'})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_setter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + req.urlvars = {'baz': 'bam'} + self.assertEqual(req.urlvars, {'baz': 'bam'}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {'baz': 'bam'})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_deleter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assert_('paste.urlvars' not in environ) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + + def test_urlvars_deleter_w_wsgiorg_key_non_empty_tuple(self): + environ = {'wsgiorg.routing_args': (('a', 'b'), {'foo': 'bar'}), + 'paste.urlvars': {'qux': 'spam'}, + } + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], (('a', 'b'), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_deleter_w_wsgiorg_key_empty_tuple(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + 'paste.urlvars': {'qux': 'spam'}, + } + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlvars_deleter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + del req.urlvars + self.assertEqual(req.urlvars, {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlargs_getter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + self.assertEqual(req.urlargs, ()) + + def test_urlargs_getter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': (('a', 'b'), {'foo': 'bar'}), + } + req = self._makeOne(environ) + self.assertEqual(req.urlargs, ('a', 'b')) + + def test_urlargs_getter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + self.assertEqual(req.urlargs, ()) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_urlargs_setter_w_paste_key(self): + environ = {'paste.urlvars': {'foo': 'bar'}, + } + req = self._makeOne(environ) + req.urlargs = ('a', 'b') + self.assertEqual(req.urlargs, ('a', 'b')) + self.assertEqual(environ['wsgiorg.routing_args'], + (('a', 'b'), {'foo': 'bar'})) + self.assert_('paste.urlvars' not in environ) + + def test_urlargs_setter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': ((), {'foo': 'bar'}), + } + req = self._makeOne(environ) + req.urlargs = ('a', 'b') + self.assertEqual(req.urlargs, ('a', 'b')) + self.assertEqual(environ['wsgiorg.routing_args'], + (('a', 'b'), {'foo': 'bar'})) + + def test_urlargs_setter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + req.urlargs = ('a', 'b') + self.assertEqual(req.urlargs, ('a', 'b')) + self.assertEqual(environ['wsgiorg.routing_args'], + (('a', 'b'), {})) + self.assert_('paste.urlvars' not in environ) + + def test_urlargs_deleter_w_wsgiorg_key(self): + environ = {'wsgiorg.routing_args': (('a', 'b'), {'foo': 'bar'}), + } + req = self._makeOne(environ) + del req.urlargs + self.assertEqual(req.urlargs, ()) + self.assertEqual(environ['wsgiorg.routing_args'], + ((), {'foo': 'bar'})) + + def test_urlargs_deleter_w_wsgiorg_key_empty(self): + environ = {'wsgiorg.routing_args': ((), {}), + } + req = self._makeOne(environ) + del req.urlargs + self.assertEqual(req.urlargs, ()) + self.assert_('paste.urlvars' not in environ) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_urlargs_deleter_wo_keys(self): + environ = {} + req = self._makeOne(environ) + del req.urlargs + self.assertEqual(req.urlargs, ()) + self.assert_('paste.urlvars' not in environ) + self.assert_('wsgiorg.routing_args' not in environ) + + def test_str_cookies_empty_environ(self): + req = self._makeOne({}) + self.assertEqual(req.str_cookies, {}) + + def test_str_cookies_w_webob_parsed_cookies_matching_source(self): + environ = { + 'HTTP_COOKIE': 'a=b', + 'webob._parsed_cookies': ('a=b', {'a': 'b'}), + } + req = self._makeOne(environ) + self.assertEqual(req.str_cookies, {'a': 'b'}) + + def test_str_cookies_w_webob_parsed_cookies_mismatched_source(self): + environ = { + 'HTTP_COOKIE': 'a=b', + 'webob._parsed_cookies': ('a=b;c=d', {'a': 'b', 'c': 'd'}), + } + req = self._makeOne(environ) + self.assertEqual(req.str_cookies, {'a': 'b'}) + + def test_is_xhr_no_header(self): + req = self._makeOne({}) + self.assert_(not req.is_xhr) + + def test_is_xhr_header_miss(self): + environ = {'HTTP_X_REQUESTED_WITH': 'notAnXMLHTTPRequest'} + req = self._makeOne(environ) + self.assert_(not req.is_xhr) + + def test_is_xhr_header_hit(self): + environ = {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'} + req = self._makeOne(environ) + self.assert_(req.is_xhr) + + # host + def test_host_getter_w_HTTP_HOST(self): + environ = {'HTTP_HOST': 'example.com:8888'} + req = self._makeOne(environ) + self.assertEqual(req.host, 'example.com:8888') + + def test_host_getter_wo_HTTP_HOST(self): + environ = {'SERVER_NAME': 'example.com', + 'SERVER_PORT': '8888'} + req = self._makeOne(environ) + self.assertEqual(req.host, 'example.com:8888') + + def test_host_setter(self): + environ = {} + req = self._makeOne(environ) + req.host = 'example.com:8888' + self.assertEqual(environ['HTTP_HOST'], 'example.com:8888') + + def test_host_deleter_hit(self): + environ = {'HTTP_HOST': 'example.com:8888'} + req = self._makeOne(environ) + del req.host + self.assert_('HTTP_HOST' not in environ) + + def test_host_deleter_miss(self): + environ = {} + req = self._makeOne(environ) + del req.host # doesn't raise + + # body + def test_body_getter(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + self.assertEqual(req.body, 'input') + self.assertEqual(req.content_length, len('input')) + def test_body_setter_None(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + req.body = None + self.assertEqual(req.body, '') + self.assertEqual(req.content_length, 0) + self.assert_(req.is_body_seekable) + def test_body_setter_non_string_raises(self): + req = self._makeOne({}) + def _test(): + req.body = object() + self.assertRaises(TypeError, _test) + def test_body_setter_value(self): + BEFORE = self._makeStringIO('before') + environ = {'wsgi.input': BEFORE, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('before'), + } + req = self._makeOne(environ) + req.body = 'after' + self.assertEqual(req.body, 'after') + self.assertEqual(req.content_length, len('after')) + self.assert_(req.is_body_seekable) + def test_body_deleter_None(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'CONTENT_LENGTH': len('input'), + } + req = self._makeOne(environ) + del req.body + self.assertEqual(req.body, '') + self.assertEqual(req.content_length, 0) + self.assert_(req.is_body_seekable) + + def test_str_POST_not_POST_or_PUT(self): + from webob.multidict import NoVars + environ = {'REQUEST_METHOD': 'GET', + } + req = self._makeOne(environ) + result = req.str_POST + self.assert_(isinstance(result, NoVars)) + self.assert_(result.reason.startswith('Not a form request')) + + def test_str_POST_existing_cache_hit(self): + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'REQUEST_METHOD': 'POST', + 'webob._parsed_post_vars': ({'foo': 'bar'}, INPUT), + } + req = self._makeOne(environ) + result = req.str_POST + self.assertEqual(result, {'foo': 'bar'}) + + def test_str_PUT_missing_content_type(self): + from webob.multidict import NoVars + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'REQUEST_METHOD': 'PUT', + } + req = self._makeOne(environ) + result = req.str_POST + self.assert_(isinstance(result, NoVars)) + self.assert_(result.reason.startswith('Not an HTML form submission')) + + def test_str_PUT_bad_content_type(self): + from webob.multidict import NoVars + INPUT = self._makeStringIO('input') + environ = {'wsgi.input': INPUT, + 'REQUEST_METHOD': 'PUT', + 'CONTENT_TYPE': 'text/plain', + } + req = self._makeOne(environ) + result = req.str_POST + self.assert_(isinstance(result, NoVars)) + self.assert_(result.reason.startswith('Not an HTML form submission')) + + def test_str_POST_multipart(self): + BODY_TEXT = ( + '------------------------------deb95b63e42a\n' + 'Content-Disposition: form-data; name="foo"\n' + '\n' + 'foo\n' + '------------------------------deb95b63e42a\n' + 'Content-Disposition: form-data; name="bar"; filename="bar.txt"\n' + 'Content-type: application/octet-stream\n' + '\n' + 'these are the contents of the file "bar.txt"\n' + '\n' + '------------------------------deb95b63e42a--\n') + INPUT = self._makeStringIO(BODY_TEXT) + environ = {'wsgi.input': INPUT, + 'webob.is_body_seekable': True, + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; ' + 'boundary=----------------------------deb95b63e42a', + 'CONTENT_LENGTH': len(BODY_TEXT), + } + req = self._makeOne(environ) + result = req.str_POST + self.assertEqual(result['foo'], 'foo') + bar = result['bar'] + self.assertEqual(bar.name, 'bar') + self.assertEqual(bar.filename, 'bar.txt') + self.assertEqual(bar.file.read(), + 'these are the contents of the file "bar.txt"\n') + + # POST + # str_GET + def test_str_GET_reflects_query_string(self): + environ = { + 'QUERY_STRING': 'foo=123', + } + req = self._makeOne(environ) + result = req.str_GET + self.assertEqual(result, {'foo': '123'}) + req.query_string = 'foo=456' + result = req.str_GET + self.assertEqual(result, {'foo': '456'}) + req.query_string = '' + result = req.str_GET + self.assertEqual(result, {}) + + def test_str_GET_updates_query_string(self): + environ = { + } + req = self._makeOne(environ) + result = req.query_string + self.assertEqual(result, '') + req.str_GET['foo'] = '123' + result = req.query_string + self.assertEqual(result, 'foo=123') + del req.str_GET['foo'] + result = req.query_string + self.assertEqual(result, '') + + # GET + # str_postvars + # postvars + # str_queryvars + # queryvars + # is_xhr + # str_params + # params + + def test_str_cookies_wo_webob_parsed_cookies(self): + from webob import Request + environ = { + 'HTTP_COOKIE': 'a=b', + } + req = Request.blank('/', environ) + self.assertEqual(req.str_cookies, {'a': 'b'}) + + # cookies + # copy + + def test_copy_get(self): + from webob import Request + environ = { + 'HTTP_COOKIE': 'a=b', + } + req = Request.blank('/', environ) + clone = req.copy_get() + for k, v in req.environ.items(): + if k in ('CONTENT_LENGTH', 'webob.is_body_seekable'): + self.assert_(k not in clone.environ) + elif k == 'wsgi.input': + self.assert_(clone.environ[k] is not v) + else: + self.assertEqual(clone.environ[k], v) + + def test_remove_conditional_headers_accept_encoding(self): + from webob import Request + req = Request.blank('/') + req.accept_encoding='gzip,deflate' + req.remove_conditional_headers() + self.assertEqual(bool(req.accept_encoding), False) + + def test_remove_conditional_headers_if_modified_since(self): + from datetime import datetime + from webob import Request, UTC + req = Request.blank('/') + req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) + req.remove_conditional_headers() + self.assertEqual(req.if_modified_since, None) + + def test_remove_conditional_headers_if_none_match(self): + from webob import Request + req = Request.blank('/') + req.if_none_match = 'foo, bar' + req.remove_conditional_headers() + self.assertEqual(bool(req.if_none_match), False) + + def test_remove_conditional_headers_if_range(self): + from webob import Request + req = Request.blank('/') + req.if_range = 'foo, bar' + req.remove_conditional_headers() + self.assertEqual(bool(req.if_range), False) + + def test_remove_conditional_headers_range(self): + from webob import Request + req = Request.blank('/') + req.range = 'bytes=0-100' + req.remove_conditional_headers() + self.assertEqual(req.range, None) + + + # is_body_seekable + # make_body_seekable + # copy_body + # make_tempfile + # remove_conditional_headers + # accept + # accept_charset + # accept_encoding + # accept_language + # authorization + + # cache_control + def test_cache_control_reflects_environ(self): + environ = { + 'HTTP_CACHE_CONTROL': 'max-age=5', + } + req = self._makeOne(environ) + result = req.cache_control + self.assertEqual(result.properties, {'max-age': 5}) + req.environ.update(HTTP_CACHE_CONTROL='max-age=10') + result = req.cache_control + self.assertEqual(result.properties, {'max-age': 10}) + req.environ.update(HTTP_CACHE_CONTROL='') + result = req.cache_control + self.assertEqual(result.properties, {}) + + def test_cache_control_updates_environ(self): + environ = {} + req = self._makeOne(environ) + req.cache_control.max_age = 5 + result = req.environ['HTTP_CACHE_CONTROL'] + self.assertEqual(result, 'max-age=5') + req.cache_control.max_age = 10 + result = req.environ['HTTP_CACHE_CONTROL'] + self.assertEqual(result, 'max-age=10') + req.cache_control = None + result = req.environ['HTTP_CACHE_CONTROL'] + self.assertEqual(result, '') + del req.cache_control + self.assert_('HTTP_CACHE_CONTROL' not in req.environ) + + def test_cache_control_set_dict(self): + environ = {} + req = self._makeOne(environ) + req.cache_control = {'max-age': 5} + result = req.cache_control + self.assertEqual(result.max_age, 5) + + def test_cache_control_set_object(self): + from webob.cachecontrol import CacheControl + environ = {} + req = self._makeOne(environ) + req.cache_control = CacheControl({'max-age': 5}, type='request') + result = req.cache_control + self.assertEqual(result.max_age, 5) + + def test_cache_control_gets_cached(self): + environ = {} + req = self._makeOne(environ) + self.assert_(req.cache_control is req.cache_control) + + #if_match + #if_none_match + + #date + #if_modified_since + #if_unmodified_since + #if_range + #max_forwards + #pragma + #range + #referer + #referrer + #user_agent + #__repr__ + #__str__ + #from_file + + #call_application + def test_call_application_calls_application(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + start_response('200 OK', [('content-type', 'text/plain')]) + return ['...\n'] + status, headers, output = req.call_application(application) + self.assertEqual(status, '200 OK') + self.assertEqual(headers, [('content-type', 'text/plain')]) + self.assertEqual(''.join(output), '...\n') + + def test_call_application_provides_write(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + write = start_response('200 OK', [('content-type', 'text/plain')]) + write('...\n') + return [] + status, headers, output = req.call_application(application) + self.assertEqual(status, '200 OK') + self.assertEqual(headers, [('content-type', 'text/plain')]) + self.assertEqual(''.join(output), '...\n') + + def test_call_application_closes_iterable_when_mixed_with_write_calls(self): + environ = { + 'test._call_application_called_close': False + } + req = self._makeOne(environ) + def application(environ, start_response): + write = start_response('200 OK', [('content-type', 'text/plain')]) + class AppIter(object): + def __iter__(self): + yield '...\n' + def close(self): + environ['test._call_application_called_close'] = True + write('...\n') + return AppIter() + status, headers, output = req.call_application(application) + self.assertEqual(''.join(output), '...\n...\n') + self.assertEqual(environ['test._call_application_called_close'], True) + + def test_call_application_raises_exc_info(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + try: + raise RuntimeError('OH NOES') + except: + import sys + exc_info = sys.exc_info() + start_response('200 OK', [('content-type', 'text/plain')], exc_info) + return ['...\n'] + self.assertRaises(RuntimeError, req.call_application, application) + + def test_call_application_returns_exc_info(self): + environ = {} + req = self._makeOne(environ) + def application(environ, start_response): + try: + raise RuntimeError('OH NOES') + except: + import sys + exc_info = sys.exc_info() + start_response('200 OK', [('content-type', 'text/plain')], exc_info) + return ['...\n'] + status, headers, output, exc_info = req.call_application(application, True) + self.assertEqual(status, '200 OK') + self.assertEqual(headers, [('content-type', 'text/plain')]) + self.assertEqual(''.join(output), '...\n') + self.assertEqual(exc_info[0], RuntimeError) + + #get_response + #blank + + #from_string + def test_from_string_extra_data(self): + from webob import BaseRequest + _test_req_copy = _test_req.replace('Content-Type', + 'Content-Length: 337\r\nContent-Type') + self.assertRaises(ValueError, BaseRequest.from_string, + _test_req_copy+'EXTRA!') + + #as_string + def test_as_string_skip_body(self): + from webob import BaseRequest + req = BaseRequest.from_string(_test_req) + body = req.as_string(skip_body=True) + self.assertEqual(body.count('\r\n\r\n'), 0) + self.assertEqual(req.as_string(skip_body=337), req.as_string()) + body = req.as_string(337-1).split('\r\n\r\n', 1)[1] + self.assertEqual(body, '<body skipped (len=337)>') + + def test_adhoc_attrs_set(self): + from webob import Request + req = Request.blank('/') + req.foo = 1 + self.assertEqual(req.environ['webob.adhoc_attrs'], {'foo': 1}) + + def test_adhoc_attrs_set_nonadhoc(self): + from webob import Request + req = Request.blank('/', environ={'webob.adhoc_attrs':{}}) + req.request_body_tempfile_limit = 1 + self.assertEqual(req.environ['webob.adhoc_attrs'], {}) + + def test_adhoc_attrs_get(self): + from webob import Request + req = Request.blank('/', environ={'webob.adhoc_attrs': {'foo': 1}}) + self.assertEqual(req.foo, 1) + + def test_adhoc_attrs_get_missing(self): + from webob import Request + req = Request.blank('/') + self.assertRaises(AttributeError, getattr, req, 'some_attr') + + def test_adhoc_attrs_del(self): + from webob import Request + req = Request.blank('/', environ={'webob.adhoc_attrs': {'foo': 1}}) + del req.foo + self.assertEqual(req.environ['webob.adhoc_attrs'], {}) + + def test_adhoc_attrs_del_missing(self): + from webob import Request + req = Request.blank('/') + self.assertRaises(AttributeError, delattr, req, 'some_attr') + + # TODO: webob/request.py:1143 + # the only known way to reach this line is by experimentation + + +class RequestTests_functional(unittest.TestCase): + + def test_gets(self): + from webtest import TestApp + app = TestApp(simpleapp) + res = app.get('/') + self.assert_('Hello' in res) + self.assert_("get is GET([])" in res) + self.assert_("post is <NoVars: Not a form request>" in res) + + res = app.get('/?name=george') + res.mustcontain("get is GET([('name', 'george')])") + res.mustcontain("Val is george") + + def test_language_parsing(self): + from webtest import TestApp + app = TestApp(simpleapp) + res = app.get('/') + self.assert_("The languages are: ['en-US']" in res) + + res = app.get('/', + headers={'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'}) + self.assert_("languages are: ['da', 'en-gb', 'en-US']" in res) + + res = app.get('/', + headers={'Accept-Language': 'en-gb;q=0.8, da, en;q=0.7'}) + self.assert_("languages are: ['da', 'en-gb', 'en-US']" in res) + + def test_mime_parsing(self): + from webtest import TestApp + app = TestApp(simpleapp) + res = app.get('/', headers={'Accept':'text/html'}) + self.assert_("accepttypes is: text/html" in res) + + res = app.get('/', headers={'Accept':'application/xml'}) + self.assert_("accepttypes is: application/xml" in res) + + res = app.get('/', headers={'Accept':'application/xml,*/*'}) + self.assert_("accepttypes is: application/xml" in res) + + def test_accept_best_match(self): + from webob import Request + self.assert_(not Request.blank('/').accept) + self.assert_(not Request.blank('/', headers={'Accept': ''}).accept) + req = Request.blank('/', headers={'Accept':'text/plain'}) + self.assert_(req.accept) + self.assertRaises(ValueError, req.accept.best_match, ['*/*']) + req = Request.blank('/', accept=['*/*','text/*']) + self.assertEqual( + req.accept.best_match(['application/x-foo', 'text/plain']), + 'text/plain') + self.assertEqual( + req.accept.best_match(['text/plain', 'application/x-foo']), + 'text/plain') + req = Request.blank('/', accept=['text/plain', 'message/*']) + self.assertEqual( + req.accept.best_match(['message/x-foo', 'text/plain']), + 'text/plain') + self.assertEqual( + req.accept.best_match(['text/plain', 'message/x-foo']), + 'text/plain') + + def test_from_mimeparse(self): + # http://mimeparse.googlecode.com/svn/trunk/mimeparse.py + from webob import Request + supported = ['application/xbel+xml', 'application/xml'] + tests = [('application/xbel+xml', 'application/xbel+xml'), + ('application/xbel+xml; q=1', 'application/xbel+xml'), + ('application/xml; q=1', 'application/xml'), + ('application/*; q=1', 'application/xbel+xml'), + ('*/*', 'application/xbel+xml')] + + for accept, get in tests: + req = Request.blank('/', headers={'Accept':accept}) + self.assertEqual(req.accept.best_match(supported), get) + + supported = ['application/xbel+xml', 'text/xml'] + tests = [('text/*;q=0.5,*/*; q=0.1', 'text/xml'), + ('text/html,application/atom+xml; q=0.9', None)] + + for accept, get in tests: + req = Request.blank('/', headers={'Accept':accept}) + self.assertEqual(req.accept.best_match(supported), get) + + supported = ['application/json', 'text/html'] + tests = [ + ('application/json, text/javascript, */*', 'application/json'), + ('application/json, text/html;q=0.9', 'application/json'), + ] + + for accept, get in tests: + req = Request.blank('/', headers={'Accept':accept}) + self.assertEqual(req.accept.best_match(supported), get) + + offered = ['image/png', 'application/xml'] + tests = [ + ('image/png', 'image/png'), + ('image/*', 'image/png'), + ('image/*, application/xml', 'application/xml'), + ] + + for accept, get in tests: + req = Request.blank('/', accept=accept) + self.assertEqual(req.accept.best_match(offered), get) + + def test_headers(self): + from webtest import TestApp + app = TestApp(simpleapp) + headers = { + 'If-Modified-Since': 'Sat, 29 Oct 1994 19:43:31 GMT', + 'Cookie': 'var1=value1', + 'User-Agent': 'Mozilla 4.0 (compatible; MSIE)', + 'If-None-Match': '"etag001", "etag002"', + 'X-Requested-With': 'XMLHttpRequest', + } + res = app.get('/?foo=bar&baz', headers=headers) + res.mustcontain( + 'if_modified_since: ' + + 'datetime.datetime(1994, 10, 29, 19, 43, 31, tzinfo=UTC)', + "user_agent: 'Mozilla", + 'is_xhr: True', + "cookies is {'var1': 'value1'}", + "params is NestedMultiDict([('foo', 'bar'), ('baz', '')])", + "if_none_match: <ETag etag001 or etag002>", + ) + + def test_bad_cookie(self): + from webob import Request + req = Request.blank('/') + req.headers['Cookie'] = '070-it-:><?0' + self.assertEqual(req.cookies, {}) + req.headers['Cookie'] = 'foo=bar' + self.assertEqual(req.cookies, {'foo': 'bar'}) + req.headers['Cookie'] = '...' + self.assertEqual(req.cookies, {}) + req.headers['Cookie'] = '=foo' + self.assertEqual(req.cookies, {}) + req.headers['Cookie'] = ('dismiss-top=6; CP=null*; ' + 'PHPSESSID=0a539d42abc001cdc762809248d4beed; a=42') + self.assertEqual(req.cookies, { + 'CP': u'null*', + 'PHPSESSID': u'0a539d42abc001cdc762809248d4beed', + 'a': u'42', + 'dismiss-top': u'6' + }) + req.headers['Cookie'] = 'fo234{=bar blub=Blah' + self.assertEqual(req.cookies, {'blub': 'Blah'}) + + def test_cookie_quoting(self): + from webob import Request + req = Request.blank('/') + req.headers['Cookie'] = 'foo="?foo"; Path=/' + self.assertEqual(req.cookies, {'foo': '?foo'}) + + def test_path_quoting(self): + from webob import Request + path = '/:@&+$,/bar' + req = Request.blank(path) + self.assertEqual(req.path, path) + self.assert_(req.url.endswith(path)) + + def test_params(self): + from webob import Request + req = Request.blank('/?a=1&b=2') + req.method = 'POST' + req.body = 'b=3' + self.assertEqual(req.params.items(), + [('a', '1'), ('b', '2'), ('b', '3')]) + new_params = req.params.copy() + self.assertEqual(new_params.items(), + [('a', '1'), ('b', '2'), ('b', '3')]) + new_params['b'] = '4' + self.assertEqual(new_params.items(), [('a', '1'), ('b', '4')]) + # The key name is \u1000: + req = Request.blank('/?%E1%80%80=x', + decode_param_names=True, charset='UTF-8') + self.assert_(req.decode_param_names) + self.assert_(u'\u1000' in req.GET.keys()) + self.assertEqual(req.GET[u'\u1000'], 'x') + + def test_copy_body(self): + from webob import Request + req = Request.blank('/', method='POST', body='some text', + request_body_tempfile_limit=1) + old_body_file = req.body_file_raw + req.copy_body() + self.assert_(req.body_file_raw is not old_body_file) + req = Request.blank('/', method='POST', + body_file=UnseekableInput('0123456789'), content_length=10) + self.assert_(not hasattr(req.body_file_raw, 'seek')) + old_body_file = req.body_file_raw + req.make_body_seekable() + self.assert_(req.body_file_raw is not old_body_file) + self.assertEqual(req.body, '0123456789') + old_body_file = req.body_file + req.make_body_seekable() + self.assert_(req.body_file_raw is old_body_file) + self.assert_(req.body_file is old_body_file) + + def test_broken_seek(self): + # copy() should work even when the input has a broken seek method + from webob import Request + req = Request.blank('/', method='POST', + body_file=UnseekableInputWithSeek('0123456789'), + content_length=10) + self.assert_(hasattr(req.body_file_raw, 'seek')) + self.assertRaises(IOError, req.body_file_raw.seek, 0) + old_body_file = req.body_file + req2 = req.copy() + self.assert_(req2.body_file_raw is req2.body_file is not old_body_file) + self.assertEqual(req2.body, '0123456789') + + def test_set_body(self): + from webob import BaseRequest + req = BaseRequest.blank('/', body='foo') + self.assert_(req.is_body_seekable) + self.assertEqual(req.body, 'foo') + self.assertEqual(req.content_length, 3) + del req.body + self.assertEqual(req.body, '') + self.assertEqual(req.content_length, 0) + + def test_broken_clen_header(self): + # if the UA sends "content_length: ..' header (the name is wrong) + # it should not break the req.headers.items() + from webob import Request + req = Request.blank('/') + req.environ['HTTP_CONTENT_LENGTH'] = '0' + req.headers.items() + + def test_nonstr_keys(self): + # non-string env keys shouldn't break req.headers + from webob import Request + req = Request.blank('/') + req.environ[1] = 1 + req.headers.items() + + + def test_authorization(self): + from webob import Request + req = Request.blank('/') + req.authorization = 'Digest uri="/?a=b"' + self.assertEqual(req.authorization, ('Digest', {'uri': '/?a=b'})) + + def test_authorization2(self): + from webob.descriptors import parse_auth_params + for s, d in [ + ('x=y', {'x': 'y'}), + ('x="y"', {'x': 'y'}), + ('x=y,z=z', {'x': 'y', 'z': 'z'}), + ('x=y, z=z', {'x': 'y', 'z': 'z'}), + ('x="y",z=z', {'x': 'y', 'z': 'z'}), + ('x="y", z=z', {'x': 'y', 'z': 'z'}), + ('x="y,x", z=z', {'x': 'y,x', 'z': 'z'}), + ]: + self.assertEqual(parse_auth_params(s), d) + + + def test_from_file(self): + from webob import Request + req = Request.blank('http://example.com:8000/test.html?params') + self.equal_req(req) + + req = Request.blank('http://example.com/test2') + req.method = 'POST' + req.body = 'test=example' + self.equal_req(req) + + def test_req_kw_none_val(self): + from webob import Request + request = Request({}, content_length=None) + self.assert_('content-length' not in request.headers) + self.assert_('content-type' not in request.headers) + + def test_env_keys(self): + from webob import Request + req = Request.blank('/') + # SCRIPT_NAME can be missing + del req.environ['SCRIPT_NAME'] + self.assertEqual(req.script_name, '') + self.assertEqual(req.uscript_name, u'') + + def test_repr_nodefault(self): + from webob.request import NoDefault + nd = NoDefault + self.assertEqual(repr(nd), '(No Default)') + + def test_request_noenviron_param(self): + # Environ is a a mandatory not null param in Request. + from webob import Request + self.assertRaises(TypeError, Request, environ=None) + + def test_environ_getter(self): + # Parameter environ_getter in Request is no longer valid and + # should raise an error in case it's used + from webob import Request + class env(object): + def __init__(self, env): + self.env = env + def env_getter(self): + return self.env + self.assertRaises(ValueError, + Request, environ_getter=env({'a':1}).env_getter) + + def test_unicode_errors(self): + # Passing unicode_errors != NoDefault should assign value to + # dictionary['unicode_errors'], else not + from webob.request import NoDefault + from webob import Request + r = Request({'a':1}, unicode_errors='strict') + self.assert_('unicode_errors' in r.__dict__) + r = Request({'a':1}, unicode_errors=NoDefault) + self.assert_('unicode_errors' not in r.__dict__) + + def test_charset_deprecation(self): + # Any class that inherits from BaseRequest cannot define a + # default_charset attribute. + # Any class that inherits from BaseRequest cannot define a + # charset attr that is instance of str + from webob import BaseRequest + from webob.request import AdhocAttrMixin + class NewRequest(BaseRequest): + default_charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + class NewRequest(BaseRequest): + charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + class NewRequest(AdhocAttrMixin, BaseRequest): + default_charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + class NewRequest(AdhocAttrMixin, BaseRequest): + charset = 'utf-8' + def __init__(self, environ, **kw): + super(NewRequest, self).__init__(environ, **kw) + self.assertRaises(DeprecationWarning, NewRequest, {'a':1}) + + def test_unexpected_kw(self): + # Passed an attr in kw that does not exist in the class, should + # raise an error + # Passed an attr in kw that does exist in the class, should be ok + from webob import Request + self.assertRaises(TypeError, + Request, {'a':1}, this_does_not_exist=1) + r = Request({'a':1}, **{'charset':'utf-8', 'server_name':'127.0.0.1'}) + self.assertEqual(getattr(r, 'charset', None), 'utf-8') + self.assertEqual(getattr(r, 'server_name', None), '127.0.0.1') + + def test_body_file_setter(self): + # If body_file is passed and it's instance of str, we define + # environ['wsgi.input'] and content_length. Plus, while deleting the + # attribute, we should get '' and 0 respectively + from webob import Request + r = Request({'a':1}, **{'body_file':'hello world'}) + self.assertEqual(r.environ['wsgi.input'].getvalue(), 'hello world') + self.assertEqual(int(r.environ['CONTENT_LENGTH']), len('hello world')) + del r.body_file + self.assertEqual(r.environ['wsgi.input'].getvalue(), '') + self.assertEqual(int(r.environ['CONTENT_LENGTH']), 0) + + def test_conttype_set_del(self): + # Deleting content_type attr from a request should update the + # environ dict + # Assigning content_type should replace first option of the environ + # dict + from webob import Request + r = Request({'a':1}, **{'content_type':'text/html'}) + self.assert_('CONTENT_TYPE' in r.environ) + self.assert_(hasattr(r, 'content_type')) + del r.content_type + self.assert_('CONTENT_TYPE' not in r.environ) + a = Request({'a':1}, + content_type='charset=utf-8;application/atom+xml;type=entry') + self.assert_(a.environ['CONTENT_TYPE']== + 'charset=utf-8;application/atom+xml;type=entry') + a.content_type = 'charset=utf-8' + self.assert_(a.environ['CONTENT_TYPE']== + 'charset=utf-8;application/atom+xml;type=entry') + + def test_headers2(self): + # Setting headers in init and later with a property, should update + # the info + from webob import Request + headers = {'Host': 'www.example.com', + 'Accept-Language': 'en-us,en;q=0.5', + 'Accept-Encoding': 'gzip,deflate', + 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', + 'Keep-Alive': '115', + 'Connection': 'keep-alive', + 'Cache-Control': 'max-age=0'} + r = Request({'a':1}, headers=headers) + for i in headers.keys(): + self.assert_(i in r.headers and + 'HTTP_'+i.upper().replace('-', '_') in r.environ) + r.headers = {'Server':'Apache'} + self.assertEqual(r.environ.keys(), ['a', 'HTTP_SERVER']) + + def test_host_url(self): + # Request has a read only property host_url that combines several + # keys to create a host_url + from webob import Request + a = Request({'wsgi.url_scheme':'http'}, **{'host':'www.example.com'}) + self.assertEqual(a.host_url, 'http://www.example.com') + a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost', + 'server_port':5000}) + self.assertEqual(a.host_url, 'http://localhost:5000') + a = Request({'wsgi.url_scheme':'https'}, **{'server_name':'localhost', + 'server_port':443}) + self.assertEqual(a.host_url, 'https://localhost') + + def test_path_info_p(self): + # Peek path_info to see what's coming + # Pop path_info until there's nothing remaining + from webob import Request + a = Request({'a':1}, **{'path_info':'/foo/bar','script_name':''}) + self.assertEqual(a.path_info_peek(), 'foo') + self.assertEqual(a.path_info_pop(), 'foo') + self.assertEqual(a.path_info_peek(), 'bar') + self.assertEqual(a.path_info_pop(), 'bar') + self.assertEqual(a.path_info_peek(), None) + self.assertEqual(a.path_info_pop(), None) + + def test_urlvars_property(self): + # Testing urlvars setter/getter/deleter + from webob import Request + a = Request({'wsgiorg.routing_args':((),{'x':'y'}), + 'paste.urlvars':{'test':'value'}}) + a.urlvars = {'hello':'world'} + self.assert_('paste.urlvars' not in a.environ) + self.assertEqual(a.environ['wsgiorg.routing_args'], + ((), {'hello':'world'})) + del a.urlvars + self.assert_('wsgiorg.routing_args' not in a.environ) + a = Request({'paste.urlvars':{'test':'value'}}) + self.assertEqual(a.urlvars, {'test':'value'}) + a.urlvars = {'hello':'world'} + self.assertEqual(a.environ['paste.urlvars'], {'hello':'world'}) + del a.urlvars + self.assert_('paste.urlvars' not in a.environ) + + def test_urlargs_property(self): + # Testing urlargs setter/getter/deleter + from webob import Request + a = Request({'paste.urlvars':{'test':'value'}}) + self.assertEqual(a.urlargs, ()) + a.urlargs = {'hello':'world'} + self.assertEqual(a.environ['wsgiorg.routing_args'], + ({'hello':'world'}, {'test':'value'})) + a = Request({'a':1}) + a.urlargs = {'hello':'world'} + self.assertEqual(a.environ['wsgiorg.routing_args'], + ({'hello':'world'}, {})) + del a.urlargs + self.assert_('wsgiorg.routing_args' not in a.environ) + + def test_host_property(self): + # Testing host setter/getter/deleter + from webob import Request + a = Request({'wsgi.url_scheme':'http'}, server_name='localhost', + server_port=5000) + self.assertEqual(a.host, "localhost:5000") + a.host = "localhost:5000" + self.assert_('HTTP_HOST' in a.environ) + del a.host + self.assert_('HTTP_HOST' not in a.environ) + + def test_body_property(self): + # Testing body setter/getter/deleter plus making sure body has a + # seek method + #a = Request({'a':1}, **{'CONTENT_LENGTH':'?'}) + # I cannot think of a case where somebody would put anything else + # than a # numerical value in CONTENT_LENGTH, Google didn't help + # either + #self.assertEqual(a.body, '') + # I need to implement a not seekable stringio like object. + import string + from webob import Request + from webob import BaseRequest + from cStringIO import StringIO + class DummyIO(object): + def __init__(self, txt): + self.txt = txt + def read(self, n=-1): + return self.txt[0:n] + limit = BaseRequest.request_body_tempfile_limit + len_strl = limit // len(string.letters) + 1 + r = Request({'a':1}, body_file=DummyIO(string.letters * len_strl)) + self.assertEqual(len(r.body), len(string.letters*len_strl)-1) + self.assertRaises(TypeError, + setattr, r, 'body', unicode('hello world')) + r.body = None + self.assertEqual(r.body, '') + r = Request({'a':1}, body_file=DummyIO(string.letters)) + self.assert_(not hasattr(r.body_file_raw, 'seek')) + r.make_body_seekable() + self.assert_(hasattr(r.body_file_raw, 'seek')) + r = Request({'a':1}, body_file=StringIO(string.letters)) + self.assert_(hasattr(r.body_file_raw, 'seek')) + r.make_body_seekable() + self.assert_(hasattr(r.body_file_raw, 'seek')) + + def test_repr_invalid(self): + # If we have an invalid WSGI environ, the repr should tell us. + from webob import BaseRequest + req = BaseRequest({'CONTENT_LENGTH':'0', 'body':''}) + self.assert_(repr(req).endswith('(invalid WSGI environ)>')) + + def test_from_garbage_file(self): + # If we pass a file with garbage to from_file method it should + # raise an error plus missing bits in from_file method + from cStringIO import StringIO + from webob import BaseRequest + self.assertRaises(ValueError, + BaseRequest.from_file, StringIO('hello world')) + val_file = StringIO( + "GET /webob/ HTTP/1.1\n" + "Host: pythonpaste.org\n" + "User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13)" + "Gecko/20101206 Ubuntu/10.04 (lucid) Firefox/3.6.13\n" + "Accept: " + "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;" + "q=0.8\n" + "Accept-Language: en-us,en;q=0.5\n" + "Accept-Encoding: gzip,deflate\n" + "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" + # duplicate on purpose + "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" + "Keep-Alive: 115\n" + "Connection: keep-alive\n" + ) + req = BaseRequest.from_file(val_file) + self.assert_(isinstance(req, BaseRequest)) + self.assert_(not repr(req).endswith('(invalid WSGI environ)>')) + val_file = StringIO( + "GET /webob/ HTTP/1.1\n" + "Host pythonpaste.org\n" + ) + self.assertRaises(ValueError, BaseRequest.from_file, val_file) + + def test_from_string(self): + # A valid request without a Content-Length header should still read + # the full body. + # Also test parity between as_string and from_string / from_file. + import cgi + from webob import BaseRequest + req = BaseRequest.from_string(_test_req) + self.assert_(isinstance(req, BaseRequest)) + self.assert_(not repr(req).endswith('(invalid WSGI environ)>')) + self.assert_('\n' not in req.http_version or '\r' in req.http_version) + self.assert_(',' not in req.host) + self.assert_(req.content_length is not None) + self.assertEqual(req.content_length, 337) + self.assert_('foo' in req.body) + bar_contents = "these are the contents of the file 'bar.txt'\r\n" + self.assert_(bar_contents in req.body) + self.assertEqual(req.params['foo'], 'foo') + bar = req.params['bar'] + self.assert_(isinstance(bar, cgi.FieldStorage)) + self.assertEqual(bar.type, 'application/octet-stream') + bar.file.seek(0) + self.assertEqual(bar.file.read(), bar_contents) + # out should equal contents, except for the Content-Length header, + # so insert that. + _test_req_copy = _test_req.replace('Content-Type', + 'Content-Length: 337\r\nContent-Type') + self.assertEqual(str(req), _test_req_copy) + + req2 = BaseRequest.from_string(_test_req2) + self.assert_('host' not in req2.headers) + self.assertEqual(str(req2), _test_req2.rstrip()) + self.assertRaises(ValueError, + BaseRequest.from_string, _test_req2 + 'xx') + + def test_blank(self): + # BaseRequest.blank class method + from webob import BaseRequest + self.assertRaises(ValueError, BaseRequest.blank, + 'www.example.com/foo?hello=world', None, + 'www.example.com/foo?hello=world') + self.assertRaises(ValueError, BaseRequest.blank, + 'gopher.example.com/foo?hello=world', None, + 'gopher://gopher.example.com') + req = BaseRequest.blank('www.example.com/foo?hello=world', None, + 'http://www.example.com') + self.assertEqual(req.environ.get('HTTP_HOST', None), + 'www.example.com:80') + self.assertEqual(req.environ.get('PATH_INFO', None), + 'www.example.com/foo') + self.assertEqual(req.environ.get('QUERY_STRING', None), + 'hello=world') + self.assertEqual(req.environ.get('REQUEST_METHOD', None), 'GET') + req = BaseRequest.blank('www.example.com/secure?hello=world', None, + 'https://www.example.com/secure') + self.assertEqual(req.environ.get('HTTP_HOST', None), + 'www.example.com:443') + self.assertEqual(req.environ.get('PATH_INFO', None), + 'www.example.com/secure') + self.assertEqual(req.environ.get('QUERY_STRING', None), 'hello=world') + self.assertEqual(req.environ.get('REQUEST_METHOD', None), 'GET') + self.assertEqual(req.environ.get('SCRIPT_NAME', None), '/secure') + self.assertEqual(req.environ.get('SERVER_NAME', None), + 'www.example.com') + self.assertEqual(req.environ.get('SERVER_PORT', None), '443') + + def test_environ_from_url(self): + # Generating an environ just from an url plus testing environ_add_POST + from webob.request import environ_add_POST + from webob.request import environ_from_url + self.assertRaises(TypeError, environ_from_url, + 'http://www.example.com/foo?bar=baz#qux') + self.assertRaises(TypeError, environ_from_url, + 'gopher://gopher.example.com') + req = environ_from_url('http://www.example.com/foo?bar=baz') + self.assertEqual(req.get('HTTP_HOST', None), 'www.example.com:80') + self.assertEqual(req.get('PATH_INFO', None), '/foo') + self.assertEqual(req.get('QUERY_STRING', None), 'bar=baz') + self.assertEqual(req.get('REQUEST_METHOD', None), 'GET') + self.assertEqual(req.get('SCRIPT_NAME', None), '') + self.assertEqual(req.get('SERVER_NAME', None), 'www.example.com') + self.assertEqual(req.get('SERVER_PORT', None), '80') + req = environ_from_url('https://www.example.com/foo?bar=baz') + self.assertEqual(req.get('HTTP_HOST', None), 'www.example.com:443') + self.assertEqual(req.get('PATH_INFO', None), '/foo') + self.assertEqual(req.get('QUERY_STRING', None), 'bar=baz') + self.assertEqual(req.get('REQUEST_METHOD', None), 'GET') + self.assertEqual(req.get('SCRIPT_NAME', None), '') + self.assertEqual(req.get('SERVER_NAME', None), 'www.example.com') + self.assertEqual(req.get('SERVER_PORT', None), '443') + environ_add_POST(req, None) + self.assert_('CONTENT_TYPE' not in req) + self.assert_('CONTENT_LENGTH' not in req) + environ_add_POST(req, {'hello':'world'}) + self.assert_(req.get('HTTP_HOST', None), 'www.example.com:443') + self.assertEqual(req.get('PATH_INFO', None), '/foo') + self.assertEqual(req.get('QUERY_STRING', None), 'bar=baz') + self.assertEqual(req.get('REQUEST_METHOD', None), 'POST') + self.assertEqual(req.get('SCRIPT_NAME', None), '') + self.assertEqual(req.get('SERVER_NAME', None), 'www.example.com') + self.assertEqual(req.get('SERVER_PORT', None), '443') + self.assertEqual(req.get('CONTENT_LENGTH', None),'11') + self.assertEqual(req.get('CONTENT_TYPE', None), + 'application/x-www-form-urlencoded') + self.assertEqual(req['wsgi.input'].read(), 'hello=world') + + + def test_post_does_not_reparse(self): + # test that there's no repetitive parsing is happening on every + # req.POST access + from webob import Request + req = Request.blank('/', + content_type='multipart/form-data; boundary=boundary', + POST=_cgi_escaping_body + ) + f0 = req.body_file_raw + post1 = req.str_POST + f1 = req.body_file_raw + self.assert_(f1 is not f0) + post2 = req.str_POST + f2 = req.body_file_raw + self.assert_(post1 is post2) + self.assert_(f1 is f2) + + + def test_middleware_body(self): + from webob import Request + def app(env, sr): + sr('200 OK', []) + return [env['wsgi.input'].read()] + + def mw(env, sr): + req = Request(env) + data = req.body_file.read() + resp = req.get_response(app) + resp.headers['x-data'] = data + return resp(env, sr) + + req = Request.blank('/', method='PUT', body='abc') + resp = req.get_response(mw) + self.assertEqual(resp.body, 'abc') + self.assertEqual(resp.headers['x-data'], 'abc') + + def test_body_file_noseek(self): + from webob import Request + req = Request.blank('/', method='PUT', body='abc') + lst = [req.body_file.read(1) for i in range(3)] + self.assertEqual(lst, ['a','b','c']) + + def test_cgi_escaping_fix(self): + from webob import Request + req = Request.blank('/', + content_type='multipart/form-data; boundary=boundary', + POST=_cgi_escaping_body + ) + self.assertEqual(req.POST.keys(), ['%20%22"']) + req.body_file.read() + self.assertEqual(req.POST.keys(), ['%20%22"']) + + def test_content_type_none(self): + from webob import Request + r = Request.blank('/', content_type='text/html') + self.assertEqual(r.content_type, 'text/html') + r.content_type = None + + def test_charset_in_content_type(self): + from webob import Request + r = Request({'CONTENT_TYPE':'text/html;charset=ascii'}) + r.charset = 'shift-jis' + self.assertEqual(r.charset, 'shift-jis') + + def test_body_file_seekable(self): + from cStringIO import StringIO + from webob import Request + r = Request.blank('/') + r.body_file = StringIO('body') + self.assertEqual(r.body_file_seekable.read(), 'body') + + def test_request_init(self): + # port from doctest (docs/reference.txt) + from webob import Request + req = Request.blank('/article?id=1') + self.assertEqual(req.environ['HTTP_HOST'], 'localhost:80') + self.assertEqual(req.environ['PATH_INFO'], '/article') + self.assertEqual(req.environ['QUERY_STRING'], 'id=1') + self.assertEqual(req.environ['REQUEST_METHOD'], 'GET') + self.assertEqual(req.environ['SCRIPT_NAME'], '') + self.assertEqual(req.environ['SERVER_NAME'], 'localhost') + self.assertEqual(req.environ['SERVER_PORT'], '80') + self.assertEqual(req.environ['SERVER_PROTOCOL'], 'HTTP/1.0') + self.assert_(hasattr(req.environ['wsgi.errors'], 'write') and + hasattr(req.environ['wsgi.errors'], 'flush')) + self.assert_(hasattr(req.environ['wsgi.input'], 'next')) + self.assertEqual(req.environ['wsgi.multiprocess'], False) + self.assertEqual(req.environ['wsgi.multithread'], False) + self.assertEqual(req.environ['wsgi.run_once'], False) + self.assertEqual(req.environ['wsgi.url_scheme'], 'http') + self.assertEqual(req.environ['wsgi.version'], (1, 0)) + + # Test body + self.assert_(hasattr(req.body_file, 'read')) + self.assertEqual(req.body, '') + req.body = 'test' + self.assert_(hasattr(req.body_file, 'read')) + self.assertEqual(req.body, 'test') + + # Test method & URL + self.assertEqual(req.method, 'GET') + self.assertEqual(req.scheme, 'http') + self.assertEqual(req.script_name, '') # The base of the URL + req.script_name = '/blog' # make it more interesting + self.assertEqual(req.path_info, '/article') + # Content-Type of the request body + self.assertEqual(req.content_type, '') + # The auth'ed user (there is none set) + self.assert_(req.remote_user is None) + self.assert_(req.remote_addr is None) + self.assertEqual(req.host, 'localhost:80') + self.assertEqual(req.host_url, 'http://localhost') + self.assertEqual(req.application_url, 'http://localhost/blog') + self.assertEqual(req.path_url, 'http://localhost/blog/article') + self.assertEqual(req.url, 'http://localhost/blog/article?id=1') + self.assertEqual(req.path, '/blog/article') + self.assertEqual(req.path_qs, '/blog/article?id=1') + self.assertEqual(req.query_string, 'id=1') + self.assertEqual(req.relative_url('archive'), + 'http://localhost/blog/archive') + + # Doesn't change request + self.assertEqual(req.path_info_peek(), 'article') + # Does change request! + self.assertEqual(req.path_info_pop(), 'article') + self.assertEqual(req.script_name, '/blog/article') + self.assertEqual(req.path_info, '') + + # Headers + req.headers['Content-Type'] = 'application/x-www-urlencoded' + self.assertEqual(sorted(req.headers.items()), + [('Content-Length', '4'), + ('Content-Type', 'application/x-www-urlencoded'), + ('Host', 'localhost:80')]) + self.assertEqual(req.environ['CONTENT_TYPE'], + 'application/x-www-urlencoded') + + def test_request_query_and_POST_vars(self): + # port from doctest (docs/reference.txt) + + # Query & POST variables + from webob import Request + from webob.multidict import MultiDict + from webob.multidict import NestedMultiDict + from webob.multidict import NoVars + from webob.multidict import TrackableMultiDict + req = Request.blank('/test?check=a&check=b&name=Bob') + GET = TrackableMultiDict([('check', 'a'), + ('check', 'b'), + ('name', 'Bob')]) + self.assertEqual(req.str_GET, GET) + self.assertEqual(req.str_GET['check'], 'b') + self.assertEqual(req.str_GET.getall('check'), ['a', 'b']) + self.assertEqual(req.str_GET.items(), + [('check', 'a'), ('check', 'b'), ('name', 'Bob')]) + + self.assert_(isinstance(req.str_POST, NoVars)) + # NoVars can be read like a dict, but not written + self.assertEqual(req.str_POST.items(), []) + req.method = 'POST' + req.body = 'name=Joe&email=joe@example.com' + self.assertEqual(req.str_POST, + MultiDict([('name', 'Joe'), + ('email', 'joe@example.com')])) + self.assertEqual(req.str_POST['name'], 'Joe') + + self.assert_(isinstance(req.str_params, NestedMultiDict)) + self.assertEqual(req.str_params.items(), + [('check', 'a'), + ('check', 'b'), + ('name', 'Bob'), + ('name', 'Joe'), + ('email', 'joe@example.com')]) + self.assertEqual(req.str_params['name'], 'Bob') + self.assertEqual(req.str_params.getall('name'), ['Bob', 'Joe']) + + def test_request_put(self): + from datetime import datetime + from webob import Request + from webob import Response + from webob import UTC + from webob.acceptparse import MIMEAccept + from webob.byterange import Range + from webob.etag import ETagMatcher + from webob.etag import _NoIfRange + from webob.multidict import MultiDict + from webob.multidict import TrackableMultiDict + from webob.multidict import UnicodeMultiDict + req = Request.blank('/test?check=a&check=b&name=Bob') + req.method = 'PUT' + req.body = 'var1=value1&var2=value2&rep=1&rep=2' + req.environ['CONTENT_LENGTH'] = str(len(req.body)) + req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded' + GET = TrackableMultiDict([('check', 'a'), + ('check', 'b'), + ('name', 'Bob')]) + self.assertEqual(req.str_GET, GET) + self.assertEqual(req.str_POST, MultiDict( + [('var1', 'value1'), + ('var2', 'value2'), + ('rep', '1'), + ('rep', '2')])) + + # Unicode + req.charset = 'utf8' + self.assert_(isinstance(req.GET, UnicodeMultiDict)) + self.assertEqual(req.GET.items(), + [('check', u'a'), ('check', u'b'), ('name', u'Bob')]) + + # Cookies + req.headers['Cookie'] = 'test=value' + self.assert_(isinstance(req.cookies, UnicodeMultiDict)) + self.assertEqual(req.cookies.items(), [('test', u'value')]) + req.charset = None + self.assertEqual(req.str_cookies, {'test': 'value'}) + + # Accept-* headers + self.assert_('text/html' in req.accept) + req.accept = 'text/html;q=0.5, application/xhtml+xml;q=1' + self.assert_(isinstance(req.accept, MIMEAccept)) + self.assert_('text/html' in req.accept) + + self.assertEqual(req.accept.first_match(['text/html', + 'application/xhtml+xml']), 'text/html') + self.assertEqual(req.accept.best_match(['text/html', + 'application/xhtml+xml']), + 'application/xhtml+xml') + self.assertEqual(req.accept.best_matches(), + ['application/xhtml+xml', 'text/html']) + + req.accept_language = 'es, pt-BR' + self.assertEqual(req.accept_language.best_matches('en-US'), + ['es', 'pt-BR', 'en-US']) + self.assertEqual(req.accept_language.best_matches('es'), ['es']) + + # Conditional Requests + server_token = 'opaque-token' + # shouldn't return 304 + self.assert_(not server_token in req.if_none_match) + req.if_none_match = server_token + self.assert_(isinstance(req.if_none_match, ETagMatcher)) + # You *should* return 304 + self.assert_(server_token in req.if_none_match) + + req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) + self.assertEqual(req.headers['If-Modified-Since'], + 'Sun, 01 Jan 2006 12:00:00 GMT') + server_modified = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) + self.assert_(req.if_modified_since) + self.assert_(req.if_modified_since >= server_modified) + + self.assert_(isinstance(req.if_range, _NoIfRange)) + self.assert_(req.if_range.match(etag='some-etag', + last_modified=datetime(2005, 1, 1, 12, 0))) + req.if_range = 'opaque-etag' + self.assert_(not req.if_range.match(etag='other-etag')) + self.assert_(req.if_range.match(etag='opaque-etag')) + + res = Response(etag='opaque-etag') + self.assert_(req.if_range.match_response(res)) + + req.range = 'bytes=0-100' + self.assert_(isinstance(req.range, Range)) + self.assertEqual(req.range.ranges, [(0, 101)]) + cr = req.range.content_range(length=1000) + self.assertEqual((cr.start, cr.stop, cr.length), (0, 101, 1000)) + + self.assert_(server_token in req.if_match) + # No If-Match means everything is ok + req.if_match = server_token + self.assert_(server_token in req.if_match) + # Still OK + req.if_match = 'other-token' + # Not OK, should return 412 Precondition Failed: + self.assert_(not server_token in req.if_match) + + def test_call_WSGI_app(self): + from webob import Request + req = Request.blank('/') + def wsgi_app(environ, start_response): + start_response('200 OK', [('Content-type', 'text/plain')]) + return ['Hi!'] + self.assertEqual(req.call_application(wsgi_app), + ('200 OK', [('Content-type', 'text/plain')], ['Hi!'])) + + res = req.get_response(wsgi_app) + from webob.response import Response + self.assert_(isinstance(res, Response)) + self.assertEqual(res.status, '200 OK') + from webob.headers import ResponseHeaders + self.assert_(isinstance(res.headers, ResponseHeaders)) + self.assertEqual(res.headers.items(), [('Content-type', 'text/plain')]) + self.assertEqual(res.body, 'Hi!') + + def equal_req(self, req): + from cStringIO import StringIO + from webob import Request + input = StringIO(str(req)) + req2 = Request.from_file(input) + self.assertEqual(req.url, req2.url) + headers1 = dict(req.headers) + headers2 = dict(req2.headers) + self.assertEqual(int(headers1.get('Content-Length', '0')), + int(headers2.get('Content-Length', '0'))) + if 'Content-Length' in headers1: + del headers1['Content-Length'] + if 'Content-Length' in headers2: + del headers2['Content-Length'] + self.assertEqual(headers1, headers2) + self.assertEqual(req.body, req2.body) + def simpleapp(environ, start_response): + from webob import Request status = '200 OK' response_headers = [('Content-type','text/plain')] start_response(status, response_headers) @@ -18,15 +2275,21 @@ def simpleapp(environ, start_response): 'Hello world!\n', 'The get is %r' % request.str_GET, ' and Val is %s\n' % request.str_GET.get('name'), - 'The languages are: %s\n' % request.accept_language.best_matches('en-US'), - 'The accepttypes is: %s\n' % request.accept.best_match(['application/xml', 'text/html']), + 'The languages are: %s\n' % + request.accept_language.best_matches('en-US'), + 'The accepttypes is: %s\n' % + request.accept.best_match(['application/xml', 'text/html']), 'post is %r\n' % request.str_POST, 'params is %r\n' % request.str_params, 'cookies is %r\n' % request.str_cookies, 'body: %r\n' % request.body, 'method: %s\n' % request.method, 'remote_user: %r\n' % request.environ['REMOTE_USER'], - 'host_url: %r; application_url: %r; path_url: %r; url: %r\n' % (request.host_url, request.application_url, request.path_url, request.url), + 'host_url: %r; application_url: %r; path_url: %r; url: %r\n' % + (request.host_url, + request.application_url, + request.path_url, + request.url), 'urlvars: %r\n' % request.urlvars, 'urlargs: %r\n' % (request.urlargs, ), 'is_xhr: %r\n' % request.is_xhr, @@ -35,537 +2298,13 @@ def simpleapp(environ, start_response): 'if_none_match: %r\n' % request.if_none_match, ] -def test_gets(): - app = TestApp(simpleapp) - res = app.get('/') - assert 'Hello' in res - assert "get is GET([])" in res - assert "post is <NoVars: Not a form request>" in res - - res = app.get('/?name=george') - res.mustcontain("get is GET([('name', 'george')])") - res.mustcontain("Val is george") - -def test_language_parsing(): - app = TestApp(simpleapp) - res = app.get('/') - assert "The languages are: ['en-US']" in res - - res = app.get('/', headers={'Accept-Language':'da, en-gb;q=0.8, en;q=0.7'}) - assert "languages are: ['da', 'en-gb', 'en-US']" in res - - res = app.get('/', headers={'Accept-Language':'en-gb;q=0.8, da, en;q=0.7'}) - assert "languages are: ['da', 'en-gb', 'en-US']" in res - -def test_mime_parsing(): - app = TestApp(simpleapp) - res = app.get('/', headers={'Accept':'text/html'}) - assert "accepttypes is: text/html" in res - - res = app.get('/', headers={'Accept':'application/xml'}) - assert "accepttypes is: application/xml" in res - - res = app.get('/', headers={'Accept':'application/xml,*/*'}) - assert "accepttypes is: application/xml" in res, res - - -def test_accept_best_match(): - assert not Request.blank('/').accept - assert not Request.blank('/', headers={'Accept': ''}).accept - req = Request.blank('/', headers={'Accept':'text/plain'}) - ok_(req.accept) - assert_raises(ValueError, req.accept.best_match, ['*/*']) - req = Request.blank('/', accept=['*/*','text/*']) - eq_(req.accept.best_match(['application/x-foo', 'text/plain']), 'text/plain') - eq_(req.accept.best_match(['text/plain', 'application/x-foo']), 'text/plain') - req = Request.blank('/', accept=['text/plain', 'message/*']) - eq_(req.accept.best_match(['message/x-foo', 'text/plain']), 'text/plain') - eq_(req.accept.best_match(['text/plain', 'message/x-foo']), 'text/plain') - -def test_from_mimeparse(): - # http://mimeparse.googlecode.com/svn/trunk/mimeparse.py - supported = ['application/xbel+xml', 'application/xml'] - tests = [('application/xbel+xml', 'application/xbel+xml'), - ('application/xbel+xml; q=1', 'application/xbel+xml'), - ('application/xml; q=1', 'application/xml'), - ('application/*; q=1', 'application/xbel+xml'), - ('*/*', 'application/xbel+xml')] - - for accept, get in tests: - req = Request.blank('/', headers={'Accept':accept}) - assert req.accept.best_match(supported) == get, ( - '%r generated %r instead of %r for %r' % (accept, req.accept.best_match(supported), get, supported)) - - supported = ['application/xbel+xml', 'text/xml'] - tests = [('text/*;q=0.5,*/*; q=0.1', 'text/xml'), - ('text/html,application/atom+xml; q=0.9', None)] - - for accept, get in tests: - req = Request.blank('/', headers={'Accept':accept}) - assert req.accept.best_match(supported) == get, ( - 'Got %r instead of %r for %r' % (req.accept.best_match(supported), get, supported)) - - supported = ['application/json', 'text/html'] - tests = [('application/json, text/javascript, */*', 'application/json'), - ('application/json, text/html;q=0.9', 'application/json')] - - for accept, get in tests: - req = Request.blank('/', headers={'Accept':accept}) - assert req.accept.best_match(supported) == get, ( - '%r generated %r instead of %r for %r' % (accept, req.accept.best_match(supported), get, supported)) - - offered = ['image/png', 'application/xml'] - tests = [ - ('image/png', 'image/png'), - ('image/*', 'image/png'), - ('image/*, application/xml', 'application/xml'), - ] - - for accept, get in tests: - req = Request.blank('/', accept=accept) - eq_(req.accept.best_match(offered), get) - -def test_headers(): - app = TestApp(simpleapp) - headers = { - 'If-Modified-Since': 'Sat, 29 Oct 1994 19:43:31 GMT', - 'Cookie': 'var1=value1', - 'User-Agent': 'Mozilla 4.0 (compatible; MSIE)', - 'If-None-Match': '"etag001", "etag002"', - 'X-Requested-With': 'XMLHttpRequest', - } - res = app.get('/?foo=bar&baz', headers=headers) - res.mustcontain( - 'if_modified_since: datetime.datetime(1994, 10, 29, 19, 43, 31, tzinfo=UTC)', - "user_agent: 'Mozilla", - 'is_xhr: True', - "cookies is {'var1': 'value1'}", - "params is NestedMultiDict([('foo', 'bar'), ('baz', '')])", - "if_none_match: <ETag etag001 or etag002>", - ) -def test_bad_cookie(): - req = Request.blank('/') - req.headers['Cookie'] = '070-it-:><?0' - assert req.cookies == {} - req.headers['Cookie'] = 'foo=bar' - assert req.cookies == {'foo': 'bar'} - req.headers['Cookie'] = '...' - assert req.cookies == {} - req.headers['Cookie'] = '=foo' - assert req.cookies == {} - req.headers['Cookie'] = 'dismiss-top=6; CP=null*; PHPSESSID=0a539d42abc001cdc762809248d4beed; a=42' - eq_(req.cookies, { - 'CP': u'null*', - 'PHPSESSID': u'0a539d42abc001cdc762809248d4beed', - 'a': u'42', - 'dismiss-top': u'6' - }) - req.headers['Cookie'] = 'fo234{=bar blub=Blah' - assert req.cookies == {'blub': 'Blah'} - -def test_cookie_quoting(): - req = Request.blank('/') - req.headers['Cookie'] = 'foo="?foo"; Path=/' - assert req.cookies == {'foo': '?foo'} - -def test_path_quoting(): - path = '/:@&+$,/bar' - req = Request.blank(path) - assert req.path == path - assert req.url.endswith(path) - -def test_params(): - req = Request.blank('/?a=1&b=2') - req.method = 'POST' - req.body = 'b=3' - assert req.params.items() == [('a', '1'), ('b', '2'), ('b', '3')] - new_params = req.params.copy() - assert new_params.items() == [('a', '1'), ('b', '2'), ('b', '3')] - new_params['b'] = '4' - assert new_params.items() == [('a', '1'), ('b', '4')] - # The key name is \u1000: - req = Request.blank('/?%E1%80%80=x', decode_param_names=True, charset='UTF-8') - assert req.decode_param_names - assert u'\u1000' in req.GET.keys() - assert req.GET[u'\u1000'] == 'x' - -class UnseekableInput(object): - def __init__(self, data): - self.data = data - self.pos = 0 - def read(self, size=-1): - if size == -1: - t = self.data[self.pos:] - self.pos = len(self.data) - return t - else: - assert self.pos + size <= len(self.data), ( - "Attempt to read past end (length=%s, position=%s, reading %s bytes)" - % (len(self.data), self.pos, size)) - t = self.data[self.pos:self.pos+size] - self.pos += size - return t -def test_copy_body(): - req = Request.blank('/', method='POST', body='some text', request_body_tempfile_limit=1) - old_body_file = req.body_file_raw - req.copy_body() - assert req.body_file_raw is not old_body_file - req = Request.blank('/', method='POST', body_file=UnseekableInput('0123456789'), content_length=10) - assert not hasattr(req.body_file_raw, 'seek') - old_body_file = req.body_file_raw - req.make_body_seekable() - assert req.body_file_raw is not old_body_file - assert req.body == '0123456789' - old_body_file = req.body_file - req.make_body_seekable() - assert req.body_file_raw is old_body_file - assert req.body_file is old_body_file +_cgi_escaping_body = '''--boundary +Content-Disposition: form-data; name="%20%22"" -class UnseekableInputWithSeek(UnseekableInput): - def seek(self, pos, rel=0): - raise IOError("Invalid seek!") -def test_broken_seek(): - # copy() should work even when the input has a broken seek method - req = Request.blank('/', method='POST', body_file=UnseekableInputWithSeek('0123456789'), content_length=10) - assert hasattr(req.body_file_raw, 'seek') - assert_raises(IOError, req.body_file_raw.seek, 0) - old_body_file = req.body_file - req2 = req.copy() - assert req2.body_file_raw is req2.body_file is not old_body_file - assert req2.body == '0123456789' - -def test_set_body(): - req = BaseRequest.blank('/', body='foo') - assert req.is_body_seekable - eq_(req.body, 'foo') - eq_(req.content_length, 3) - del req.body - eq_(req.body, '') - eq_(req.content_length, 0) - - - -def test_broken_clen_header(): - # if the UA sends "content_length: ..' header (the name is wrong) - # it should not break the req.headers.items() - req = Request.blank('/') - req.environ['HTTP_CONTENT_LENGTH'] = '0' - req.headers.items() - - -def test_nonstr_keys(): - # non-string env keys shouldn't break req.headers - req = Request.blank('/') - req.environ[1] = 1 - req.headers.items() - - -def test_authorization(): - req = Request.blank('/') - req.authorization = 'Digest uri="/?a=b"' - assert req.authorization == ('Digest', {'uri': '/?a=b'}) - -def test_authorization2(): - from webob.descriptors import parse_auth_params - for s, d in [ - ('x=y', {'x': 'y'}), - ('x="y"', {'x': 'y'}), - ('x=y,z=z', {'x': 'y', 'z': 'z'}), - ('x=y, z=z', {'x': 'y', 'z': 'z'}), - ('x="y",z=z', {'x': 'y', 'z': 'z'}), - ('x="y", z=z', {'x': 'y', 'z': 'z'}), - ('x="y,x", z=z', {'x': 'y,x', 'z': 'z'}), - ]: - eq_(parse_auth_params(s), d) - - -def test_from_file(): - req = Request.blank('http://example.com:8000/test.html?params') - equal_req(req) - - req = Request.blank('http://example.com/test2') - req.method = 'POST' - req.body = 'test=example' - equal_req(req) - -def equal_req(req): - input = StringIO(str(req)) - req2 = Request.from_file(input) - eq_(req.url, req2.url) - headers1 = dict(req.headers) - headers2 = dict(req2.headers) - eq_(int(headers1.get('Content-Length', '0')), - int(headers2.get('Content-Length', '0'))) - if 'Content-Length' in headers1: - del headers1['Content-Length'] - if 'Content-Length' in headers2: - del headers2['Content-Length'] - eq_(headers1, headers2) - eq_(req.body, req2.body) - -def test_req_kw_none_val(): - assert 'content-length' not in Request({}, content_length=None).headers - assert 'content-type' not in Request({}, content_type=None).headers - -def test_env_keys(): - req = Request.blank('/') - # SCRIPT_NAME can be missing - del req.environ['SCRIPT_NAME'] - eq_(req.script_name, '') - eq_(req.uscript_name, u'') - -def test_repr_nodefault(): - nd = NoDefault - eq_(repr(nd), '(No Default)') - -def test_request_noenviron_param(): - """Environ is a a mandatory not null param in Request""" - assert_raises(TypeError, Request, environ=None) - -def test_environ_getter(): - """ - Parameter environ_getter in Request is no longer valid and should raise - an error in case it's used - """ - class env(object): - def __init__(self, env): - self.env = env - def env_getter(self): - return self.env - assert_raises(ValueError, Request, environ_getter=env({'a':1}).env_getter) - -def test_unicode_errors(): - """ - Passing unicode_errors != NoDefault should assign value to - dictionary['unicode_errors'], else not - """ - r = Request({'a':1}, unicode_errors='strict') - ok_('unicode_errors' in r.__dict__) - r = Request({'a':1}, unicode_errors=NoDefault) - ok_('unicode_errors' not in r.__dict__) - -def test_charset_deprecation(): - """ - Any class that inherits from BaseRequest cannot define a default_charset - attribute. - Any class that inherits from BaseRequest cannot define a charset attr - that is instance of str - """ - class NewRequest(BaseRequest): - default_charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - class NewRequest(BaseRequest): - charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - class NewRequest(AdhocAttrMixin, BaseRequest): - default_charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - class NewRequest(AdhocAttrMixin, BaseRequest): - charset = 'utf-8' - def __init__(self, environ, **kw): - super(NewRequest, self).__init__(environ, **kw) - assert_raises(DeprecationWarning, NewRequest, {'a':1}) - -def test_unexpected_kw(): - """ - Passed an attr in kw that does not exist in the class, should raise an - error - Passed an attr in kw that does exist in the class, should be ok - """ - assert_raises(TypeError, Request, {'a':1}, **{'this_does_not_exist':1}) - r = Request({'a':1}, **{'charset':'utf-8', 'server_name':'127.0.0.1'}) - eq_(getattr(r, 'charset', None), 'utf-8') - eq_(getattr(r, 'server_name', None), '127.0.0.1') - -def test_body_file_setter(): - """" - If body_file is passed and it's instance of str, we define - environ['wsgi.input'] and content_length. Plus, while deleting the - attribute, we should get '' and 0 respectively - """ - r = Request({'a':1}, **{'body_file':'hello world'}) - eq_(r.environ['wsgi.input'].getvalue(), 'hello world') - eq_(int(r.environ['CONTENT_LENGTH']), len('hello world')) - del r.body_file - eq_(r.environ['wsgi.input'].getvalue(), '') - eq_(int(r.environ['CONTENT_LENGTH']), 0) - -def test_conttype_set_del(): - """ - Deleting content_type attr from a request should update the environ dict - Assigning content_type should replace first option of the environ dict - """ - r = Request({'a':1}, **{'content_type':'text/html'}) - ok_('CONTENT_TYPE' in r.environ) - ok_(hasattr(r, 'content_type')) - del r.content_type - ok_('CONTENT_TYPE' not in r.environ) - a = Request({'a':1},**{'content_type':'charset=utf-8;application/atom+xml;type=entry'}) - ok_(a.environ['CONTENT_TYPE']=='charset=utf-8;application/atom+xml;type=entry') - a.content_type = 'charset=utf-8' - ok_(a.environ['CONTENT_TYPE']=='charset=utf-8;application/atom+xml;type=entry') - -def test_headers(): - """ - Setting headers in init and later with a property, should update the info - """ - headers = {'Host': 'www.example.com', - 'Accept-Language': 'en-us,en;q=0.5', - 'Accept-Encoding': 'gzip,deflate', - 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', - 'Keep-Alive': '115', - 'Connection': 'keep-alive', - 'Cache-Control': 'max-age=0'} - r = Request({'a':1}, headers=headers) - for i in headers.keys(): - ok_(i in r.headers and - 'HTTP_'+i.upper().replace('-', '_') in r.environ) - r.headers = {'Server':'Apache'} - eq_(r.environ.keys(), ['a', 'HTTP_SERVER']) - -def test_host_url(): - """ - Request has a read only property host_url that combines several keys to - create a host_url - """ - a = Request({'wsgi.url_scheme':'http'}, **{'host':'www.example.com'}) - eq_(a.host_url, 'http://www.example.com') - a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost', - 'server_port':5000}) - eq_(a.host_url, 'http://localhost:5000') - a = Request({'wsgi.url_scheme':'https'}, **{'server_name':'localhost', - 'server_port':443}) - eq_(a.host_url, 'https://localhost') - -def test_path_info_p(): - """ - Peek path_info to see what's coming - Pop path_info until there's nothing remaining - """ - a = Request({'a':1}, **{'path_info':'/foo/bar','script_name':''}) - eq_(a.path_info_peek(), 'foo') - eq_(a.path_info_pop(), 'foo') - eq_(a.path_info_peek(), 'bar') - eq_(a.path_info_pop(), 'bar') - eq_(a.path_info_peek(), None) - eq_(a.path_info_pop(), None) - -def test_urlvars_property(): - """ - Testing urlvars setter/getter/deleter - """ - a = Request({'wsgiorg.routing_args':((),{'x':'y'}), - 'paste.urlvars':{'test':'value'}}) - a.urlvars = {'hello':'world'} - ok_('paste.urlvars' not in a.environ) - eq_(a.environ['wsgiorg.routing_args'], ((), {'hello':'world'})) - del a.urlvars - ok_('wsgiorg.routing_args' not in a.environ) - a = Request({'paste.urlvars':{'test':'value'}}) - eq_(a.urlvars, {'test':'value'}) - a.urlvars = {'hello':'world'} - eq_(a.environ['paste.urlvars'], {'hello':'world'}) - del a.urlvars - ok_('paste.urlvars' not in a.environ) - -def test_urlargs_property(): - """ - Testing urlargs setter/getter/deleter - """ - a = Request({'paste.urlvars':{'test':'value'}}) - eq_(a.urlargs, ()) - a.urlargs = {'hello':'world'} - eq_(a.environ['wsgiorg.routing_args'], ({'hello':'world'}, - {'test':'value'})) - a = Request({'a':1}) - a.urlargs = {'hello':'world'} - eq_(a.environ['wsgiorg.routing_args'], ({'hello':'world'}, {})) - del a.urlargs - ok_('wsgiorg.routing_args' not in a.environ) - -def test_host_property(): - """ - Testing host setter/getter/deleter - """ - a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost', - 'server_port':5000}) - eq_(a.host, "localhost:5000") - a.host = "localhost:5000" - ok_('HTTP_HOST' in a.environ) - del a.host - ok_('HTTP_HOST' not in a.environ) - -def test_body_property(): - """ - Testing body setter/getter/deleter plus making sure body has a seek method - """ - #a = Request({'a':1}, **{'CONTENT_LENGTH':'?'}) - # I cannot think of a case where somebody would put anything else than a - # numerical value in CONTENT_LENGTH, Google didn't help either - #eq_(a.body, '') - # I need to implement a not seekable stringio like object. - class DummyIO(object): - def __init__(self, txt): - self.txt = txt - def read(self, n=-1): - return self.txt[0:n] - len_strl = BaseRequest.request_body_tempfile_limit/len(string.letters)+1 - r = Request({'a':1}, **{'body_file':DummyIO(string.letters*len_strl)}) - eq_(len(r.body), len(string.letters*len_strl)-1) - assert_raises(TypeError, setattr, r, 'body', unicode('hello world')) - r.body = None - eq_(r.body, '') - r = Request({'a':1}, **{'body_file':DummyIO(string.letters)}) - assert not hasattr(r.body_file_raw, 'seek') - r.make_body_seekable() - assert hasattr(r.body_file_raw, 'seek') - r = Request({'a':1}, **{'body_file':StringIO(string.letters)}) - assert hasattr(r.body_file_raw, 'seek') - r.make_body_seekable() - assert hasattr(r.body_file_raw, 'seek') - -def test_repr_invalid(): - """If we have an invalid WSGI environ, the repr should tell us""" - req = BaseRequest({'CONTENT_LENGTH':'0', 'body':''}) - ok_(repr(req).endswith('(invalid WSGI environ)>')) - -def test_from_file(): - """If we pass a file with garbage to from_file method it should raise an - error plus missing bits in from_file method - """ - assert_raises(ValueError, BaseRequest.from_file, StringIO('hello world')) - val_file = StringIO( - "GET /webob/ HTTP/1.1\n" - "Host: pythonpaste.org\n" - "User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13)" - "Gecko/20101206 Ubuntu/10.04 (lucid) Firefox/3.6.13\n" - "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;" - "q=0.8\n" - "Accept-Language: en-us,en;q=0.5\n" - "Accept-Encoding: gzip,deflate\n" - "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" - # duplicate on porpouse - "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" - "Keep-Alive: 115\n" - "Connection: keep-alive\n" - ) - req = BaseRequest.from_file(val_file) - assert isinstance(req, BaseRequest) - assert_false(repr(req).endswith('(invalid WSGI environ)>')) - val_file = StringIO( - "GET /webob/ HTTP/1.1\n" - "Host pythonpaste.org\n" - ) - assert_raises(ValueError, BaseRequest.from_file, val_file) +--boundary--''' def _norm_req(s): return '\r\n'.join(s.strip().replace('\r','').split('\n')) @@ -590,6 +2329,7 @@ these are the contents of the file 'bar.txt' ------------------------------deb95b63e42a-- """ + _test_req2 = """ POST / HTTP/1.0 Content-Length: 0 @@ -599,149 +2339,128 @@ Content-Length: 0 _test_req = _norm_req(_test_req) _test_req2 = _norm_req(_test_req2) + '\r\n' -def test_from_string(): - """A valid request without a Content-Length header should still read the full body. - Also test parity between as_string and from_string / from_file. - """ - req = BaseRequest.from_string(_test_req) - assert isinstance(req, BaseRequest) - assert_false(repr(req).endswith('(invalid WSGI environ)>')) - assert_false('\n' in req.http_version or '\r' in req.http_version) - assert ',' not in req.host - assert req.content_length is not None - assert req.content_length == 337 - assert 'foo' in req.body - bar_contents = "these are the contents of the file 'bar.txt'\r\n" - assert bar_contents in req.body - assert req.params['foo'] == 'foo' - bar = req.params['bar'] - assert isinstance(bar, cgi.FieldStorage) - assert bar.type == 'application/octet-stream' - bar.file.seek(0) - assert bar.file.read() == bar_contents - # out should equal contents, except for the Content-Length header, - # so insert that. - _test_req_copy = _test_req.replace('Content-Type', 'Content-Length: 337\r\nContent-Type') - assert str(req) == _test_req_copy - - req2 = BaseRequest.from_string(_test_req2) - assert 'host' not in req2.headers - eq_(str(req2), _test_req2.rstrip()) - assert_raises(ValueError, BaseRequest.from_string, _test_req2+'xx') - - -def test_blank(): - """BaseRequest.blank class method""" - assert_raises(ValueError, BaseRequest.blank, - 'www.example.com/foo?hello=world', None, - 'www.example.com/foo?hello=world') - assert_raises(ValueError, BaseRequest.blank, - 'gopher.example.com/foo?hello=world', None, - 'gopher://gopher.example.com') - req = BaseRequest.blank('www.example.com/foo?hello=world', None, - 'http://www.example.com') - ok_(req.environ.get('HTTP_HOST', None)== 'www.example.com:80' and - req.environ.get('PATH_INFO', None)== 'www.example.com/foo' and - req.environ.get('QUERY_STRING', None)== 'hello=world' and - req.environ.get('REQUEST_METHOD', None)== 'GET') - req = BaseRequest.blank('www.example.com/secure?hello=world', None, - 'https://www.example.com/secure') - ok_(req.environ.get('HTTP_HOST', None)== 'www.example.com:443' and - req.environ.get('PATH_INFO', None)== 'www.example.com/secure' and - req.environ.get('QUERY_STRING', None)== 'hello=world' and - req.environ.get('REQUEST_METHOD', None)== 'GET' and - req.environ.get('SCRIPT_NAME', None)== '/secure' and - req.environ.get('SERVER_NAME', None)== 'www.example.com' and - req.environ.get('SERVER_PORT', None)== '443') - -def test_environ_from_url(): - """Generating an environ just from an url plus testing environ_add_POST""" - assert_raises(TypeError, environ_from_url, - 'http://www.example.com/foo?bar=baz#qux') - assert_raises(TypeError, environ_from_url, - 'gopher://gopher.example.com') - req = environ_from_url('http://www.example.com/foo?bar=baz') - ok_(req.get('HTTP_HOST', None)== 'www.example.com:80' and - req.get('PATH_INFO', None)== '/foo' and - req.get('QUERY_STRING', None)== 'bar=baz' and - req.get('REQUEST_METHOD', None)== 'GET' and - req.get('SCRIPT_NAME', None)== '' and - req.get('SERVER_NAME', None)== 'www.example.com' and - req.get('SERVER_PORT', None)== '80') - req = environ_from_url('https://www.example.com/foo?bar=baz') - ok_(req.get('HTTP_HOST', None)== 'www.example.com:443' and - req.get('PATH_INFO', None)== '/foo' and - req.get('QUERY_STRING', None)== 'bar=baz' and - req.get('REQUEST_METHOD', None)== 'GET' and - req.get('SCRIPT_NAME', None)== '' and - req.get('SERVER_NAME', None)== 'www.example.com' and - req.get('SERVER_PORT', None)== '443') - environ_add_POST(req, None) - assert_false('CONTENT_TYPE' in req and 'CONTENT_LENGTH' in req) - environ_add_POST(req, {'hello':'world'}) - ok_(req.get('HTTP_HOST', None)== 'www.example.com:443' and - req.get('PATH_INFO', None)== '/foo' and - req.get('QUERY_STRING', None)== 'bar=baz' and - req.get('REQUEST_METHOD', None)== 'POST' and - req.get('SCRIPT_NAME', None)== '' and - req.get('SERVER_NAME', None)== 'www.example.com' and - req.get('SERVER_PORT', None)== '443' and - req.get('CONTENT_LENGTH', None)=='11' and - req.get('CONTENT_TYPE', None)=='application/x-www-form-urlencoded') - eq_(req['wsgi.input'].read(), 'hello=world') - - -def test_post_does_not_reparse(): - # test that there's no repetitive parsing is happening on every req.POST access - req = Request.blank('/', - content_type='multipart/form-data; boundary=boundary', - POST=_cgi_escaping_body - ) - f0 = req.body_file_raw - post1 = req.str_POST - f1 = req.body_file_raw - assert f1 is not f0 - post2 = req.str_POST - f2 = req.body_file_raw - assert post1 is post2 - assert f1 is f2 - - -def test_middleware_body(): - def app(env, sr): - sr('200 OK', []) - return [env['wsgi.input'].read()] - - def mw(env, sr): - req = Request(env) - data = req.body_file.read() - resp = req.get_response(app) - resp.headers['x-data'] = data - return resp(env, sr) - - req = Request.blank('/', method='PUT', body='abc') - resp = req.get_response(mw) - eq_(resp.body, 'abc') - eq_(resp.headers['x-data'], 'abc') - -def test_body_file_noseek(): - req = Request.blank('/', method='PUT', body='abc') - lst = [req.body_file.read(1) for i in range(3)] - eq_(lst, ['a','b','c']) - -def test_cgi_escaping_fix(): - req = Request.blank('/', - content_type='multipart/form-data; boundary=boundary', - POST=_cgi_escaping_body - ) - eq_(req.POST.keys(), ['%20%22"']) - req.body_file.read() - eq_(req.POST.keys(), ['%20%22"']) - -_cgi_escaping_body = '''--boundary -Content-Disposition: form-data; name="%20%22"" +class UnseekableInput(object): + def __init__(self, data): + self.data = data + self.pos = 0 + def read(self, size=-1): + if size == -1: + t = self.data[self.pos:] + self.pos = len(self.data) + return t + else: + assert(self.pos + size <= len(self.data)) + t = self.data[self.pos:self.pos+size] + self.pos += size + return t +class UnseekableInputWithSeek(UnseekableInput): + def seek(self, pos, rel=0): + raise IOError("Invalid seek!") ---boundary--''' +class FakeCGIBodyTests(unittest.TestCase): + + def test_encode_multipart_value_type_options(self): + from StringIO import StringIO + from cgi import FieldStorage + from webob.request import BaseRequest, FakeCGIBody + from webob.multidict import MultiDict + multipart_type = 'multipart/form-data; boundary=foobar' + multipart_body = StringIO( + '--foobar\r\n' + 'Content-Disposition: form-data; name="bananas"; filename="bananas.txt"\r\n' + 'Content-type: text/plain; charset="utf-9"\r\n' + '\r\n' + "these are the contents of the file 'bananas.txt'\r\n" + '\r\n' + '--foobar--' + ) + environ = BaseRequest.blank('/').environ + environ.update(CONTENT_TYPE=multipart_type) + environ.update(REQUEST_METHOD='POST') + fs = FieldStorage(multipart_body, environ=environ) + vars = MultiDict.from_fieldstorage(fs) + self.assertEqual(vars['bananas'].__class__, FieldStorage) + body = FakeCGIBody(vars, multipart_type) + self.assertEqual(body.read(), multipart_body.getvalue()) + + def test_encode_multipart_no_boundary(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({}, 'multipart/form-data') + self.assertRaises(ValueError, body.read) + + def test_repr(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + body.read(1) + import re + self.assertEqual( + re.sub(r'\b0x[0-9a-f]+\b', '<whereitsat>', repr(body)), + "<FakeCGIBody at <whereitsat> viewing {'bananas': 'ba...nas'} at position 1>", + ) + def test_seek_tell(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + self.assertEqual(body.tell(), 0) + body.seek(1) + self.assertEqual(body.tell(), 1) + self.assertRaises(IOError, body.seek, 0, 2) + + def test_iter(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + self.assertEqual(list(body), [ + '--foobar\r\n', + 'Content-Disposition: form-data; name="bananas"\r\n', + '\r\n', + 'bananas\r\n', + '--foobar--', + ]) + + def test_readline(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'multipart/form-data; boundary=foobar') + self.assertEqual(body.readline(), '--foobar\r\n') + self.assertEqual(body.readline(), 'Content-Disposition: form-data; name="bananas"\r\n') + self.assertEqual(body.readline(), '\r\n') + self.assertEqual(body.readline(), 'bananas\r\n') + self.assertEqual(body.readline(), '--foobar--') + # subsequent calls to readline will return '' + + def test_read_bad_content_type(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'application/jibberjabber') + self.assertRaises(AssertionError, body.read) + + def test_read_urlencoded(self): + from webob.request import FakeCGIBody + body = FakeCGIBody({'bananas': 'bananas'}, 'application/x-www-form-urlencoded') + self.assertEqual(body.read(), 'bananas=bananas') + + +class Test_cgi_FieldStorage__repr__patch(unittest.TestCase): + def _callFUT(self, fake): + from webob.request import _cgi_FieldStorage__repr__patch + return _cgi_FieldStorage__repr__patch(fake) + + def test_with_file(self): + class Fake(object): + name = 'name' + file = 'file' + filename = 'filename' + value = 'value' + fake = Fake() + result = self._callFUT(fake) + self.assertEqual(result, "FieldStorage('name', 'filename')") + + def test_without_file(self): + class Fake(object): + name = 'name' + file = None + filename = 'filename' + value = 'value' + fake = Fake() + result = self._callFUT(fake) + self.assertEqual(result, "FieldStorage('name', 'filename', 'value')") diff --git a/tests/test_request.txt b/tests/test_request.txt deleted file mode 100644 index 026b85c..0000000 --- a/tests/test_request.txt +++ /dev/null @@ -1,553 +0,0 @@ -This demonstrates how the Request object works, and tests it. - -To test for absense of PendingDeprecationWarning's we should reset -default warning filters - - >>> import warnings - >>> warnings.resetwarnings() - - -You can instantiate a request using ``Request.blank()``, to create a -fresh environment dictionary with all the basic keys such a dictionary -should have. - - >>> import sys - >>> if sys.version >= '2.7': - ... from io import BytesIO as InputType - ... else: - ... from cStringIO import InputType - >>> from dtopt import ELLIPSIS, NORMALIZE_WHITESPACE - >>> from webob import Request, UTC - >>> req = Request.blank('/') - >>> req # doctest: +ELLIPSIS - <Request at ... GET http://localhost/> - >>> print req - GET / HTTP/1.0 - Host: localhost:80 - >>> req.environ # doctest: +ELLIPSIS - {...} - >>> isinstance(req.body_file, InputType) - True - >>> req.scheme - 'http' - >>> req.method - 'GET' - >>> req.script_name - '' - >>> req.path_info - '/' - >>> req.upath_info - u'/' - >>> req.content_type - '' - >>> print req.remote_user - None - >>> req.host_url - 'http://localhost' - >>> req.script_name = '/foo' - >>> req.path_info = '/bar/' - >>> req.environ['QUERY_STRING'] = 'a=b' - >>> req.application_url - 'http://localhost/foo' - >>> req.path_url - 'http://localhost/foo/bar/' - >>> req.url - 'http://localhost/foo/bar/?a=b' - >>> req.relative_url('baz') - 'http://localhost/foo/bar/baz' - >>> req.relative_url('baz', to_application=True) - 'http://localhost/foo/baz' - >>> req.relative_url('http://example.org') - 'http://example.org' - >>> req.path_info_peek() - 'bar' - >>> req.path_info_pop() - 'bar' - >>> req.script_name, req.path_info - ('/foo/bar', '/') - >>> print req.environ.get('wsgiorg.routing_args') - None - >>> req.urlvars - {} - >>> req.environ['wsgiorg.routing_args'] - ((), {}) - >>> req.urlvars = dict(x='y') - >>> req.environ['wsgiorg.routing_args'] - ((), {'x': 'y'}) - >>> req.urlargs - () - >>> req.urlargs = (1, 2, 3) - >>> req.environ['wsgiorg.routing_args'] - ((1, 2, 3), {'x': 'y'}) - >>> del req.urlvars - >>> req.environ['wsgiorg.routing_args'] - ((1, 2, 3), {}) - >>> req.urlvars = {'test': 'value'} - >>> del req.urlargs - >>> req.environ['wsgiorg.routing_args'] - ((), {'test': 'value'}) - >>> req.is_xhr - False - >>> req.environ['HTTP_X_REQUESTED_WITH'] = 'XMLHttpRequest' - >>> req.is_xhr - True - >>> req.host - 'localhost:80' - -There are also variables to access the variables and body: - - >>> from cStringIO import StringIO - >>> body = 'var1=value1&var2=value2&rep=1&rep=2' - >>> req = Request.blank('/') - >>> req.method = 'POST' - >>> req.body_file = StringIO(body) - >>> req.environ['CONTENT_LENGTH'] = str(len(body)) - >>> vars = req.str_POST - >>> vars - MultiDict([('var1', 'value1'), ('var2', 'value2'), ('rep', '1'), ('rep', '2')]) - >>> vars is req.POST - False - >>> req.POST - UnicodeMultiDict([('var1', u'value1'), ('var2', u'value2'), ('rep', u'1'), ('rep', u'2')]) - >>> req.decode_param_names - False - -Note that the variables are there for GET requests and non-form requests, -but they are empty and read-only: - - >>> req = Request.blank('/') - >>> req.str_POST - <NoVars: Not a form request> - >>> req.str_POST.items() - [] - >>> req.str_POST['x'] = 'y' - Traceback (most recent call last): - ... - KeyError: 'Cannot add variables: Not a form request' - >>> req.method = 'POST' - >>> req.str_POST - MultiDict([]) - >>> req.content_type = 'text/xml' - >>> req.body_file = StringIO('<xml></xml>') - >>> req.str_POST - <NoVars: Not an HTML form submission (Content-Type: text/xml)> - >>> req.body - '<xml></xml>' - -You can also get access to the query string variables, of course: - - >>> req = Request.blank('/?a=b&d=e&d=f') - >>> req.str_GET - GET([('a', 'b'), ('d', 'e'), ('d', 'f')]) - >>> req.GET['d'] - u'f' - >>> req.GET.getall('d') - [u'e', u'f'] - >>> req.method = 'POST' - >>> req.body = 'x=y&d=g' - >>> req.environ['CONTENT_LENGTH'] - '7' - >>> req.params - UnicodeMultiDict([('a', u'b'), ('d', u'e'), ('d', u'f'), ('x', u'y'), ('d', u'g')]) - >>> req.params['d'] - u'f' - >>> req.params.getall('d') - [u'e', u'f', u'g'] - -Cookies are viewed as a dictionary (*view only*): - - >>> req = Request.blank('/') - >>> req.environ['HTTP_COOKIE'] = 'var1=value1; var2=value2' - >>> sorted(req.str_cookies.items()) - [('var1', 'value1'), ('var2', 'value2')] - >>> sorted(req.cookies.items()) - [('var1', u'value1'), ('var2', u'value2')] - >>> req.charset = 'utf8' - >>> sorted(req.cookies.items()) - [('var1', u'value1'), ('var2', u'value2')] - -Sometimes conditional headers are problematic. You can remove them: - - >>> from datetime import datetime - >>> req = Request.blank('/') - >>> req.if_none_match = 'some-etag' - >>> req.if_modified_since = datetime(2005, 1, 1, 12, 0) - >>> req.environ['HTTP_ACCEPT_ENCODING'] = 'gzip' - >>> print sorted(req.headers.items()) - [('Accept-Encoding', 'gzip'), ('Host', 'localhost:80'), ('If-Modified-Since', 'Sat, 01 Jan 2005 12:00:00 GMT'), ('If-None-Match', 'some-etag')] - >>> req.remove_conditional_headers() - >>> print req.headers - {'Host': 'localhost:80'} - -Some headers are handled specifically (more should be added): - - >>> req = Request.blank('/') - >>> req.if_none_match = 'xxx' - >>> 'xxx' in req.if_none_match - True - >>> 'yyy' in req.if_none_match - False - >>> req.if_modified_since = datetime(2005, 1, 1, 12, 0) - >>> req.if_modified_since < datetime(2006, 1, 1, 12, 0, tzinfo=UTC) - True - >>> req.user_agent is None - True - >>> req.user_agent = 'MSIE-Win' - >>> req.user_agent - 'MSIE-Win' - - >>> req.cache_control - <CacheControl ''> - >>> req.cache_control.no_cache = True - >>> req.cache_control.max_age = 0 - >>> req.cache_control - <CacheControl 'max-age=0, no-cache'> - -.cache_control is a view: - - >>> 'cache-control' in req.headers - True - >>> req.headers['cache-control'] - 'max-age=0, no-cache' - >>> req.cache_control = {'no-transform': None, 'max-age': 100} - >>> req.headers['cache-control'] - 'max-age=100, no-transform' - - - -Accept-* headers are parsed into read-only objects that support -containment tests, and some useful methods. Note that parameters on -mime types are not supported. - - >>> req = Request.blank('/') - >>> req.environ['HTTP_ACCEPT'] = "text/*;q=0.3, text/html;q=0.7, text/html;level=1, text/html;level=2;q=0.4, */*;q=0.5" - >>> req.accept # doctest: +ELLIPSIS - <MIMEAccept at ... Accept: text/*;q=0.3, text/html;q=0.7, text/html, text/html;q=0.4, */*;q=0.5> - >>> for item, quality in req.accept._parsed: - ... print '%s: %0.1f' % (item, quality) - text/*: 0.3 - text/html: 0.7 - text/html: 1.0 - text/html: 0.4 - */*: 0.5 - >>> '%0.1f' % req.accept.quality('text/html') - '0.3' - >>> req.accept.first_match(['text/plain', 'text/html', 'image/png']) - 'text/plain' - >>> 'image/png' in req.accept - True - >>> req.environ['HTTP_ACCEPT'] = "text/html, application/xml; q=0.7, text/*; q=0.5, */*; q=0.1" - >>> req.accept # doctest: +ELLIPSIS - <MIMEAccept at ... Accept: text/html, application/xml;q=0.7, text/*;q=0.5, */*;q=0.1> - >>> req.accept.best_match(['text/plain', 'application/xml']) - 'application/xml' - >>> req.accept.first_match(['application/xml', 'text/html']) - 'application/xml' - >>> req.accept = "text/html, application/xml, text/*; q=0.5" - >>> 'image/png' in req.accept - False - >>> 'text/plain' in req.accept - True - >>> req.accept_charset = 'utf8' - >>> 'UTF8' in req.accept_charset - True - >>> 'gzip' in req.accept_encoding - False - >>> req.accept_encoding = 'gzip' - >>> 'GZIP' in req.accept_encoding - True - >>> req.accept_language = {'en-US': 0.5, 'es': 0.7} - >>> str(req.accept_language) - 'es;q=0.7, en-US;q=0.5' - >>> req.headers['Accept-Language'] - 'es;q=0.7, en-US;q=0.5' - >>> req.accept_language.best_matches('en-GB') - ['es', 'en-US', 'en-GB'] - >>> req.accept_language.best_matches('es') - ['es'] - >>> req.accept_language.best_matches('ES') - ['ES'] - - >>> req = Request.blank('/', accept_language='en;q=0.5') - >>> req.accept_language.best_match(['en-gb']) - 'en-gb' - - >>> req = Request.blank('/', accept_charset='utf-8;q=0.5') - >>> req.accept_charset.best_match(['iso-8859-1', 'utf-8']) - 'iso-8859-1' - -The If-Range header is a combination of a possible conditional date or -etag match:: - - >>> req = Request.blank('/') - >>> req.if_range = 'asdf' - >>> req.if_range - <IfRange etag=asdf, date=*> - >>> from webob import Response - >>> res = Response() - >>> res.etag = 'asdf' - >>> req.if_range.match_response(res) - True - >>> res.etag = None - >>> req.if_range.match_response(res) - False - >>> res.last_modified = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) - >>> req.if_range = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) - >>> req.if_range - <IfRange etag=*, date=Sun, 01 Jan 2006 12:00:00 GMT> - >>> req.if_range.match_response(res) - True - >>> res.last_modified = datetime(2007, 1, 1, 12, 0, tzinfo=UTC) - >>> req.if_range.match_response(res) - False - >>> req = Request.blank('/') - >>> req.if_range - <Empty If-Range> - >>> req.if_range.match_response(res) - True - -Ranges work like so:: - - >>> req = Request.blank('/') - >>> req.range = (0, 100) - >>> req.range - <Range ranges=(0, 100)> - >>> str(req.range) - 'bytes=0-99' - -You can use them with responses:: - - >>> res = Response() - >>> res.content_range = req.range.content_range(1000) - >>> res.content_range - <ContentRange bytes 0-99/1000> - >>> str(res.content_range) - 'bytes 0-99/1000' - >>> start, end, length = res.content_range - >>> start, end, length - (0, 100, 1000) - -A quick test of caching the request body: - - >>> from cStringIO import StringIO - >>> length = Request.request_body_tempfile_limit+10 - >>> data = StringIO('x'*length) - >>> req = Request.blank('/') - >>> req.content_length = length - >>> req.body_file = data - >>> req.body_file_raw - <...IO... object at ...> - >>> len(req.body) - 10250 - >>> req.body_file - <open file ..., mode 'w+b' at ...> - >>> int(req.body_file.tell()) - 0 - >>> req.POST - UnicodeMultiDict([]) - >>> int(req.body_file.tell()) - 0 - -Some query tests: - - >>> req = Request.blank('/') - >>> req.GET.get('unknown') - >>> req.GET.get('unknown', '?') - '?' - >>> req.POST.get('unknown') - >>> req.POST.get('unknown', '?') - '?' - >>> req.params.get('unknown') - >>> req.params.get('unknown', '?') - '?' - -Some updating of the query string: - - >>> req = Request.blank('http://localhost/foo?a=b') - >>> req.str_GET - GET([('a', 'b')]) - >>> req.str_GET['c'] = 'd' - >>> req.query_string - 'a=b&c=d' - -And for dealing with file uploads: - - >>> req = Request.blank('/posty') - >>> req.method = 'POST' - >>> req.content_type = 'multipart/form-data; boundary="foobar"' - >>> req.body = '''\ - ... --foobar - ... Content-Disposition: form-data; name="a" - ... - ... b - ... --foobar - ... Content-Disposition: form-data; name="upload"; filename="test.html" - ... Content-Type: text/html - ... - ... <html>Some text...</html> - ... --foobar-- - ... ''' - >>> req.str_POST - MultiDict([('a', 'b'), ('upload', FieldStorage('upload', 'test.html'))]) - >>> print req.body.replace('\r', '') # doctest: +REPORT_UDIFF - --foobar - Content-Disposition: form-data; name="a" - <BLANKLINE> - b - --foobar - Content-Disposition: form-data; name="upload"; filename="test.html" - Content-type: text/html - <BLANKLINE> - <html>Some text...</html> - --foobar-- - >>> req.POST['c'] = 'd' - >>> req.str_POST - MultiDict([('a', 'b'), ('upload', FieldStorage('upload', 'test.html')), ('c', 'd')]) - >>> req.body_file_raw - <FakeCGIBody at ... viewing MultiDict([('a'...d')])> - >>> sorted(req.str_POST.keys()) - ['a', 'c', 'upload'] - >>> print req.body.replace('\r', '') # doctestx: +REPORT_UDIFF - --foobar - Content-Disposition: form-data; name="a" - <BLANKLINE> - b - --foobar - Content-Disposition: form-data; name="upload"; filename="test.html" - Content-type: text/html - <BLANKLINE> - <html>Some text...</html> - --foobar - Content-Disposition: form-data; name="c" - <BLANKLINE> - d - --foobar-- - -FakeCGIBody have both readline and readlines methods: - - >>> req_ = Request.blank('/posty') - >>> req_.method = 'POST' - >>> req_.content_type = 'multipart/form-data; boundary="foobar"' - >>> req_.body = '''\ - ... --foobar - ... Content-Disposition: form-data; name="a" - ... - ... b - ... --foobar - ... Content-Disposition: form-data; name="upload"; filename="test.html" - ... Content-Type: text/html - ... - ... <html>Some text...</html> - ... --foobar-- - ... ''' - >>> req_.str_POST - MultiDict([('a', 'b'), ('upload', FieldStorage('upload', 'test.html'))]) - >>> print req_.body.replace('\r', '') # doctest: +REPORT_UDIFF - --foobar - Content-Disposition: form-data; name="a" - <BLANKLINE> - b - --foobar - Content-Disposition: form-data; name="upload"; filename="test.html" - Content-type: text/html - <BLANKLINE> - <html>Some text...</html> - --foobar-- - >>> req_.POST['c'] = 'd' - >>> req_.str_POST - MultiDict([('a', 'b'), ('upload', FieldStorage('upload', 'test.html')), ('c', 'd')]) - >>> req_.body_file_raw.readline() - '--foobar\r\n' - >>> [n.replace('\r', '') for n in req_.body_file.readlines()] - ['Content-Disposition: form-data; name="a"\n', '\n', 'b\n', '--foobar\n', 'Content-Disposition: form-data; name="upload"; filename="test.html"\n', 'Content-type: text/html\n', '\n', '<html>Some text...</html>\n', '--foobar\n', 'Content-Disposition: form-data; name="c"\n', '\n', 'd\n', '--foobar--'] - -Also reparsing works through the fake body: - - >>> del req.environ['webob._parsed_post_vars'] - >>> req.str_POST - MultiDict([('a', 'b'), ('upload', FieldStorage('upload', 'test.html')), ('c', 'd')]) - -A ``BaseRequest`` class exists for the purpose of usage by web -frameworks that want a less featureful ``Request``. - -For example, the ``Request`` class mutates the -``environ['webob.adhoc_attrs']`` attribute when its ``__getattr__``, -``__setattr__``, and ``__delattr__`` are invoked. - -The ``BaseRequest`` class omits the mutation annotation behavior -provided by the default ``Request`` implementation. Instead, the of -the ``BaseRequest`` class actually mutates the ``__dict__`` of the -request instance itself. - - >>> from webob import BaseRequest - >>> req = BaseRequest.blank('/') - >>> req.foo = 1 - >>> req.environ['webob.adhoc_attrs'] - Traceback (most recent call last): - ... - KeyError: 'webob.adhoc_attrs' - >>> req.foo - 1 - >>> del req.foo - >>> req.foo - Traceback (most recent call last): - ... - AttributeError: 'BaseRequest' object has no attribute 'foo' - - - - >>> req = BaseRequest.blank('//foo') - >>> print req.path_info_pop('x') - None - >>> req.script_name - '' - >>> print BaseRequest.blank('/foo').path_info_pop('/') - None - >>> BaseRequest.blank('/foo').path_info_pop('foo') - 'foo' - >>> BaseRequest.blank('/foo').path_info_pop('fo+') - 'foo' - >>> BaseRequest.blank('//1000').path_info_pop('\d+') - '1000' - >>> BaseRequest.blank('/1000/x').path_info_pop('\d+') - '1000' - - - >>> req = Request.blank('/', method='PUT', body='x'*10) - -str(req) returns the request as HTTP request string - - >>> print req - PUT / HTTP/1.0 - Content-Length: 10 - Host: localhost:80 - <BLANKLINE> - xxxxxxxxxx - -req.as_string() does the same but also can take additional argument `skip_body` -skip_body=True excludes the body from the result - - >>> print req.as_string(skip_body=True) - PUT / HTTP/1.0 - Content-Length: 10 - Host: localhost:80 - - -skip_body=<int> excludes the body from the result if it's longer than that number - - >>> print req.as_string(skip_body=5) - PUT / HTTP/1.0 - Content-Length: 10 - Host: localhost:80 - <BLANKLINE> - <body skipped (len=10)> - -but not if it's shorter - - >>> print req.as_string(skip_body=100) - PUT / HTTP/1.0 - Content-Length: 10 - Host: localhost:80 - <BLANKLINE> - xxxxxxxxxx - diff --git a/tests/test_response.py b/tests/test_response.py index 06f79b0..37d042e 100644 --- a/tests/test_response.py +++ b/tests/test_response.py @@ -1,8 +1,13 @@ import sys +import zlib if sys.version >= '2.7': from io import BytesIO as StringIO else: from cStringIO import StringIO +try: + from hashlib import md5 +except ImportError: + from md5 import md5 from nose.tools import eq_, ok_, assert_raises @@ -105,6 +110,209 @@ def test_HEAD_closes(): eq_(res.body, '') ok_(app_iter.closed) +def test_HEAD_conditional_response_returns_empty_response(): + from webob.response import EmptyResponse + req = Request.blank('/') + req.method = 'HEAD' + res = Response(request=req, conditional_response=True) + class FakeRequest: + method = 'HEAD' + if_none_match = 'none' + if_modified_since = False + range = False + def __init__(self, env): + self.env = env + def start_response(status, headerlist): + pass + res.RequestClass = FakeRequest + result = res({}, start_response) + ok_(isinstance(result, EmptyResponse)) + +def test_HEAD_conditional_response_range_empty_response(): + from webob.response import EmptyResponse + req = Request.blank('/') + req.method = 'HEAD' + res = Response(request=req, conditional_response=True) + res.status_int = 200 + res.body = 'Are we not men?' + res.content_length = len(res.body) + class FakeRequest: + method = 'HEAD' + if_none_match = 'none' + if_modified_since = False + def __init__(self, env): + self.env = env + self.range = self # simulate inner api + self.if_range = self + def content_range(self, length): + """range attr""" + class Range: + start = 4 + stop = 5 + return Range + def match_response(self, res): + """if_range_match attr""" + return True + def start_response(status, headerlist): + pass + res.RequestClass = FakeRequest + result = res({}, start_response) + ok_(isinstance(result, EmptyResponse), result) + +def test_conditional_response_if_none_match_false(): + req = Request.blank('/', if_none_match='foo') + resp = Response(app_iter=['foo\n'], + conditional_response=True, etag='foo') + resp = req.get_response(resp) + eq_(resp.status_int, 304) + +def test_conditional_response_if_none_match_true(): + req = Request.blank('/', if_none_match='foo') + resp = Response(app_iter=['foo\n'], + conditional_response=True, etag='bar') + resp = req.get_response(resp) + eq_(resp.status_int, 200) + +def test_conditional_response_if_modified_since_false(): + from datetime import datetime, timedelta + req = Request.blank('/', if_modified_since=datetime(2011, 3, 17, 13, 0, 0)) + resp = Response(app_iter=['foo\n'], conditional_response=True, + last_modified=req.if_modified_since-timedelta(seconds=1)) + resp = req.get_response(resp) + eq_(resp.status_int, 304) + +def test_conditional_response_if_modified_since_true(): + from datetime import datetime, timedelta + req = Request.blank('/', if_modified_since=datetime(2011, 3, 17, 13, 0, 0)) + resp = Response(app_iter=['foo\n'], conditional_response=True, + last_modified=req.if_modified_since+timedelta(seconds=1)) + resp = req.get_response(resp) + eq_(resp.status_int, 200) + +def test_conditional_response_range_not_satisfiable_response(): + req = Request.blank('/', range='bytes=100-200') + resp = Response(app_iter=['foo\n'], content_length=4, + conditional_response=True) + resp = req.get_response(resp) + eq_(resp.status_int, 416) + eq_(resp.content_range.start, None) + eq_(resp.content_range.stop, None) + eq_(resp.content_range.length, 4) + eq_(resp.body, 'Requested range not satisfiable: bytes=100-200') + +def test_HEAD_conditional_response_range_not_satisfiable_response(): + req = Request.blank('/', method='HEAD', range='bytes=100-200') + resp = Response(app_iter=['foo\n'], content_length=4, + conditional_response=True) + resp = req.get_response(resp) + eq_(resp.status_int, 416) + eq_(resp.content_range.start, None) + eq_(resp.content_range.stop, None) + eq_(resp.content_range.length, 4) + eq_(resp.body, '') + +def test_del_environ(): + res = Response() + res.environ = {'yo': 'mama'} + eq_(res.environ, {'yo': 'mama'}) + del res.environ + eq_(res.environ, None) + eq_(res.request, None) + +def test_set_request_environ(): + res = Response() + class FakeRequest: + environ = {'jo': 'mama'} + res.request = FakeRequest + eq_(res.environ, {'jo': 'mama'}) + eq_(res.request, FakeRequest) + res.environ = None + eq_(res.environ, None) + eq_(res.request, None) + +def test_del_request(): + res = Response() + class FakeRequest: + environ = {} + res.request = FakeRequest + del res.request + eq_(res.environ, None) + eq_(res.request, None) + +def test_set_environ_via_request_subterfuge(): + class FakeRequest: + def __init__(self, env): + self.environ = env + res = Response() + res.RequestClass = FakeRequest + res.request = {'action': 'dwim'} + eq_(res.environ, {'action': 'dwim'}) + ok_(isinstance(res.request, FakeRequest)) + eq_(res.request.environ, res.environ) + +def test_set_request(): + res = Response() + class FakeRequest: + environ = {'foo': 'bar'} + res.request = FakeRequest + eq_(res.request, FakeRequest) + eq_(res.environ, FakeRequest.environ) + res.request = None + eq_(res.environ, None) + eq_(res.request, None) + +def test_md5_etag(): + res = Response() + res.body = """\ +In A.D. 2101 +War was beginning. +Captain: What happen ? +Mechanic: Somebody set up us the bomb. +Operator: We get signal. +Captain: What ! +Operator: Main screen turn on. +Captain: It's You !! +Cats: How are you gentlemen !! +Cats: All your base are belong to us. +Cats: You are on the way to destruction. +Captain: What you say !! +Cats: You have no chance to survive make your time. +Cats: HA HA HA HA .... +Captain: Take off every 'zig' !! +Captain: You know what you doing. +Captain: Move 'zig'. +Captain: For great justice.""" + res.md5_etag() + ok_(res.etag) + ok_('\n' not in res.etag) + eq_(res.etag, + md5(res.body).digest().encode('base64').replace('\n', '').strip('=')) + eq_(res.content_md5, None) + +def test_md5_etag_set_content_md5(): + res = Response() + b = 'The quick brown fox jumps over the lazy dog' + res.md5_etag(b, set_content_md5=True) + ok_(res.content_md5, + md5(b).digest().encode('base64').replace('\n', '').strip('=')) + +def test_decode_content_defaults_to_identity(): + res = Response() + res.body = 'There be dragons' + res.decode_content() + eq_(res.body, 'There be dragons') + +def test_decode_content_with_deflate(): + res = Response() + b = 'Hey Hey Hey' + # Simulate inflate by chopping the headers off + # the gzip encoded data + res.body = zlib.compress(b)[2:-4] + res.content_encoding = 'deflate' + res.decode_content() + eq_(res.body, b) + eq_(res.content_encoding, None) + def test_content_length(): r0 = Response('x'*10, content_length=10) @@ -159,6 +367,13 @@ def test_app_iter_range(): eq_(list(res.content_range), [2,5,6]) eq_(res.body, '234', 'body=%r; app_iter=%r' % (res.body, app_iter)) +def test_app_iter_range_inner_method(): + class FakeAppIter: + def app_iter_range(self, start, stop): + return 'you win', start, stop + res = Response(app_iter=FakeAppIter()) + eq_(res.app_iter_range(30, 40), ('you win', 30, 40)) + def test_content_type_in_headerlist(): # Couldn't manage to clone Response in order to modify class # attributes safely. Shouldn't classes be fresh imported for every @@ -213,3 +428,608 @@ def test_set_headerlist(): eq_(res.headerlist, [('Content-Type', 'text/html; charset=UTF-8')]) del res.headerlist eq_(res.headerlist, []) + +def test_request_uri_no_script_name(): + from webob.response import _request_uri + environ = { + 'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'test.com', + 'SCRIPT_NAME': '/foobar', + } + eq_(_request_uri(environ), 'http://test.com/foobar') + +def test_request_uri_https(): + from webob.response import _request_uri + environ = { + 'wsgi.url_scheme': 'https', + 'SERVER_NAME': 'test.com', + 'SERVER_PORT': '443', + 'SCRIPT_NAME': '/foobar', + } + eq_(_request_uri(environ), 'https://test.com/foobar') + +def test_app_iter_range_starts_after_iter_end(): + from webob.response import AppIterRange + range = AppIterRange(iter([]), start=1, stop=1) + eq_(list(range), []) + +def test_response_file_body_flush_is_no_op(): + from webob.response import ResponseBodyFile + rbo = ResponseBodyFile(None) + rbo.flush() + +def test_response_file_body_writelines(): + from webob.response import ResponseBodyFile + class FakeResponse: + pass + res = FakeResponse() + res._app_iter = res.app_iter = ['foo'] + rbo = ResponseBodyFile(res) + rbo.writelines(['bar', 'baz']) + eq_(res.app_iter, ['foo', 'bar', 'baz']) + +def test_response_file_body_write_non_str(): + from webob.response import ResponseBodyFile + class FakeResponse: + pass + res = FakeResponse() + rbo = ResponseBodyFile(res) + assert_raises(TypeError, rbo.write, object()) + +def test_response_file_body_write_empty_app_iter(): + from webob.response import ResponseBodyFile + class FakeResponse: + pass + res = FakeResponse() + res._app_iter = res.app_iter = None + res.body = 'foo' + rbo = ResponseBodyFile(res) + rbo.write('baz') + eq_(res.app_iter, ['foo', 'baz']) + +def test_response_file_body_write_empty_body(): + from webob.response import ResponseBodyFile + class FakeResponse: + body = '' + res = FakeResponse() + res._app_iter = res.app_iter = None + rbo = ResponseBodyFile(res) + rbo.write('baz') + eq_(res.app_iter, ['baz']) + +def test_response_file_body_close_not_implemented(): + from webob.response import ResponseBodyFile + rbo = ResponseBodyFile(None) + assert_raises(NotImplementedError, rbo.close) + +def test_response_file_body_repr(): + from webob.response import ResponseBodyFile + rbo = ResponseBodyFile('yo') + eq_(repr(rbo), "<body_file for 'yo'>") + +def test_body_get_is_none(): + res = Response() + res._body = None + res._app_iter = None + assert_raises(TypeError, Response, app_iter=iter(['a']), + body="somebody") + assert_raises(AttributeError, res.__getattribute__, 'body') + +def test_body_get_is_unicode_notverylong(): + res = Response() + res._app_iter = u'foo' + res._body = None + assert_raises(ValueError, res.__getattribute__, 'body') + +def test_body_get_is_unicode_verylong(): + res = Response() + res._app_iter = u'x' * 51 + res._body = None + assert_raises(ValueError, res.__getattribute__, 'body') + +def test_body_set_not_unicode_or_str(): + res = Response() + assert_raises(TypeError, res.__setattr__, 'body', object()) + +def test_body_set_unicode(): + res = Response() + assert_raises(TypeError, res.__setattr__, 'body', u'abc') + +def test_body_set_under_body_doesnt_exist(): + res = Response() + del res._body + res.body = 'abc' + eq_(res._body, 'abc') + eq_(res.content_length, 3) + eq_(res._app_iter, None) + +def test_body_del(): + res = Response() + res._body = '123' + res.content_length = 3 + res._app_iter = () + del res.body + eq_(res._body, None) + eq_(res.content_length, None) + eq_(res._app_iter, None) + +def test_unicode_body_get_no_charset(): + res = Response() + res.charset = None + assert_raises(AttributeError, res.__getattribute__, 'unicode_body') + +def test_unicode_body_get_decode(): + res = Response() + res.charset = 'utf-8' + res.body = 'La Pe\xc3\xb1a' + eq_(res.unicode_body, unicode('La Pe\xc3\xb1a', 'utf-8')) + +def test_unicode_body_set_no_charset(): + res = Response() + res.charset = None + assert_raises(AttributeError, res.__setattr__, 'unicode_body', 'abc') + +def test_unicode_body_set_not_unicode(): + res = Response() + res.charset = 'utf-8' + assert_raises(TypeError, res.__setattr__, 'unicode_body', + 'La Pe\xc3\xb1a') + +def test_unicode_body_del(): + res = Response() + res._body = '123' + res.content_length = 3 + res._app_iter = () + del res.unicode_body + eq_(res._body, None) + eq_(res.content_length, None) + eq_(res._app_iter, None) + +def test_body_file_del(): + res = Response() + res._body = '123' + res.content_length = 3 + res._app_iter = () + del res.body_file + eq_(res._body, None) + eq_(res.content_length, None) + eq_(res._app_iter, None) + +def test_write_unicode(): + res = Response() + res.unicode_body = unicode('La Pe\xc3\xb1a', 'utf-8') + res.write(u'a') + eq_(res.unicode_body, unicode('La Pe\xc3\xb1aa', 'utf-8')) + +def test_write_text(): + res = Response() + res.body = 'abc' + res.write(u'a') + eq_(res.unicode_body, 'abca') + +def test_app_iter_get_app_iter_is_None(): + res = Response() + res._app_iter = None + res._body = None + assert_raises(AttributeError, res.__getattribute__, 'app_iter') + +def test_app_iter_del(): + res = Response() + res.content_length = 3 + res._app_iter = ['123'] + del res.app_iter + eq_(res._app_iter, None) + eq_(res._body, None) + eq_(res.content_length, None) + + +def test_charset_set_charset_is_None(): + res = Response() + res.charset = 'utf-8' + res._app_iter = ['123'] + del res.app_iter + eq_(res._app_iter, None) + eq_(res._body, None) + eq_(res.content_length, None) + +def test_charset_set_no_content_type_header(): + res = Response() + res.headers.pop('Content-Type', None) + assert_raises(AttributeError, res.__setattr__, 'charset', 'utf-8') + +def test_charset_del_no_content_type_header(): + res = Response() + res.headers.pop('Content-Type', None) + eq_(res._charset__del(), None) + +def test_content_type_params_get_no_semicolon_in_content_type_header(): + res = Response() + res.headers['Content-Type'] = 'foo' + eq_(res.content_type_params, {}) + +def test_content_type_params_get_semicolon_in_content_type_header(): + res = Response() + res.headers['Content-Type'] = 'foo;encoding=utf-8' + eq_(res.content_type_params, {'encoding':'utf-8'}) + +def test_content_type_params_set_value_dict_empty(): + res = Response() + res.headers['Content-Type'] = 'foo;bar' + res.content_type_params = None + eq_(res.headers['Content-Type'], 'foo') + +def test_content_type_params_set_ok_param_quoting(): + res = Response() + res.content_type_params = {'a':''} + eq_(res.headers['Content-Type'], 'text/html; a=""') + +def test_set_cookie_overwrite(): + res = Response() + res.set_cookie('a', '1') + res.set_cookie('a', '2', overwrite=True) + eq_(res.headerlist[-1], ('Set-Cookie', 'a=2; Path=/')) + +def test_set_cookie_value_is_None(): + res = Response() + res.set_cookie('a', None) + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 4 + val.sort() + eq_(val[0], 'Max-Age=0') + eq_(val[1], 'Path=/') + eq_(val[2], 'a=') + assert val[3].startswith('expires') + +def test_set_cookie_expires_is_None_and_max_age_is_int(): + res = Response() + res.set_cookie('a', '1', max_age=100) + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 4 + val.sort() + eq_(val[0], 'Max-Age=100') + eq_(val[1], 'Path=/') + eq_(val[2], 'a=1') + assert val[3].startswith('expires') + +def test_set_cookie_expires_is_None_and_max_age_is_timedelta(): + from datetime import timedelta + res = Response() + res.set_cookie('a', '1', max_age=timedelta(seconds=100)) + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 4 + val.sort() + eq_(val[0], 'Max-Age=100') + eq_(val[1], 'Path=/') + eq_(val[2], 'a=1') + assert val[3].startswith('expires') + +def test_set_cookie_expires_is_not_None_and_max_age_is_None(): + import datetime + res = Response() + then = datetime.datetime.utcnow() + datetime.timedelta(days=1) + res.set_cookie('a', '1', expires=then) + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 4 + val.sort() + ok_(val[0] in ('Max-Age=86399', 'Max-Age=86400')) + eq_(val[1], 'Path=/') + eq_(val[2], 'a=1') + assert val[3].startswith('expires') + +def test_set_cookie_value_is_unicode(): + res = Response() + val = unicode('La Pe\xc3\xb1a', 'utf-8') + res.set_cookie('a', val) + eq_(res.headerlist[-1], (r'Set-Cookie', 'a="La Pe\\303\\261a"; Path=/')) + +def test_delete_cookie(): + res = Response() + res.headers['Set-Cookie'] = 'a=2; Path=/' + res.delete_cookie('a') + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 4 + val.sort() + eq_(val[0], 'Max-Age=0') + eq_(val[1], 'Path=/') + eq_(val[2], 'a=') + assert val[3].startswith('expires') + +def test_delete_cookie_with_path(): + res = Response() + res.headers['Set-Cookie'] = 'a=2; Path=/' + res.delete_cookie('a', path='/abc') + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 4 + val.sort() + eq_(val[0], 'Max-Age=0') + eq_(val[1], 'Path=/abc') + eq_(val[2], 'a=') + assert val[3].startswith('expires') + +def test_delete_cookie_with_domain(): + res = Response() + res.headers['Set-Cookie'] = 'a=2; Path=/' + res.delete_cookie('a', path='/abc', domain='example.com') + eq_(res.headerlist[-1][0], 'Set-Cookie') + val = [ x.strip() for x in res.headerlist[-1][1].split(';')] + assert len(val) == 5 + val.sort() + eq_(val[0], 'Domain=example.com') + eq_(val[1], 'Max-Age=0') + eq_(val[2], 'Path=/abc') + eq_(val[3], 'a=') + assert val[4].startswith('expires') + +def test_unset_cookie_not_existing_and_not_strict(): + res = Response() + result = res.unset_cookie('a', strict=False) + assert result is None + +def test_unset_cookie_not_existing_and_strict(): + res = Response() + assert_raises(KeyError, res.unset_cookie, 'a') + +def test_unset_cookie_key_in_cookies(): + res = Response() + res.headers.add('Set-Cookie', 'a=2; Path=/') + res.headers.add('Set-Cookie', 'b=3; Path=/') + res.unset_cookie('a') + eq_(res.headers.getall('Set-Cookie'), ['b=3; Path=/']) + +def test_merge_cookies_no_set_cookie(): + res = Response() + result = res.merge_cookies('abc') + eq_(result, 'abc') + +def test_merge_cookies_resp_is_Response(): + inner_res = Response() + res = Response() + res.set_cookie('a', '1') + result = res.merge_cookies(inner_res) + eq_(result.headers.getall('Set-Cookie'), ['a=1; Path=/']) + +def test_merge_cookies_resp_is_wsgi_callable(): + L = [] + def dummy_wsgi_callable(environ, start_response): + L.append((environ, start_response)) + return 'abc' + res = Response() + res.set_cookie('a', '1') + wsgiapp = res.merge_cookies(dummy_wsgi_callable) + environ = {} + def dummy_start_response(status, headers, exc_info=None): + eq_(headers, [('Set-Cookie', 'a=1; Path=/')]) + result = wsgiapp(environ, dummy_start_response) + assert result == 'abc' + assert len(L) == 1 + L[0][1]('200 OK', []) # invoke dummy_start_response assertion + +def test_body_get_body_is_None_len_app_iter_is_zero(): + res = Response() + res._app_iter = StringIO() + res._body = None + result = res.body + eq_(result, '') + +def test_body_set_AttributeError_edgecase(): + res = Response() + del res._app_iter + del res._body + res.body = 'abc' + eq_(res._body, 'abc') + eq_(res.content_length, 3) + eq_(res._app_iter, None) + +def test_cache_control_get(): + res = Response() + eq_(repr(res.cache_control), "<CacheControl ''>") + eq_(res.cache_control.max_age, None) + +def test_location(): + # covers webob/response.py:934-938 + res = Response() + res.location = '/test.html' + eq_(res.location, '/test.html') + req = Request.blank('/') + eq_(req.get_response(res).location, 'http://localhost/test.html') + res.location = '/test2.html' + eq_(req.get_response(res).location, 'http://localhost/test2.html') + +def test_request_uri_http(): + # covers webob/response.py:1152 + from webob.response import _request_uri + environ = { + 'wsgi.url_scheme': 'http', + 'SERVER_NAME': 'test.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '/foobar', + } + eq_(_request_uri(environ), 'http://test.com/foobar') + +def test_request_uri_no_script_name2(): + # covers webob/response.py:1160 + # There is a test_request_uri_no_script_name in test_response.py, but it + # sets SCRIPT_NAME. + from webob.response import _request_uri + environ = { + 'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'test.com', + 'PATH_INFO': '/foobar', + } + eq_(_request_uri(environ), 'http://test.com/foobar') + +def test_cache_control_object_max_age_ten(): + res = Response() + res.cache_control.max_age = 10 + eq_(repr(res.cache_control), "<CacheControl 'max-age=10'>") + eq_(res.headers['cache-control'], 'max-age=10') + +def test_cache_control_set_object_error(): + res = Response() + assert_raises(AttributeError, setattr, res.cache_control, 'max_stale', 10) + +def test_cache_expires_set(): + res = Response() + res.cache_expires = True + eq_(repr(res.cache_control), + "<CacheControl 'max-age=0, must-revalidate, no-cache, no-store'>") + +def test_status_int_set(): + res = Response() + res.status_int = 400 + eq_(res._status, '400 Bad Request') + +def test_cache_control_set_dict(): + res = Response() + res.cache_control = {'a':'b'} + eq_(repr(res.cache_control), "<CacheControl 'a=b'>") + +def test_cache_control_set_None(): + res = Response() + res.cache_control = None + eq_(repr(res.cache_control), "<CacheControl ''>") + +def test_cache_control_set_unicode(): + res = Response() + res.cache_control = u'abc' + eq_(repr(res.cache_control), "<CacheControl 'abc'>") + +def test_cache_control_set_control_obj_is_not_None(): + class DummyCacheControl(object): + def __init__(self): + self.header_value = 1 + self.properties = {'bleh':1} + res = Response() + res._cache_control_obj = DummyCacheControl() + res.cache_control = {} + eq_(res.cache_control.properties, {}) + +def test_cache_control_del(): + res = Response() + del res.cache_control + eq_(repr(res.cache_control), "<CacheControl ''>") + +def test_body_file_get(): + res = Response() + result = res.body_file + from webob.response import ResponseBodyFile + eq_(result.__class__, ResponseBodyFile) + +def test_body_file_write_no_charset(): + from webob.response import ResponseBodyFile + class FakeReponse: + charset = None + rbo = ResponseBodyFile(FakeReponse()) + assert_raises(TypeError, rbo.write, u'foo') + +def test_body_file_write_unicode_encodes(): + from webob.response import ResponseBodyFile + class FakeReponse: + charset = 'utf-8' + _app_iter = app_iter = [] + s = unicode('La Pe\xc3\xb1a', 'utf-8') + res = FakeReponse() + rbo = ResponseBodyFile(res) + rbo.write(s) + eq_(res.app_iter, ['La Pe\xc3\xb1a']) + +def test_repr(): + res = Response() + ok_(repr(res).endswith('200 OK>')) + +def test_cache_expires_set_timedelta(): + res = Response() + from datetime import timedelta + delta = timedelta(seconds=60) + res.cache_expires(seconds=delta) + eq_(res.cache_control.max_age, 60) + +def test_cache_expires_set_int(): + res = Response() + res.cache_expires(seconds=60) + eq_(res.cache_control.max_age, 60) + +def test_cache_expires_set_None(): + res = Response() + res.cache_expires(seconds=None, a=1) + eq_(res.cache_control.a, 1) + +def test_cache_expires_set_zero(): + res = Response() + res.cache_expires(seconds=0) + eq_(res.cache_control.no_store, True) + eq_(res.cache_control.no_cache, '*') + eq_(res.cache_control.must_revalidate, True) + eq_(res.cache_control.max_age, 0) + eq_(res.cache_control.post_check, 0) + +def test_encode_content_unknown(): + res = Response() + assert_raises(AssertionError, res.encode_content, 'badencoding') + +def test_encode_content_identity(): + res = Response() + result = res.encode_content('identity') + eq_(result, None) + +def test_encode_content_gzip_already_gzipped(): + res = Response() + res.content_encoding = 'gzip' + result = res.encode_content('gzip') + eq_(result, None) + +def test_encode_content_gzip_notyet_gzipped(): + res = Response() + res.app_iter = StringIO('foo') + result = res.encode_content('gzip') + eq_(result, None) + eq_(res.content_length, 23) + eq_(res.app_iter, ['\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff', '', + 'K\xcb\xcf\x07\x00', '!es\x8c\x03\x00\x00\x00']) + +def test_encode_content_gzip_notyet_gzipped_lazy(): + res = Response() + res.app_iter = StringIO('foo') + result = res.encode_content('gzip', lazy=True) + eq_(result, None) + eq_(res.content_length, None) + eq_(list(res.app_iter), ['\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff', '', + 'K\xcb\xcf\x07\x00', '!es\x8c\x03\x00\x00\x00']) + +def test_decode_content_identity(): + res = Response() + res.content_encoding = 'identity' + result = res.decode_content() + eq_(result, None) + +def test_decode_content_weird(): + res = Response() + res.content_encoding = 'weird' + assert_raises(ValueError, res.decode_content) + +def test_decode_content_gzip(): + from gzip import GzipFile + io = StringIO() + gzip_f = GzipFile(filename='', mode='w', fileobj=io) + gzip_f.write('abc') + gzip_f.close() + body = io.getvalue() + res = Response() + res.content_encoding = 'gzip' + res.body = body + res.decode_content() + eq_(res.body, 'abc') + +def test__abs_headerlist_location_with_scheme(): + res = Response() + res.content_encoding = 'gzip' + res.headerlist = [('Location', 'http:')] + result = res._abs_headerlist({}) + eq_(result, [('Location', 'http:')]) + diff --git a/tests/test_response.txt b/tests/test_response.txt deleted file mode 100644 index 655328d..0000000 --- a/tests/test_response.txt +++ /dev/null @@ -1,446 +0,0 @@ -This demonstrates how the Response object works, and tests it at the -same time. - - >>> from dtopt import ELLIPSIS - >>> from webob import Response, UTC - >>> from datetime import datetime - >>> res = Response('Test', status='200 OK') - -This is a minimal response object. We can do things like get and set -the body: - - >>> res.body - 'Test' - >>> res.body = 'Another test' - >>> res.body - 'Another test' - >>> res.body = 'Another' - >>> res.write(' test') - >>> res.app_iter - ['Another test'] - >>> res.content_length - 12 - >>> res.headers['content-length'] - '12' - -Content-Length is only applied when setting the body to a string; you -have to set it manually otherwise. There are also getters and setters -for the various pieces: - - >>> res.app_iter = ['test'] - >>> print res.content_length - None - >>> res.content_length = 4 - >>> res.status - '200 OK' - >>> res.status_int - 200 - >>> res.headers - ResponseHeaders([('Content-Type', 'text/html; charset=UTF-8'), ('Content-Length', '4')]) - >>> res.headerlist - [('Content-Type', 'text/html; charset=UTF-8'), ('Content-Length', '4')] - -Content-type and charset are handled separately as properties, though -they are both in the ``res.headers['content-type']`` header: - - >>> res.content_type - 'text/html' - >>> res.content_type = 'text/html' - >>> res.content_type - 'text/html' - >>> res.charset - 'UTF-8' - >>> res.charset = 'iso-8859-1' - >>> res.charset - 'iso-8859-1' - >>> res.content_type - 'text/html' - >>> res.headers['content-type'] - 'text/html; charset=iso-8859-1' - -Cookie handling is done through methods: - - >>> res.set_cookie('test', 'value') - >>> res.headers['set-cookie'] - 'test=value; Path=/' - >>> res.set_cookie('test2', 'value2', max_age=10000) - >>> res.headers['set-cookie'] # We only see the last header - 'test2=value2; expires="... GMT"; Max-Age=10000; Path=/' - >>> res.headers.getall('set-cookie') - ['test=value; Path=/', 'test2=value2; expires="... GMT"; Max-Age=10000; Path=/'] - >>> res.unset_cookie('test') - >>> res.headers.getall('set-cookie') - ['test2=value2; expires="... GMT"; Max-Age=10000; Path=/'] - >>> res.set_cookie('test2', 'value2-add') - >>> res.headers.getall('set-cookie') - ['test2=value2; expires="... GMT"; Max-Age=10000; Path=/', 'test2=value2-add; Path=/'] - >>> res.set_cookie('test2', 'value2-replace', overwrite=True) - >>> res.headers.getall('set-cookie') - ['test2=value2-replace; Path=/'] - - - >>> r = Response() - >>> r.set_cookie('x', 'x') - >>> r.set_cookie('y', 'y') - >>> r.set_cookie('z', 'z') - >>> r.headers.getall('set-cookie') - ['x=x; Path=/', 'y=y; Path=/', 'z=z; Path=/'] - >>> r.unset_cookie('y') - >>> r.headers.getall('set-cookie') - ['x=x; Path=/', 'z=z; Path=/'] - - -Most headers are available in a parsed getter/setter form through -properties: - - >>> res.age = 10 - >>> res.age, res.headers['age'] - (10, '10') - >>> res.allow = ['GET', 'PUT'] - >>> res.allow, res.headers['allow'] - (('GET', 'PUT'), 'GET, PUT') - >>> res.cache_control - <CacheControl ''> - >>> print res.cache_control.max_age - None - >>> res.cache_control.properties['max-age'] = None - >>> print res.cache_control.max_age - -1 - >>> res.cache_control.max_age = 10 - >>> res.cache_control - <CacheControl 'max-age=10'> - >>> res.headers['cache-control'] - 'max-age=10' - >>> res.cache_control.max_stale = 10 - Traceback (most recent call last): - ... - AttributeError: The property max-stale only applies to request Cache-Control - >>> res.cache_control = {} - >>> res.cache_control - <CacheControl ''> - >>> res.content_disposition = 'attachment; filename=foo.xml' - >>> (res.content_disposition, res.headers['content-disposition']) - ('attachment; filename=foo.xml', 'attachment; filename=foo.xml') - >>> res.content_encoding = 'gzip' - >>> (res.content_encoding, res.headers['content-encoding']) - ('gzip', 'gzip') - >>> res.content_language = 'en' - >>> (res.content_language, res.headers['content-language']) - (('en',), 'en') - >>> res.content_location = 'http://localhost:8080' - >>> res.headers['content-location'] - 'http://localhost:8080' - >>> res.content_range = (0, 100, 1000) - >>> (res.content_range, res.headers['content-range']) - (<ContentRange bytes 0-99/1000>, 'bytes 0-99/1000') - >>> res.date = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) - >>> (res.date, res.headers['date']) - (datetime.datetime(2005, 1, 1, 12, 0, tzinfo=UTC), 'Sat, 01 Jan 2005 12:00:00 GMT') - >>> print res.etag - None - >>> res.etag = 'foo' - >>> (res.etag, res.headers['etag']) - ('foo', '"foo"') - >>> res.etag = 'something-with-"quotes"' - >>> (res.etag, res.headers['etag']) - ('something-with-"quotes"', '"something-with-\\"quotes\\""') - >>> res.expires = res.date - >>> res.retry_after = 120 # two minutes - >>> res.retry_after - datetime.datetime(...) - >>> res.server = 'Python/foo' - >>> res.headers['server'] - 'Python/foo' - >>> res.vary = ['Cookie'] - >>> (res.vary, res.headers['vary']) - (('Cookie',), 'Cookie') - -The location header will absolutify itself when the response -application is actually served. We can force this with -``req.get_response``:: - - >>> res.location = '/test.html' - >>> from webob import Request - >>> req = Request.blank('/') - >>> res.location - '/test.html' - >>> req.get_response(res).location - 'http://localhost/test.html' - >>> res.location = '/test2.html' - >>> req.get_response(res).location - 'http://localhost/test2.html' - -There's some conditional response handling too (you have to turn on -conditional_response):: - - >>> res = Response('abc', conditional_response=True) # doctest: +ELLIPSIS - >>> req = Request.blank('/') - >>> res.etag = 'tag' - >>> req.if_none_match = 'tag' - >>> req.get_response(res) - <Response ... 304 Not Modified> - >>> res.etag = 'other-tag' - >>> req.get_response(res) - <Response ... 200 OK> - >>> del req.if_none_match - >>> req.if_modified_since = datetime(2005, 1, 1, 12, 1, tzinfo=UTC) - >>> res.last_modified = datetime(2005, 1, 1, 12, 1, tzinfo=UTC) - >>> print req.get_response(res) - 304 Not Modified - ETag: "other-tag" - Last-Modified: Sat, 01 Jan 2005 12:01:00 GMT - >>> res.last_modified = datetime(2006, 1, 1, 12, 1, tzinfo=UTC) - >>> req.get_response(res) - <Response ... 200 OK> - >>> res.last_modified = None - >>> req.get_response(res) - <Response ... 200 OK> - -Weak etags:: - - >>> req = Request.blank('/', if_none_match='W/"test"') - >>> res = Response(conditional_response=True, etag='test') - >>> req.get_response(res).status - '304 Not Modified' - -Also range response:: - - >>> res = Response('0123456789', conditional_response=True) - >>> req = Request.blank('/', range=(1, 5)) - >>> req.range - <Range ranges=(1, 5)> - >>> str(req.range) - 'bytes=1-4' - >>> result = req.get_response(res) - >>> result.body - '1234' - >>> result.content_range.stop - 5 - >>> result.content_range - <ContentRange bytes 1-4/10> - >>> tuple(result.content_range) - (1, 5, 10) - >>> result.content_length - 4 - - - >>> req.range = (5, 20) - >>> str(req.range) - 'bytes=5-19' - >>> result = req.get_response(res) - >>> print result - 206 Partial Content - Content-Length: 5 - Content-Range: bytes 5-9/10 - Content-Type: text/html; charset=UTF-8 - <BLANKLINE> - 56789 - >>> tuple(result.content_range) - (5, 10, 10) - - >>> req_head = req.copy() - >>> req_head.method = 'HEAD' - >>> print req_head.get_response(res) - 206 Partial Content - Content-Length: 5 - Content-Range: bytes 5-9/10 - Content-Type: text/html; charset=UTF-8 - -And an invalid requested range: - - >>> req.range = (10, 20) - >>> result = req.get_response(res) - >>> print result - 416 Requested Range Not Satisfiable - Content-Length: 44 - Content-Range: bytes */10 - Content-Type: text/plain - <BLANKLINE> - Requested range not satisfiable: bytes=10-19 - >>> str(result.content_range) - 'bytes */10' - - >>> req_head = req.copy() - >>> req_head.method = 'HEAD' - >>> print req_head.get_response(res) - 416 Requested Range Not Satisfiable - Content-Length: 44 - Content-Range: bytes */10 - Content-Type: text/plain - - >>> Request.blank('/', range=(1,2)).get_response( - ... Response('0123456789', conditional_response=True)).content_length - 1 - - -That was easier; we'll try it with a iterator for the body:: - - >>> res = Response(conditional_response=True) - >>> res.app_iter = ['01234', '567', '89'] - >>> req = Request.blank('/') - >>> req.range = (1, 5) - >>> result = req.get_response(res) - -Because we don't know the length of the app_iter, this doesn't work:: - - >>> result.body - '0123456789' - >>> print result.content_range - None - -But it will, if we set content_length:: - >>> res.content_length = 10 - >>> req.range = (5, None) - >>> result = req.get_response(res) - >>> result.body - '56789' - >>> result.content_range - <ContentRange bytes 5-9/10> - - -Ranges requesting x last bytes are supported too: - - >>> req.range = 'bytes=-1' - >>> req.range - <Range ranges=(-1, None)> - >>> result = req.get_response(res) - >>> result.body - '9' - >>> result.content_range - <ContentRange bytes 9-9/10> - >>> result.content_length - 1 - - -If those ranges are not satisfiable, a 416 error is returned: - - >>> req.range = 'bytes=-100' - >>> result = req.get_response(res) - >>> result.status - '416 Requested Range Not Satisfiable' - >>> result.content_range - <ContentRange bytes */10> - >>> result.body - 'Requested range not satisfiable: bytes=-100' - - -If we set Content-Length then we can use it with an app_iter - - >>> res.content_length = 10 - >>> req.range = (1, 5) # python-style range - >>> req.range - <Range ranges=(1, 5)> - >>> result = req.get_response(res) - >>> result.body - '1234' - >>> result.content_range - <ContentRange bytes 1-4/10> - >>> # And trying If-modified-since - >>> res.etag = 'foobar' - >>> req.if_range = 'foobar' - >>> req.if_range - <IfRange etag=foobar, date=*> - >>> result = req.get_response(res) - >>> result.content_range - <ContentRange bytes 1-4/10> - >>> req.if_range = 'blah' - >>> result = req.get_response(res) - >>> result.content_range - >>> req.if_range = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) - >>> res.last_modified = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) - >>> result = req.get_response(res) - >>> result.content_range - <ContentRange bytes 1-4/10> - >>> res.last_modified = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) - >>> result = req.get_response(res) - >>> result.content_range - -Some tests of Content-Range parsing:: - - >>> from webob.byterange import ContentRange - >>> ContentRange.parse('bytes */*') - <ContentRange bytes */*> - >>> ContentRange.parse('bytes */10') - <ContentRange bytes */10> - >>> ContentRange.parse('bytes 5-9/10') - <ContentRange bytes 5-9/10> - >>> ContentRange.parse('bytes 5-10/*') - <ContentRange bytes 5-10/*> - >>> print ContentRange.parse('bytes 5-10/10') - None - >>> print ContentRange.parse('bytes 5-4/10') - None - >>> print ContentRange.parse('bytes 5-*/10') - None - -Some tests of exceptions:: - - >>> from webob import exc - >>> res = exc.HTTPNotFound('Not found!') - >>> res.exception.content_type = 'text/plain' - >>> res.content_type - 'text/plain' - >>> res = exc.HTTPNotModified() - >>> res.headers - ResponseHeaders([]) - -Headers can be set to unicode values:: - - >>> res = Response('test') - >>> res.etag = u'fran\xe7ais' - -But they come out as str:: - - >>> res.etag - 'fran\xe7ais' - - -Unicode can come up in unexpected places, make sure it doesn't break things -(this particular case could be caused by a `from __future__ import unicode_literals`):: - - >>> Request.blank('/', method=u'POST').get_response(exc.HTTPMethodNotAllowed()) - <Response at ... 405 Method Not Allowed> - -Copying Responses should copy their internal structures - - >>> r = Response(app_iter=[]) - >>> r2 = r.copy() - >>> r.headerlist is r2.headerlist - False - >>> r.app_iter is r2.app_iter - False - - >>> r = Response(app_iter=iter(['foo'])) - >>> r2 = r.copy() - >>> del r2.content_type - >>> r2.body_file.write(' bar') - >>> print r - 200 OK - Content-Type: text/html; charset=UTF-8 - Content-Length: 3 - <BLANKLINE> - foo - >>> print r2 - 200 OK - Content-Length: 7 - <BLANKLINE> - foo bar - - -Additional Response constructor keywords are used to set attributes - - >>> r = Response(cache_expires=True) - >>> r.headers['Cache-Control'] - 'max-age=0, must-revalidate, no-cache, no-store' - - - - >>> from webob.exc import HTTPBadRequest - >>> raise HTTPBadRequest('bad data') - Traceback (most recent call last): - ... - HTTPBadRequest: bad data - >>> raise HTTPBadRequest() - Traceback (most recent call last): - ... - HTTPBadRequest: The server could not comply with the request since it is either malformed or otherwise incorrect. |