summaryrefslogtreecommitdiff
path: root/tests/test_fileapp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_fileapp.py')
-rw-r--r--tests/test_fileapp.py242
1 files changed, 242 insertions, 0 deletions
diff --git a/tests/test_fileapp.py b/tests/test_fileapp.py
new file mode 100644
index 0000000..bdd7510
--- /dev/null
+++ b/tests/test_fileapp.py
@@ -0,0 +1,242 @@
+# (c) 2005 Ian Bicking, Clark C. Evans and contributors
+# This module is part of the Python Paste Project and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+import time
+import random
+import os
+import tempfile
+try:
+ # Python 3
+ from email.utils import parsedate_tz, mktime_tz
+except ImportError:
+ # Python 2
+ from rfc822 import parsedate_tz, mktime_tz
+import six
+
+from paste import fileapp
+from paste.fileapp import *
+from paste.fixture import *
+
+# NOTE(haypo): don't use string.letters because the order of lower and upper
+# case letters changes when locale.setlocale() is called for the first time
+LETTERS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
+
+def test_data():
+ harness = TestApp(DataApp(b'mycontent'))
+ res = harness.get("/")
+ assert 'application/octet-stream' == res.header('content-type')
+ assert '9' == res.header('content-length')
+ assert "<Response 200 OK 'mycontent'>" == repr(res)
+ harness.app.set_content(b"bingles")
+ assert "<Response 200 OK 'bingles'>" == repr(harness.get("/"))
+
+def test_cache():
+ def build(*args,**kwargs):
+ app = DataApp(b"SomeContent")
+ app.cache_control(*args,**kwargs)
+ return TestApp(app).get("/")
+ res = build()
+ assert 'public' == res.header('cache-control')
+ assert not res.header('expires',None)
+ res = build(private=True)
+ assert 'private' == res.header('cache-control')
+ assert mktime_tz(parsedate_tz(res.header('expires'))) < time.time()
+ res = build(no_cache=True)
+ assert 'no-cache' == res.header('cache-control')
+ assert mktime_tz(parsedate_tz(res.header('expires'))) < time.time()
+ res = build(max_age=60,s_maxage=30)
+ assert 'public, max-age=60, s-maxage=30' == res.header('cache-control')
+ expires = mktime_tz(parsedate_tz(res.header('expires')))
+ assert expires > time.time()+58 and expires < time.time()+61
+ res = build(private=True, max_age=60, no_transform=True, no_store=True)
+ assert 'private, no-store, no-transform, max-age=60' == \
+ res.header('cache-control')
+ expires = mktime_tz(parsedate_tz(res.header('expires')))
+ assert mktime_tz(parsedate_tz(res.header('expires'))) < time.time()
+
+def test_disposition():
+ def build(*args,**kwargs):
+ app = DataApp(b"SomeContent")
+ app.content_disposition(*args,**kwargs)
+ return TestApp(app).get("/")
+ res = build()
+ assert 'attachment' == res.header('content-disposition')
+ assert 'application/octet-stream' == res.header('content-type')
+ res = build(filename="bing.txt")
+ assert 'attachment; filename="bing.txt"' == \
+ res.header('content-disposition')
+ assert 'text/plain' == res.header('content-type')
+ res = build(inline=True)
+ assert 'inline' == res.header('content-disposition')
+ assert 'application/octet-stream' == res.header('content-type')
+ res = build(inline=True, filename="/some/path/bing.txt")
+ assert 'inline; filename="bing.txt"' == \
+ res.header('content-disposition')
+ assert 'text/plain' == res.header('content-type')
+ try:
+ res = build(inline=True,attachment=True)
+ except AssertionError:
+ pass
+ else:
+ assert False, "should be an exception"
+
+def test_modified():
+ harness = TestApp(DataApp(b'mycontent'))
+ res = harness.get("/")
+ assert "<Response 200 OK 'mycontent'>" == repr(res)
+ last_modified = res.header('last-modified')
+ res = harness.get("/",headers={'if-modified-since': last_modified})
+ assert "<Response 304 Not Modified ''>" == repr(res)
+ res = harness.get("/",headers={'if-modified-since': last_modified + \
+ '; length=1506'})
+ assert "<Response 304 Not Modified ''>" == repr(res)
+ res = harness.get("/",status=400,
+ headers={'if-modified-since': 'garbage'})
+ assert 400 == res.status and b"ill-formed timestamp" in res.body
+ res = harness.get("/",status=400,
+ headers={'if-modified-since':
+ 'Thu, 22 Dec 2030 01:01:01 GMT'})
+ assert 400 == res.status and b"check your system clock" in res.body
+
+def test_file():
+ tempfile = "test_fileapp.%s.txt" % (random.random())
+ content = LETTERS * 20
+ if six.PY3:
+ content = content.encode('utf8')
+ with open(tempfile, "wb") as fp:
+ fp.write(content)
+ try:
+ app = fileapp.FileApp(tempfile)
+ res = TestApp(app).get("/")
+ assert len(content) == int(res.header('content-length'))
+ assert 'text/plain' == res.header('content-type')
+ assert content == res.body
+ assert content == app.content # this is cashed
+ lastmod = res.header('last-modified')
+ print("updating", tempfile)
+ file = open(tempfile,"a+")
+ file.write("0123456789")
+ file.close()
+ res = TestApp(app).get("/",headers={'Cache-Control': 'max-age=0'})
+ assert len(content)+10 == int(res.header('content-length'))
+ assert 'text/plain' == res.header('content-type')
+ assert content + b"0123456789" == res.body
+ assert app.content # we are still cached
+ file = open(tempfile,"a+")
+ file.write("X" * fileapp.CACHE_SIZE) # exceed the cashe size
+ file.write("YZ")
+ file.close()
+ res = TestApp(app).get("/",headers={'Cache-Control': 'max-age=0'})
+ newsize = fileapp.CACHE_SIZE + len(content)+12
+ assert newsize == int(res.header('content-length'))
+ assert newsize == len(res.body)
+ assert res.body.startswith(content) and res.body.endswith(b'XYZ')
+ assert not app.content # we are no longer cached
+ finally:
+ os.unlink(tempfile)
+
+def test_dir():
+ tmpdir = tempfile.mkdtemp()
+ try:
+ tmpfile = os.path.join(tmpdir, 'file')
+ tmpsubdir = os.path.join(tmpdir, 'dir')
+ fp = open(tmpfile, 'w')
+ fp.write('abcd')
+ fp.close()
+ os.mkdir(tmpsubdir)
+ try:
+ app = fileapp.DirectoryApp(tmpdir)
+ for path in ['/', '', '//', '/..', '/.', '/../..']:
+ assert TestApp(app).get(path, status=403).status == 403, ValueError(path)
+ for path in ['/~', '/foo', '/dir', '/dir/']:
+ assert TestApp(app).get(path, status=404).status == 404, ValueError(path)
+ assert TestApp(app).get('/file').body == b'abcd'
+ finally:
+ os.remove(tmpfile)
+ os.rmdir(tmpsubdir)
+ finally:
+ os.rmdir(tmpdir)
+
+def _excercize_range(build,content):
+ # full content request, but using ranges'
+ res = build("bytes=0-%d" % (len(content)-1))
+ assert res.header('accept-ranges') == 'bytes'
+ assert res.body == content
+ assert res.header('content-length') == str(len(content))
+ res = build("bytes=-%d" % (len(content)-1))
+ assert res.body == content
+ assert res.header('content-length') == str(len(content))
+ res = build("bytes=0-")
+ assert res.body == content
+ assert res.header('content-length') == str(len(content))
+ # partial content requests
+ res = build("bytes=0-9", status=206)
+ assert res.body == content[:10]
+ assert res.header('content-length') == '10'
+ res = build("bytes=%d-" % (len(content)-1), status=206)
+ assert res.body == b'Z'
+ assert res.header('content-length') == '1'
+ res = build("bytes=%d-%d" % (3,17), status=206)
+ assert res.body == content[3:18]
+ assert res.header('content-length') == '15'
+
+def test_range():
+ content = LETTERS * 5
+ if six.PY3:
+ content = content.encode('utf8')
+ def build(range, status=206):
+ app = DataApp(content)
+ return TestApp(app).get("/",headers={'Range': range}, status=status)
+ _excercize_range(build,content)
+ build('bytes=0-%d' % (len(content)+1), 416)
+
+def test_file_range():
+ tempfile = "test_fileapp.%s.txt" % (random.random())
+ content = LETTERS * (1+(fileapp.CACHE_SIZE // len(LETTERS)))
+ if six.PY3:
+ content = content.encode('utf8')
+ assert len(content) > fileapp.CACHE_SIZE
+ with open(tempfile, "wb") as fp:
+ fp.write(content)
+ try:
+ def build(range, status=206):
+ app = fileapp.FileApp(tempfile)
+ return TestApp(app).get("/",headers={'Range': range},
+ status=status)
+ _excercize_range(build,content)
+ for size in (13,len(LETTERS), len(LETTERS)-1):
+ fileapp.BLOCK_SIZE = size
+ _excercize_range(build,content)
+ finally:
+ os.unlink(tempfile)
+
+def test_file_cache():
+ filename = os.path.join(os.path.dirname(__file__),
+ 'urlparser_data', 'secured.txt')
+ app = TestApp(fileapp.FileApp(filename))
+ res = app.get('/')
+ etag = res.header('ETag')
+ last_mod = res.header('Last-Modified')
+ res = app.get('/', headers={'If-Modified-Since': last_mod},
+ status=304)
+ res = app.get('/', headers={'If-None-Match': etag},
+ status=304)
+ res = app.get('/', headers={'If-None-Match': 'asdf'},
+ status=200)
+ res = app.get('/', headers={'If-Modified-Since': 'Sat, 1 Jan 2005 12:00:00 GMT'},
+ status=200)
+ res = app.get('/', headers={'If-Modified-Since': last_mod + '; length=100'},
+ status=304)
+ res = app.get('/', headers={'If-Modified-Since': 'invalid date'},
+ status=400)
+
+def test_methods():
+ filename = os.path.join(os.path.dirname(__file__),
+ 'urlparser_data', 'secured.txt')
+ app = TestApp(fileapp.FileApp(filename))
+ get_res = app.get('')
+ res = app.get('', extra_environ={'REQUEST_METHOD': 'HEAD'})
+ assert res.headers == get_res.headers
+ assert not res.body
+ app.post('', status=405) # Method Not Allowed
+