diff options
Diffstat (limited to 'waitress/tests/test_buffers.py')
-rw-r--r-- | waitress/tests/test_buffers.py | 489 |
1 files changed, 185 insertions, 304 deletions
diff --git a/waitress/tests/test_buffers.py b/waitress/tests/test_buffers.py index 46a215e..e614d22 100644 --- a/waitress/tests/test_buffers.py +++ b/waitress/tests/test_buffers.py @@ -1,221 +1,186 @@ import unittest import io -class TestFileBasedBuffer(unittest.TestCase): - - def _makeOne(self, file=None, from_buffer=None): - from waitress.buffers import FileBasedBuffer - return FileBasedBuffer(file, from_buffer=from_buffer) - - def test_ctor_from_buffer_None(self): - inst = self._makeOne('file') - self.assertEqual(inst.file, 'file') - - def test_ctor_from_buffer(self): - from_buffer = io.BytesIO(b'data') - from_buffer.getfile = lambda *x: from_buffer - f = io.BytesIO() - inst = self._makeOne(f, from_buffer) - self.assertEqual(inst.file, f) - del from_buffer.getfile - self.assertEqual(inst.remain, 4) - from_buffer.close() - - def test___len__(self): - inst = self._makeOne() - inst.remain = 10 - self.assertEqual(len(inst), 10) +class FileBasedBufferTests(object): - def test___nonzero__(self): - inst = self._makeOne() - inst.remain = 10 + def test_seekable(self): + inst = self._makeOneFromBytes() + self.assertTrue(inst.seekable) + self.assertEqual(inst.remaining, 0) + + def test___bool__(self): + inst = self._makeOneFromBytes() + inst.remaining = 10 self.assertEqual(bool(inst), True) - inst.remain = 0 + inst.remaining = 0 + self.assertEqual(bool(inst), False) + inst.remaining = -1 self.assertEqual(bool(inst), True) def test_append(self): - f = io.BytesIO(b'data') - inst = self._makeOne(f) + inst = self._makeOneFromBytes(b'data') inst.append(b'data2') - self.assertEqual(f.getvalue(), b'datadata2') - self.assertEqual(inst.remain, 5) - - def test_get_skip_true(self): - f = io.BytesIO(b'data') - inst = self._makeOne(f) - result = inst.get(100, skip=True) + self.assertEqual(inst.remaining, 9) + self.assertEqual(inst.read(), b'datadata2') + self.assertEqual(inst.remaining, 0) + + def test_read_zero(self): + inst = self._makeOneFromBytes(b'data') + result = inst.read(0) + self.assertEqual(result, b'') + self.assertEqual(inst.remaining, 4) + + def test_read_all(self): + inst = self._makeOneFromBytes(b'data') + result = inst.read() self.assertEqual(result, b'data') - self.assertEqual(inst.remain, -4) + self.assertEqual(inst.remaining, 0) - def test_get_skip_false(self): - f = io.BytesIO(b'data') - inst = self._makeOne(f) - result = inst.get(100, skip=False) - self.assertEqual(result, b'data') - self.assertEqual(inst.remain, 0) + def test_read_not_enough(self): + inst = self._makeOneFromBytes(b'data') + result = inst.read(3) + self.assertEqual(result, b'dat') + self.assertEqual(inst.remaining, 1) - def test_get_skip_bytes_less_than_zero(self): - f = io.BytesIO(b'data') - inst = self._makeOne(f) - result = inst.get(-1, skip=False) + def test_read_exact(self): + inst = self._makeOneFromBytes(b'data') + result = inst.read(4) self.assertEqual(result, b'data') - self.assertEqual(inst.remain, 0) - - def test_skip_remain_gt_bytes(self): - f = io.BytesIO(b'd') - inst = self._makeOne(f) - inst.remain = 1 - inst.skip(1) - self.assertEqual(inst.remain, 0) - - def test_skip_remain_lt_bytes(self): - f = io.BytesIO(b'd') - inst = self._makeOne(f) - inst.remain = 1 - self.assertRaises(ValueError, inst.skip, 2) + self.assertEqual(inst.remaining, 0) - def test_newfile(self): - inst = self._makeOne() - self.assertRaises(NotImplementedError, inst.newfile) - - def test_prune_remain_notzero(self): - f = io.BytesIO(b'd') - inst = self._makeOne(f) - inst.remain = 1 - nf = io.BytesIO() - inst.newfile = lambda *x: nf - inst.prune() - self.assertTrue(inst.file is not f) - self.assertEqual(nf.getvalue(), b'd') - - def test_prune_remain_zero_tell_notzero(self): - f = io.BytesIO(b'd') - inst = self._makeOne(f) - nf = io.BytesIO(b'd') - inst.newfile = lambda *x: nf - inst.remain = 0 - inst.prune() - self.assertTrue(inst.file is not f) - self.assertEqual(nf.getvalue(), b'd') - - def test_prune_remain_zero_tell_zero(self): - f = io.BytesIO() - inst = self._makeOne(f) - inst.remain = 0 - inst.prune() - self.assertTrue(inst.file is f) + def test_read_too_much(self): + inst = self._makeOneFromBytes(b'data') + result = inst.read(100) + self.assertEqual(result, b'data') + self.assertEqual(inst.remaining, 0) + + def test_rollback(self): + inst = self._makeOneFromBytes(b'data') + self.assertEqual(inst.remaining, 4) + result = inst.read(3) + self.assertEqual(inst.remaining, 1) + self.assertEqual(result, b'dat') + inst.rollback(len(result)) + self.assertEqual(inst.remaining, 4) + result = inst.read() + self.assertEqual(inst.remaining, 0) + self.assertEqual(result, b'data') def test_close(self): - f = io.BytesIO() - inst = self._makeOne(f) + inst = self._makeOneFromBytes() inst.close() - self.assertTrue(f.closed) + self.assertEqual(inst.remaining, 0) -class TestTempfileBasedBuffer(unittest.TestCase): +class TestTempfileBasedBuffer(FileBasedBufferTests, unittest.TestCase): def _makeOne(self, from_buffer=None): from waitress.buffers import TempfileBasedBuffer - return TempfileBasedBuffer(from_buffer=from_buffer) + buffer = TempfileBasedBuffer(from_buffer=from_buffer) + self.buffers.append(buffer) + return buffer - def test_newfile(self): - inst = self._makeOne() - r = inst.newfile() - self.assertTrue(hasattr(r, 'fileno')) # file + def _makeOneFromBytes(self, from_bytes=None): + return self._makeOne(from_buffer=io.BytesIO(from_bytes)) -class TestBytesIOBasedBuffer(unittest.TestCase): + def setUp(self): + self.buffers = [] - def _makeOne(self, from_buffer=None): - from waitress.buffers import BytesIOBasedBuffer - return BytesIOBasedBuffer(from_buffer=from_buffer) + def tearDown(self): + for b in self.buffers: + b.close() - def test_ctor_from_buffer_not_None(self): - f = io.BytesIO() - f.getfile = lambda *x: f - inst = self._makeOne(f) - self.assertTrue(hasattr(inst.file, 'read')) +class TestBytesIOBasedBuffer(FileBasedBufferTests, unittest.TestCase): - def test_ctor_from_buffer_None(self): - inst = self._makeOne() - self.assertTrue(hasattr(inst.file, 'read')) + def _makeOne(self, from_bytes=None): + from waitress.buffers import BytesIOBasedBuffer + return BytesIOBasedBuffer(from_bytes) - def test_newfile(self): - inst = self._makeOne() - r = inst.newfile() - self.assertTrue(hasattr(r, 'read')) + _makeOneFromBytes = _makeOne -class TestReadOnlyFileBasedBuffer(unittest.TestCase): +class TestReadOnlyFileBasedBuffer(FileBasedBufferTests, unittest.TestCase): - def _makeOne(self, file, block_size=8192): + def _makeOne(self, file, block_size=32768): from waitress.buffers import ReadOnlyFileBasedBuffer - return ReadOnlyFileBasedBuffer(file, block_size) + buffer = ReadOnlyFileBasedBuffer(file, block_size) + self.buffers.append(buffer) + return buffer - def test_prepare_not_seekable(self): - f = KindaFilelike(b'abc') - inst = self._makeOne(f) - result = inst.prepare() - self.assertEqual(result, False) - self.assertEqual(inst.remain, 0) + def _makeOneFromBytes(self, from_bytes=None): + buffer = self._makeOne(io.BytesIO(from_bytes)) + buffer.prepare() + return buffer + + def setUp(self): + self.buffers = [] + + def tearDown(self): + for b in self.buffers: + b.close() - def test_prepare_not_seekable_closeable(self): - f = KindaFilelike(b'abc', close=1) + def test_append(self): # overrides FileBasedBufferTests.test_append + inst = self._makeOneFromBytes() + self.assertRaises(NotImplementedError, inst.append, 'a') + + def test_prepare_unseekable(self): + f = KindaFilelike(b'abc') inst = self._makeOne(f) result = inst.prepare() - self.assertEqual(result, False) - self.assertEqual(inst.remain, 0) - self.assertTrue(hasattr(inst, 'close')) + self.assertEqual(result, -1) + self.assertFalse(inst.seekable) + self.assertEqual(inst.remaining, -1) - def test_prepare_seekable_closeable(self): - f = Filelike(b'abc', close=1, tellresults=[0, 10]) + def test_prepare_seekable(self): + f = Filelike(b'abc', tellresults=[0, 10]) inst = self._makeOne(f) result = inst.prepare() self.assertEqual(result, 10) - self.assertEqual(inst.remain, 10) + self.assertTrue(inst.seekable) + self.assertEqual(inst.remaining, 10) self.assertEqual(inst.file.seeked, 0) - self.assertTrue(hasattr(inst, 'close')) - def test_get_numbytes_neg_one(self): - f = io.BytesIO(b'abcdef') + def test_prepare_maxsize_lt_len(self): + f = Filelike(b'abc', tellresults=[0, 10]) inst = self._makeOne(f) - inst.remain = 2 - result = inst.get(-1) - self.assertEqual(result, b'ab') - self.assertEqual(inst.remain, 2) - self.assertEqual(f.tell(), 0) + result = inst.prepare(3) + self.assertEqual(result, 3) + self.assertEqual(inst.remaining, 3) + self.assertTrue(inst.seekable) - def test_get_numbytes_gt_remain(self): - f = io.BytesIO(b'abcdef') + def test_prepare_maxsize_gt_len(self): + f = Filelike(b'abc', tellresults=[3, 10]) inst = self._makeOne(f) - inst.remain = 2 - result = inst.get(3) - self.assertEqual(result, b'ab') - self.assertEqual(inst.remain, 2) - self.assertEqual(f.tell(), 0) + result = inst.prepare(15) + self.assertEqual(result, 7) + self.assertEqual(inst.remaining, 7) + self.assertTrue(inst.seekable) - def test_get_numbytes_lt_remain(self): + def test_read_numbytes_neg_one(self): f = io.BytesIO(b'abcdef') + f.seek(4) inst = self._makeOne(f) - inst.remain = 2 - result = inst.get(1) - self.assertEqual(result, b'a') - self.assertEqual(inst.remain, 2) - self.assertEqual(f.tell(), 0) + inst.prepare() + self.assertEqual(inst.remaining, 2) + result = inst.read(-1) + self.assertEqual(result, b'ef') + self.assertEqual(inst.remaining, 0) + self.assertEqual(f.tell(), 6) - def test_get_numbytes_gt_remain_withskip(self): + def test_get_numbytes_gt_remain(self): f = io.BytesIO(b'abcdef') inst = self._makeOne(f) - inst.remain = 2 - result = inst.get(3, skip=True) + inst.remaining = 2 + result = inst.read(3) self.assertEqual(result, b'ab') - self.assertEqual(inst.remain, 0) + self.assertEqual(inst.remaining, 0) self.assertEqual(f.tell(), 2) - def test_get_numbytes_lt_remain_withskip(self): + def test_get_numbytes_lt_remain(self): f = io.BytesIO(b'abcdef') inst = self._makeOne(f) - inst.remain = 2 - result = inst.get(1, skip=True) + inst.remaining = 2 + result = inst.read(1) self.assertEqual(result, b'a') - self.assertEqual(inst.remain, 1) + self.assertEqual(inst.remaining, 1) self.assertEqual(f.tell(), 1) def test___iter__(self): @@ -227,61 +192,37 @@ class TestReadOnlyFileBasedBuffer(unittest.TestCase): r += val self.assertEqual(r, data) - def test_append(self): - inst = self._makeOne(None) - self.assertRaises(NotImplementedError, inst.append, 'a') + def test_unseekable_updates_remaining_at_eof(self): + f = io.BytesIO(b'abcdef') + inst = self._makeOne(f) + inst.remaining = -1 + result1 = inst.read() + result2 = inst.read() + self.assertEqual(result1, b'abcdef') + self.assertEqual(result2, b'') + self.assertEqual(inst.remaining, 0) + -class TestOverflowableBuffer(unittest.TestCase): +class TestOverflowableBuffer(FileBasedBufferTests, unittest.TestCase): def _makeOne(self, overflow=10): from waitress.buffers import OverflowableBuffer - return OverflowableBuffer(overflow) + buffer = OverflowableBuffer(overflow) + self.buffers.append(buffer) + return buffer - def test___len__buf_is_None(self): - inst = self._makeOne() - self.assertEqual(len(inst), 0) + def _makeOneFromBytes(self, from_bytes=None): + buffer = self._makeOne() + if from_bytes: + buffer.append(from_bytes) + return buffer - def test___len__buf_is_not_None(self): - inst = self._makeOne() - inst.buf = b'abc' - self.assertEqual(len(inst), 3) + def setUp(self): + self.buffers = [] - def test___nonzero__(self): - inst = self._makeOne() - inst.buf = b'abc' - self.assertEqual(bool(inst), True) - inst.buf = b'' - self.assertEqual(bool(inst), False) - - def test___nonzero___on_int_overflow_buffer(self): - inst = self._makeOne() - - class int_overflow_buf(bytes): - def __len__(self): - # maxint + 1 - return 0x7fffffffffffffff + 1 - inst.buf = int_overflow_buf() - self.assertEqual(bool(inst), True) - inst.buf = b'' - self.assertEqual(bool(inst), False) - - def test__create_buffer_large(self): - from waitress.buffers import TempfileBasedBuffer - inst = self._makeOne() - inst.strbuf = b'x' * 11 - inst._create_buffer() - self.assertEqual(inst.buf.__class__, TempfileBasedBuffer) - self.assertEqual(inst.buf.get(100), b'x' * 11) - self.assertEqual(inst.strbuf, b'') - - def test__create_buffer_small(self): - from waitress.buffers import BytesIOBasedBuffer - inst = self._makeOne() - inst.strbuf = b'x' * 5 - inst._create_buffer() - self.assertEqual(inst.buf.__class__, BytesIOBasedBuffer) - self.assertEqual(inst.buf.get(100), b'x' * 5) - self.assertEqual(inst.strbuf, b'') + def tearDown(self): + for b in self.buffers: + b.close() def test_append_with_len_more_than_max_int(self): from waitress.compat import MAXINT @@ -289,123 +230,66 @@ class TestOverflowableBuffer(unittest.TestCase): inst.overflowed = True buf = DummyBuffer(length=MAXINT) inst.buf = buf + inst.remaining = MAXINT result = inst.append(b'x') # we don't want this to throw an OverflowError on Python 2 (see # https://github.com/Pylons/waitress/issues/47) self.assertEqual(result, None) - def test_append_buf_None_not_longer_than_srtbuf_limit(self): + def test_append_buf_None_not_longer_than_strbuf_limit(self): inst = self._makeOne() inst.strbuf = b'x' * 5 + inst.remaining = len(inst.strbuf) inst.append(b'hello') self.assertEqual(inst.strbuf, b'xxxxxhello') + self.assertEqual(inst.remaining, 10) def test_append_buf_None_longer_than_strbuf_limit(self): inst = self._makeOne(10000) inst.strbuf = b'x' * 8192 + inst.remaining = len(inst.strbuf) inst.append(b'hello') self.assertEqual(inst.strbuf, b'') - self.assertEqual(len(inst.buf), 8197) + self.assertEqual(inst.buf.remaining, 8197) def test_append_overflow(self): inst = self._makeOne(10) inst.strbuf = b'x' * 8192 + inst.remaining = len(inst.strbuf) inst.append(b'hello') self.assertEqual(inst.strbuf, b'') - self.assertEqual(len(inst.buf), 8197) + self.assertEqual(inst.buf.remaining, 8197) def test_append_sz_gt_overflow(self): from waitress.buffers import BytesIOBasedBuffer - f = io.BytesIO(b'data') - inst = self._makeOne(f) + inst = self._makeOne() buf = BytesIOBasedBuffer() inst.buf = buf inst.overflow = 2 inst.append(b'data2') - self.assertEqual(f.getvalue(), b'data') self.assertTrue(inst.overflowed) self.assertNotEqual(inst.buf, buf) - def test_get_buf_None_skip_False(self): - inst = self._makeOne() - inst.strbuf = b'x' * 5 - r = inst.get(5) - self.assertEqual(r, b'xxxxx') - - def test_get_buf_None_skip_True(self): - inst = self._makeOne() - inst.strbuf = b'x' * 5 - r = inst.get(5, skip=True) - self.assertFalse(inst.buf is None) - self.assertEqual(r, b'xxxxx') - - def test_skip_buf_None(self): - inst = self._makeOne() - inst.strbuf = b'data' - inst.skip(4) - self.assertEqual(inst.strbuf, b'') - self.assertNotEqual(inst.buf, None) - - def test_skip_buf_None_allow_prune_True(self): - inst = self._makeOne() - inst.strbuf = b'data' - inst.skip(4, True) - self.assertEqual(inst.strbuf, b'') - self.assertEqual(inst.buf, None) - - def test_prune_buf_None(self): - inst = self._makeOne() - inst.prune() - self.assertEqual(inst.strbuf, b'') - - def test_prune_with_buf(self): - inst = self._makeOne() - class Buf(object): - def prune(self): - self.pruned = True - inst.buf = Buf() - inst.prune() - self.assertEqual(inst.buf.pruned, True) - - def test_prune_with_buf_overflow(self): - inst = self._makeOne() - class DummyBuffer(io.BytesIO): - def getfile(self): - return self - def prune(self): - return True - def __len__(self): - return 5 - buf = DummyBuffer(b'data') - inst.buf = buf - inst.overflowed = True - inst.overflow = 10 - inst.prune() - self.assertNotEqual(inst.buf, buf) - - def test_prune_with_buflen_more_than_max_int(self): - from waitress.compat import MAXINT - inst = self._makeOne() - inst.overflowed = True - buf = DummyBuffer(length=MAXINT+1) - inst.buf = buf - result = inst.prune() - # we don't want this to throw an OverflowError on Python 2 (see - # https://github.com/Pylons/waitress/issues/47) - self.assertEqual(result, None) - - def test_getfile_buf_None(self): - inst = self._makeOne() - f = inst.getfile() - self.assertTrue(hasattr(f, 'read')) + def test_read_strbuf(self): + inst = self._makeOne(10) + inst.strbuf = b'x' + inst.remaining = len(inst.strbuf) + result = inst.read() + self.assertEqual(result, b'x') + self.assertEqual(inst.remaining, 0) - def test_getfile_buf_not_None(self): - inst = self._makeOne() - buf = io.BytesIO() - buf.getfile = lambda *x: buf - inst.buf = buf - f = inst.getfile() - self.assertEqual(f, buf) + def test_rollback_strbuf(self): + inst = self._makeOne(10) + inst.strbuf = b'x' + inst.remaining = len(inst.strbuf) + result = inst.read() + self.assertEqual(result, b'x') + self.assertEqual(inst.remaining, 0) + inst.rollback(1) + self.assertEqual(inst.remaining, 1) + result = inst.read() + self.assertEqual(result, b'x') + self.assertEqual(inst.remaining, 0) def test_close_nobuf(self): inst = self._makeOne() @@ -428,7 +312,7 @@ class KindaFilelike(object): self.bytes = bytes self.tellresults = tellresults if close is not None: - self.close = close + self.close = lambda: close class Filelike(KindaFilelike): @@ -441,13 +325,10 @@ class Filelike(KindaFilelike): class DummyBuffer(object): def __init__(self, length=0): - self.length = length - - def __len__(self): - return self.length + self.remaining = length def append(self, s): - self.length = self.length + len(s) + self.remaining = self.remaining + len(s) - def prune(self): - pass + def close(self): + self.closed = True |