summaryrefslogtreecommitdiff
path: root/waitress/tests/test_buffers.py
diff options
context:
space:
mode:
Diffstat (limited to 'waitress/tests/test_buffers.py')
-rw-r--r--waitress/tests/test_buffers.py489
1 files changed, 185 insertions, 304 deletions
diff --git a/waitress/tests/test_buffers.py b/waitress/tests/test_buffers.py
index 46a215e..e614d22 100644
--- a/waitress/tests/test_buffers.py
+++ b/waitress/tests/test_buffers.py
@@ -1,221 +1,186 @@
import unittest
import io
-class TestFileBasedBuffer(unittest.TestCase):
-
- def _makeOne(self, file=None, from_buffer=None):
- from waitress.buffers import FileBasedBuffer
- return FileBasedBuffer(file, from_buffer=from_buffer)
-
- def test_ctor_from_buffer_None(self):
- inst = self._makeOne('file')
- self.assertEqual(inst.file, 'file')
-
- def test_ctor_from_buffer(self):
- from_buffer = io.BytesIO(b'data')
- from_buffer.getfile = lambda *x: from_buffer
- f = io.BytesIO()
- inst = self._makeOne(f, from_buffer)
- self.assertEqual(inst.file, f)
- del from_buffer.getfile
- self.assertEqual(inst.remain, 4)
- from_buffer.close()
-
- def test___len__(self):
- inst = self._makeOne()
- inst.remain = 10
- self.assertEqual(len(inst), 10)
+class FileBasedBufferTests(object):
- def test___nonzero__(self):
- inst = self._makeOne()
- inst.remain = 10
+ def test_seekable(self):
+ inst = self._makeOneFromBytes()
+ self.assertTrue(inst.seekable)
+ self.assertEqual(inst.remaining, 0)
+
+ def test___bool__(self):
+ inst = self._makeOneFromBytes()
+ inst.remaining = 10
self.assertEqual(bool(inst), True)
- inst.remain = 0
+ inst.remaining = 0
+ self.assertEqual(bool(inst), False)
+ inst.remaining = -1
self.assertEqual(bool(inst), True)
def test_append(self):
- f = io.BytesIO(b'data')
- inst = self._makeOne(f)
+ inst = self._makeOneFromBytes(b'data')
inst.append(b'data2')
- self.assertEqual(f.getvalue(), b'datadata2')
- self.assertEqual(inst.remain, 5)
-
- def test_get_skip_true(self):
- f = io.BytesIO(b'data')
- inst = self._makeOne(f)
- result = inst.get(100, skip=True)
+ self.assertEqual(inst.remaining, 9)
+ self.assertEqual(inst.read(), b'datadata2')
+ self.assertEqual(inst.remaining, 0)
+
+ def test_read_zero(self):
+ inst = self._makeOneFromBytes(b'data')
+ result = inst.read(0)
+ self.assertEqual(result, b'')
+ self.assertEqual(inst.remaining, 4)
+
+ def test_read_all(self):
+ inst = self._makeOneFromBytes(b'data')
+ result = inst.read()
self.assertEqual(result, b'data')
- self.assertEqual(inst.remain, -4)
+ self.assertEqual(inst.remaining, 0)
- def test_get_skip_false(self):
- f = io.BytesIO(b'data')
- inst = self._makeOne(f)
- result = inst.get(100, skip=False)
- self.assertEqual(result, b'data')
- self.assertEqual(inst.remain, 0)
+ def test_read_not_enough(self):
+ inst = self._makeOneFromBytes(b'data')
+ result = inst.read(3)
+ self.assertEqual(result, b'dat')
+ self.assertEqual(inst.remaining, 1)
- def test_get_skip_bytes_less_than_zero(self):
- f = io.BytesIO(b'data')
- inst = self._makeOne(f)
- result = inst.get(-1, skip=False)
+ def test_read_exact(self):
+ inst = self._makeOneFromBytes(b'data')
+ result = inst.read(4)
self.assertEqual(result, b'data')
- self.assertEqual(inst.remain, 0)
-
- def test_skip_remain_gt_bytes(self):
- f = io.BytesIO(b'd')
- inst = self._makeOne(f)
- inst.remain = 1
- inst.skip(1)
- self.assertEqual(inst.remain, 0)
-
- def test_skip_remain_lt_bytes(self):
- f = io.BytesIO(b'd')
- inst = self._makeOne(f)
- inst.remain = 1
- self.assertRaises(ValueError, inst.skip, 2)
+ self.assertEqual(inst.remaining, 0)
- def test_newfile(self):
- inst = self._makeOne()
- self.assertRaises(NotImplementedError, inst.newfile)
-
- def test_prune_remain_notzero(self):
- f = io.BytesIO(b'd')
- inst = self._makeOne(f)
- inst.remain = 1
- nf = io.BytesIO()
- inst.newfile = lambda *x: nf
- inst.prune()
- self.assertTrue(inst.file is not f)
- self.assertEqual(nf.getvalue(), b'd')
-
- def test_prune_remain_zero_tell_notzero(self):
- f = io.BytesIO(b'd')
- inst = self._makeOne(f)
- nf = io.BytesIO(b'd')
- inst.newfile = lambda *x: nf
- inst.remain = 0
- inst.prune()
- self.assertTrue(inst.file is not f)
- self.assertEqual(nf.getvalue(), b'd')
-
- def test_prune_remain_zero_tell_zero(self):
- f = io.BytesIO()
- inst = self._makeOne(f)
- inst.remain = 0
- inst.prune()
- self.assertTrue(inst.file is f)
+ def test_read_too_much(self):
+ inst = self._makeOneFromBytes(b'data')
+ result = inst.read(100)
+ self.assertEqual(result, b'data')
+ self.assertEqual(inst.remaining, 0)
+
+ def test_rollback(self):
+ inst = self._makeOneFromBytes(b'data')
+ self.assertEqual(inst.remaining, 4)
+ result = inst.read(3)
+ self.assertEqual(inst.remaining, 1)
+ self.assertEqual(result, b'dat')
+ inst.rollback(len(result))
+ self.assertEqual(inst.remaining, 4)
+ result = inst.read()
+ self.assertEqual(inst.remaining, 0)
+ self.assertEqual(result, b'data')
def test_close(self):
- f = io.BytesIO()
- inst = self._makeOne(f)
+ inst = self._makeOneFromBytes()
inst.close()
- self.assertTrue(f.closed)
+ self.assertEqual(inst.remaining, 0)
-class TestTempfileBasedBuffer(unittest.TestCase):
+class TestTempfileBasedBuffer(FileBasedBufferTests, unittest.TestCase):
def _makeOne(self, from_buffer=None):
from waitress.buffers import TempfileBasedBuffer
- return TempfileBasedBuffer(from_buffer=from_buffer)
+ buffer = TempfileBasedBuffer(from_buffer=from_buffer)
+ self.buffers.append(buffer)
+ return buffer
- def test_newfile(self):
- inst = self._makeOne()
- r = inst.newfile()
- self.assertTrue(hasattr(r, 'fileno')) # file
+ def _makeOneFromBytes(self, from_bytes=None):
+ return self._makeOne(from_buffer=io.BytesIO(from_bytes))
-class TestBytesIOBasedBuffer(unittest.TestCase):
+ def setUp(self):
+ self.buffers = []
- def _makeOne(self, from_buffer=None):
- from waitress.buffers import BytesIOBasedBuffer
- return BytesIOBasedBuffer(from_buffer=from_buffer)
+ def tearDown(self):
+ for b in self.buffers:
+ b.close()
- def test_ctor_from_buffer_not_None(self):
- f = io.BytesIO()
- f.getfile = lambda *x: f
- inst = self._makeOne(f)
- self.assertTrue(hasattr(inst.file, 'read'))
+class TestBytesIOBasedBuffer(FileBasedBufferTests, unittest.TestCase):
- def test_ctor_from_buffer_None(self):
- inst = self._makeOne()
- self.assertTrue(hasattr(inst.file, 'read'))
+ def _makeOne(self, from_bytes=None):
+ from waitress.buffers import BytesIOBasedBuffer
+ return BytesIOBasedBuffer(from_bytes)
- def test_newfile(self):
- inst = self._makeOne()
- r = inst.newfile()
- self.assertTrue(hasattr(r, 'read'))
+ _makeOneFromBytes = _makeOne
-class TestReadOnlyFileBasedBuffer(unittest.TestCase):
+class TestReadOnlyFileBasedBuffer(FileBasedBufferTests, unittest.TestCase):
- def _makeOne(self, file, block_size=8192):
+ def _makeOne(self, file, block_size=32768):
from waitress.buffers import ReadOnlyFileBasedBuffer
- return ReadOnlyFileBasedBuffer(file, block_size)
+ buffer = ReadOnlyFileBasedBuffer(file, block_size)
+ self.buffers.append(buffer)
+ return buffer
- def test_prepare_not_seekable(self):
- f = KindaFilelike(b'abc')
- inst = self._makeOne(f)
- result = inst.prepare()
- self.assertEqual(result, False)
- self.assertEqual(inst.remain, 0)
+ def _makeOneFromBytes(self, from_bytes=None):
+ buffer = self._makeOne(io.BytesIO(from_bytes))
+ buffer.prepare()
+ return buffer
+
+ def setUp(self):
+ self.buffers = []
+
+ def tearDown(self):
+ for b in self.buffers:
+ b.close()
- def test_prepare_not_seekable_closeable(self):
- f = KindaFilelike(b'abc', close=1)
+ def test_append(self): # overrides FileBasedBufferTests.test_append
+ inst = self._makeOneFromBytes()
+ self.assertRaises(NotImplementedError, inst.append, 'a')
+
+ def test_prepare_unseekable(self):
+ f = KindaFilelike(b'abc')
inst = self._makeOne(f)
result = inst.prepare()
- self.assertEqual(result, False)
- self.assertEqual(inst.remain, 0)
- self.assertTrue(hasattr(inst, 'close'))
+ self.assertEqual(result, -1)
+ self.assertFalse(inst.seekable)
+ self.assertEqual(inst.remaining, -1)
- def test_prepare_seekable_closeable(self):
- f = Filelike(b'abc', close=1, tellresults=[0, 10])
+ def test_prepare_seekable(self):
+ f = Filelike(b'abc', tellresults=[0, 10])
inst = self._makeOne(f)
result = inst.prepare()
self.assertEqual(result, 10)
- self.assertEqual(inst.remain, 10)
+ self.assertTrue(inst.seekable)
+ self.assertEqual(inst.remaining, 10)
self.assertEqual(inst.file.seeked, 0)
- self.assertTrue(hasattr(inst, 'close'))
- def test_get_numbytes_neg_one(self):
- f = io.BytesIO(b'abcdef')
+ def test_prepare_maxsize_lt_len(self):
+ f = Filelike(b'abc', tellresults=[0, 10])
inst = self._makeOne(f)
- inst.remain = 2
- result = inst.get(-1)
- self.assertEqual(result, b'ab')
- self.assertEqual(inst.remain, 2)
- self.assertEqual(f.tell(), 0)
+ result = inst.prepare(3)
+ self.assertEqual(result, 3)
+ self.assertEqual(inst.remaining, 3)
+ self.assertTrue(inst.seekable)
- def test_get_numbytes_gt_remain(self):
- f = io.BytesIO(b'abcdef')
+ def test_prepare_maxsize_gt_len(self):
+ f = Filelike(b'abc', tellresults=[3, 10])
inst = self._makeOne(f)
- inst.remain = 2
- result = inst.get(3)
- self.assertEqual(result, b'ab')
- self.assertEqual(inst.remain, 2)
- self.assertEqual(f.tell(), 0)
+ result = inst.prepare(15)
+ self.assertEqual(result, 7)
+ self.assertEqual(inst.remaining, 7)
+ self.assertTrue(inst.seekable)
- def test_get_numbytes_lt_remain(self):
+ def test_read_numbytes_neg_one(self):
f = io.BytesIO(b'abcdef')
+ f.seek(4)
inst = self._makeOne(f)
- inst.remain = 2
- result = inst.get(1)
- self.assertEqual(result, b'a')
- self.assertEqual(inst.remain, 2)
- self.assertEqual(f.tell(), 0)
+ inst.prepare()
+ self.assertEqual(inst.remaining, 2)
+ result = inst.read(-1)
+ self.assertEqual(result, b'ef')
+ self.assertEqual(inst.remaining, 0)
+ self.assertEqual(f.tell(), 6)
- def test_get_numbytes_gt_remain_withskip(self):
+ def test_get_numbytes_gt_remain(self):
f = io.BytesIO(b'abcdef')
inst = self._makeOne(f)
- inst.remain = 2
- result = inst.get(3, skip=True)
+ inst.remaining = 2
+ result = inst.read(3)
self.assertEqual(result, b'ab')
- self.assertEqual(inst.remain, 0)
+ self.assertEqual(inst.remaining, 0)
self.assertEqual(f.tell(), 2)
- def test_get_numbytes_lt_remain_withskip(self):
+ def test_get_numbytes_lt_remain(self):
f = io.BytesIO(b'abcdef')
inst = self._makeOne(f)
- inst.remain = 2
- result = inst.get(1, skip=True)
+ inst.remaining = 2
+ result = inst.read(1)
self.assertEqual(result, b'a')
- self.assertEqual(inst.remain, 1)
+ self.assertEqual(inst.remaining, 1)
self.assertEqual(f.tell(), 1)
def test___iter__(self):
@@ -227,61 +192,37 @@ class TestReadOnlyFileBasedBuffer(unittest.TestCase):
r += val
self.assertEqual(r, data)
- def test_append(self):
- inst = self._makeOne(None)
- self.assertRaises(NotImplementedError, inst.append, 'a')
+ def test_unseekable_updates_remaining_at_eof(self):
+ f = io.BytesIO(b'abcdef')
+ inst = self._makeOne(f)
+ inst.remaining = -1
+ result1 = inst.read()
+ result2 = inst.read()
+ self.assertEqual(result1, b'abcdef')
+ self.assertEqual(result2, b'')
+ self.assertEqual(inst.remaining, 0)
+
-class TestOverflowableBuffer(unittest.TestCase):
+class TestOverflowableBuffer(FileBasedBufferTests, unittest.TestCase):
def _makeOne(self, overflow=10):
from waitress.buffers import OverflowableBuffer
- return OverflowableBuffer(overflow)
+ buffer = OverflowableBuffer(overflow)
+ self.buffers.append(buffer)
+ return buffer
- def test___len__buf_is_None(self):
- inst = self._makeOne()
- self.assertEqual(len(inst), 0)
+ def _makeOneFromBytes(self, from_bytes=None):
+ buffer = self._makeOne()
+ if from_bytes:
+ buffer.append(from_bytes)
+ return buffer
- def test___len__buf_is_not_None(self):
- inst = self._makeOne()
- inst.buf = b'abc'
- self.assertEqual(len(inst), 3)
+ def setUp(self):
+ self.buffers = []
- def test___nonzero__(self):
- inst = self._makeOne()
- inst.buf = b'abc'
- self.assertEqual(bool(inst), True)
- inst.buf = b''
- self.assertEqual(bool(inst), False)
-
- def test___nonzero___on_int_overflow_buffer(self):
- inst = self._makeOne()
-
- class int_overflow_buf(bytes):
- def __len__(self):
- # maxint + 1
- return 0x7fffffffffffffff + 1
- inst.buf = int_overflow_buf()
- self.assertEqual(bool(inst), True)
- inst.buf = b''
- self.assertEqual(bool(inst), False)
-
- def test__create_buffer_large(self):
- from waitress.buffers import TempfileBasedBuffer
- inst = self._makeOne()
- inst.strbuf = b'x' * 11
- inst._create_buffer()
- self.assertEqual(inst.buf.__class__, TempfileBasedBuffer)
- self.assertEqual(inst.buf.get(100), b'x' * 11)
- self.assertEqual(inst.strbuf, b'')
-
- def test__create_buffer_small(self):
- from waitress.buffers import BytesIOBasedBuffer
- inst = self._makeOne()
- inst.strbuf = b'x' * 5
- inst._create_buffer()
- self.assertEqual(inst.buf.__class__, BytesIOBasedBuffer)
- self.assertEqual(inst.buf.get(100), b'x' * 5)
- self.assertEqual(inst.strbuf, b'')
+ def tearDown(self):
+ for b in self.buffers:
+ b.close()
def test_append_with_len_more_than_max_int(self):
from waitress.compat import MAXINT
@@ -289,123 +230,66 @@ class TestOverflowableBuffer(unittest.TestCase):
inst.overflowed = True
buf = DummyBuffer(length=MAXINT)
inst.buf = buf
+ inst.remaining = MAXINT
result = inst.append(b'x')
# we don't want this to throw an OverflowError on Python 2 (see
# https://github.com/Pylons/waitress/issues/47)
self.assertEqual(result, None)
- def test_append_buf_None_not_longer_than_srtbuf_limit(self):
+ def test_append_buf_None_not_longer_than_strbuf_limit(self):
inst = self._makeOne()
inst.strbuf = b'x' * 5
+ inst.remaining = len(inst.strbuf)
inst.append(b'hello')
self.assertEqual(inst.strbuf, b'xxxxxhello')
+ self.assertEqual(inst.remaining, 10)
def test_append_buf_None_longer_than_strbuf_limit(self):
inst = self._makeOne(10000)
inst.strbuf = b'x' * 8192
+ inst.remaining = len(inst.strbuf)
inst.append(b'hello')
self.assertEqual(inst.strbuf, b'')
- self.assertEqual(len(inst.buf), 8197)
+ self.assertEqual(inst.buf.remaining, 8197)
def test_append_overflow(self):
inst = self._makeOne(10)
inst.strbuf = b'x' * 8192
+ inst.remaining = len(inst.strbuf)
inst.append(b'hello')
self.assertEqual(inst.strbuf, b'')
- self.assertEqual(len(inst.buf), 8197)
+ self.assertEqual(inst.buf.remaining, 8197)
def test_append_sz_gt_overflow(self):
from waitress.buffers import BytesIOBasedBuffer
- f = io.BytesIO(b'data')
- inst = self._makeOne(f)
+ inst = self._makeOne()
buf = BytesIOBasedBuffer()
inst.buf = buf
inst.overflow = 2
inst.append(b'data2')
- self.assertEqual(f.getvalue(), b'data')
self.assertTrue(inst.overflowed)
self.assertNotEqual(inst.buf, buf)
- def test_get_buf_None_skip_False(self):
- inst = self._makeOne()
- inst.strbuf = b'x' * 5
- r = inst.get(5)
- self.assertEqual(r, b'xxxxx')
-
- def test_get_buf_None_skip_True(self):
- inst = self._makeOne()
- inst.strbuf = b'x' * 5
- r = inst.get(5, skip=True)
- self.assertFalse(inst.buf is None)
- self.assertEqual(r, b'xxxxx')
-
- def test_skip_buf_None(self):
- inst = self._makeOne()
- inst.strbuf = b'data'
- inst.skip(4)
- self.assertEqual(inst.strbuf, b'')
- self.assertNotEqual(inst.buf, None)
-
- def test_skip_buf_None_allow_prune_True(self):
- inst = self._makeOne()
- inst.strbuf = b'data'
- inst.skip(4, True)
- self.assertEqual(inst.strbuf, b'')
- self.assertEqual(inst.buf, None)
-
- def test_prune_buf_None(self):
- inst = self._makeOne()
- inst.prune()
- self.assertEqual(inst.strbuf, b'')
-
- def test_prune_with_buf(self):
- inst = self._makeOne()
- class Buf(object):
- def prune(self):
- self.pruned = True
- inst.buf = Buf()
- inst.prune()
- self.assertEqual(inst.buf.pruned, True)
-
- def test_prune_with_buf_overflow(self):
- inst = self._makeOne()
- class DummyBuffer(io.BytesIO):
- def getfile(self):
- return self
- def prune(self):
- return True
- def __len__(self):
- return 5
- buf = DummyBuffer(b'data')
- inst.buf = buf
- inst.overflowed = True
- inst.overflow = 10
- inst.prune()
- self.assertNotEqual(inst.buf, buf)
-
- def test_prune_with_buflen_more_than_max_int(self):
- from waitress.compat import MAXINT
- inst = self._makeOne()
- inst.overflowed = True
- buf = DummyBuffer(length=MAXINT+1)
- inst.buf = buf
- result = inst.prune()
- # we don't want this to throw an OverflowError on Python 2 (see
- # https://github.com/Pylons/waitress/issues/47)
- self.assertEqual(result, None)
-
- def test_getfile_buf_None(self):
- inst = self._makeOne()
- f = inst.getfile()
- self.assertTrue(hasattr(f, 'read'))
+ def test_read_strbuf(self):
+ inst = self._makeOne(10)
+ inst.strbuf = b'x'
+ inst.remaining = len(inst.strbuf)
+ result = inst.read()
+ self.assertEqual(result, b'x')
+ self.assertEqual(inst.remaining, 0)
- def test_getfile_buf_not_None(self):
- inst = self._makeOne()
- buf = io.BytesIO()
- buf.getfile = lambda *x: buf
- inst.buf = buf
- f = inst.getfile()
- self.assertEqual(f, buf)
+ def test_rollback_strbuf(self):
+ inst = self._makeOne(10)
+ inst.strbuf = b'x'
+ inst.remaining = len(inst.strbuf)
+ result = inst.read()
+ self.assertEqual(result, b'x')
+ self.assertEqual(inst.remaining, 0)
+ inst.rollback(1)
+ self.assertEqual(inst.remaining, 1)
+ result = inst.read()
+ self.assertEqual(result, b'x')
+ self.assertEqual(inst.remaining, 0)
def test_close_nobuf(self):
inst = self._makeOne()
@@ -428,7 +312,7 @@ class KindaFilelike(object):
self.bytes = bytes
self.tellresults = tellresults
if close is not None:
- self.close = close
+ self.close = lambda: close
class Filelike(KindaFilelike):
@@ -441,13 +325,10 @@ class Filelike(KindaFilelike):
class DummyBuffer(object):
def __init__(self, length=0):
- self.length = length
-
- def __len__(self):
- return self.length
+ self.remaining = length
def append(self, s):
- self.length = self.length + len(s)
+ self.remaining = self.remaining + len(s)
- def prune(self):
- pass
+ def close(self):
+ self.closed = True