import io import unittest class TestFileBasedBuffer(unittest.TestCase): def _makeOne(self, file=None, from_buffer=None): from waitress.buffers import FileBasedBuffer buf = FileBasedBuffer(file, from_buffer=from_buffer) self.buffers_to_close.append(buf) return buf def setUp(self): self.buffers_to_close = [] def tearDown(self): for buf in self.buffers_to_close: buf.close() def test_ctor_from_buffer_None(self): inst = self._makeOne("file") self.assertEqual(inst.file, "file") def test_ctor_from_buffer(self): from_buffer = io.BytesIO(b"data") from_buffer.getfile = lambda *x: from_buffer f = io.BytesIO() inst = self._makeOne(f, from_buffer) self.assertEqual(inst.file, f) del from_buffer.getfile self.assertEqual(inst.remain, 4) from_buffer.close() def test___len__(self): inst = self._makeOne() inst.remain = 10 self.assertEqual(len(inst), 10) def test___nonzero__(self): inst = self._makeOne() inst.remain = 10 self.assertEqual(bool(inst), True) inst.remain = 0 self.assertEqual(bool(inst), True) def test_append(self): f = io.BytesIO(b"data") inst = self._makeOne(f) inst.append(b"data2") self.assertEqual(f.getvalue(), b"datadata2") self.assertEqual(inst.remain, 5) def test_get_skip_true(self): f = io.BytesIO(b"data") inst = self._makeOne(f) result = inst.get(100, skip=True) self.assertEqual(result, b"data") self.assertEqual(inst.remain, -4) def test_get_skip_false(self): f = io.BytesIO(b"data") inst = self._makeOne(f) result = inst.get(100, skip=False) self.assertEqual(result, b"data") self.assertEqual(inst.remain, 0) def test_get_skip_bytes_less_than_zero(self): f = io.BytesIO(b"data") inst = self._makeOne(f) result = inst.get(-1, skip=False) self.assertEqual(result, b"data") self.assertEqual(inst.remain, 0) def test_skip_remain_gt_bytes(self): f = io.BytesIO(b"d") inst = self._makeOne(f) inst.remain = 1 inst.skip(1) self.assertEqual(inst.remain, 0) def test_skip_remain_lt_bytes(self): f = io.BytesIO(b"d") inst = self._makeOne(f) inst.remain = 1 self.assertRaises(ValueError, inst.skip, 2) def test_newfile(self): inst = self._makeOne() self.assertRaises(NotImplementedError, inst.newfile) def test_prune_remain_notzero(self): f = io.BytesIO(b"d") inst = self._makeOne(f) inst.remain = 1 nf = io.BytesIO() inst.newfile = lambda *x: nf inst.prune() self.assertTrue(inst.file is not f) self.assertEqual(nf.getvalue(), b"d") def test_prune_remain_zero_tell_notzero(self): f = io.BytesIO(b"d") inst = self._makeOne(f) nf = io.BytesIO(b"d") inst.newfile = lambda *x: nf inst.remain = 0 inst.prune() self.assertTrue(inst.file is not f) self.assertEqual(nf.getvalue(), b"d") def test_prune_remain_zero_tell_zero(self): f = io.BytesIO() inst = self._makeOne(f) inst.remain = 0 inst.prune() self.assertTrue(inst.file is f) def test_close(self): f = io.BytesIO() inst = self._makeOne(f) inst.close() self.assertTrue(f.closed) self.buffers_to_close.remove(inst) class TestTempfileBasedBuffer(unittest.TestCase): def _makeOne(self, from_buffer=None): from waitress.buffers import TempfileBasedBuffer buf = TempfileBasedBuffer(from_buffer=from_buffer) self.buffers_to_close.append(buf) return buf def setUp(self): self.buffers_to_close = [] def tearDown(self): for buf in self.buffers_to_close: buf.close() def test_newfile(self): inst = self._makeOne() r = inst.newfile() self.assertTrue(hasattr(r, "fileno")) # file r.close() class TestBytesIOBasedBuffer(unittest.TestCase): def _makeOne(self, from_buffer=None): from waitress.buffers import BytesIOBasedBuffer return BytesIOBasedBuffer(from_buffer=from_buffer) def test_ctor_from_buffer_not_None(self): f = io.BytesIO() f.getfile = lambda *x: f inst = self._makeOne(f) self.assertTrue(hasattr(inst.file, "read")) def test_ctor_from_buffer_None(self): inst = self._makeOne() self.assertTrue(hasattr(inst.file, "read")) def test_newfile(self): inst = self._makeOne() r = inst.newfile() self.assertTrue(hasattr(r, "read")) class TestReadOnlyFileBasedBuffer(unittest.TestCase): def _makeOne(self, file, block_size=8192): from waitress.buffers import ReadOnlyFileBasedBuffer buf = ReadOnlyFileBasedBuffer(file, block_size) self.buffers_to_close.append(buf) return buf def setUp(self): self.buffers_to_close = [] def tearDown(self): for buf in self.buffers_to_close: buf.close() def test_prepare_not_seekable(self): f = KindaFilelike(b"abc") inst = self._makeOne(f) result = inst.prepare() self.assertEqual(result, False) self.assertEqual(inst.remain, 0) def test_prepare_not_seekable_closeable(self): f = KindaFilelike(b"abc", close=1) inst = self._makeOne(f) result = inst.prepare() self.assertEqual(result, False) self.assertEqual(inst.remain, 0) self.assertTrue(hasattr(inst, "close")) def test_prepare_seekable_closeable(self): f = Filelike(b"abc", close=1, tellresults=[0, 10]) inst = self._makeOne(f) result = inst.prepare() self.assertEqual(result, 10) self.assertEqual(inst.remain, 10) self.assertEqual(inst.file.seeked, 0) self.assertTrue(hasattr(inst, "close")) def test_get_numbytes_neg_one(self): f = io.BytesIO(b"abcdef") inst = self._makeOne(f) inst.remain = 2 result = inst.get(-1) self.assertEqual(result, b"ab") self.assertEqual(inst.remain, 2) self.assertEqual(f.tell(), 0) def test_get_numbytes_gt_remain(self): f = io.BytesIO(b"abcdef") inst = self._makeOne(f) inst.remain = 2 result = inst.get(3) self.assertEqual(result, b"ab") self.assertEqual(inst.remain, 2) self.assertEqual(f.tell(), 0) def test_get_numbytes_lt_remain(self): f = io.BytesIO(b"abcdef") inst = self._makeOne(f) inst.remain = 2 result = inst.get(1) self.assertEqual(result, b"a") self.assertEqual(inst.remain, 2) self.assertEqual(f.tell(), 0) def test_get_numbytes_gt_remain_withskip(self): f = io.BytesIO(b"abcdef") inst = self._makeOne(f) inst.remain = 2 result = inst.get(3, skip=True) self.assertEqual(result, b"ab") self.assertEqual(inst.remain, 0) self.assertEqual(f.tell(), 2) def test_get_numbytes_lt_remain_withskip(self): f = io.BytesIO(b"abcdef") inst = self._makeOne(f) inst.remain = 2 result = inst.get(1, skip=True) self.assertEqual(result, b"a") self.assertEqual(inst.remain, 1) self.assertEqual(f.tell(), 1) def test___iter__(self): data = b"a" * 10000 f = io.BytesIO(data) inst = self._makeOne(f) r = b"" for val in inst: r += val self.assertEqual(r, data) def test_append(self): inst = self._makeOne(None) self.assertRaises(NotImplementedError, inst.append, "a") class TestOverflowableBuffer(unittest.TestCase): def _makeOne(self, overflow=10): from waitress.buffers import OverflowableBuffer buf = OverflowableBuffer(overflow) self.buffers_to_close.append(buf) return buf def setUp(self): self.buffers_to_close = [] def tearDown(self): for buf in self.buffers_to_close: buf.close() def test___len__buf_is_None(self): inst = self._makeOne() self.assertEqual(len(inst), 0) def test___len__buf_is_not_None(self): inst = self._makeOne() inst.buf = b"abc" self.assertEqual(len(inst), 3) self.buffers_to_close.remove(inst) def test___nonzero__(self): inst = self._makeOne() inst.buf = b"abc" self.assertEqual(bool(inst), True) inst.buf = b"" self.assertEqual(bool(inst), False) self.buffers_to_close.remove(inst) def test___nonzero___on_int_overflow_buffer(self): inst = self._makeOne() class int_overflow_buf(bytes): def __len__(self): # maxint + 1 return 0x7FFFFFFFFFFFFFFF + 1 inst.buf = int_overflow_buf() self.assertEqual(bool(inst), True) inst.buf = b"" self.assertEqual(bool(inst), False) self.buffers_to_close.remove(inst) def test__create_buffer_large(self): from waitress.buffers import TempfileBasedBuffer inst = self._makeOne() inst.strbuf = b"x" * 11 inst._create_buffer() self.assertEqual(inst.buf.__class__, TempfileBasedBuffer) self.assertEqual(inst.buf.get(100), b"x" * 11) self.assertEqual(inst.strbuf, b"") def test__create_buffer_small(self): from waitress.buffers import BytesIOBasedBuffer inst = self._makeOne() inst.strbuf = b"x" * 5 inst._create_buffer() self.assertEqual(inst.buf.__class__, BytesIOBasedBuffer) self.assertEqual(inst.buf.get(100), b"x" * 5) self.assertEqual(inst.strbuf, b"") def test_append_with_len_more_than_max_int(self): from waitress.compat import MAXINT inst = self._makeOne() inst.overflowed = True buf = DummyBuffer(length=MAXINT) inst.buf = buf result = inst.append(b"x") # we don't want this to throw an OverflowError on Python 2 (see # https://github.com/Pylons/waitress/issues/47) self.assertEqual(result, None) self.buffers_to_close.remove(inst) def test_append_buf_None_not_longer_than_srtbuf_limit(self): inst = self._makeOne() inst.strbuf = b"x" * 5 inst.append(b"hello") self.assertEqual(inst.strbuf, b"xxxxxhello") def test_append_buf_None_longer_than_strbuf_limit(self): inst = self._makeOne(10000) inst.strbuf = b"x" * 8192 inst.append(b"hello") self.assertEqual(inst.strbuf, b"") self.assertEqual(len(inst.buf), 8197) def test_append_overflow(self): inst = self._makeOne(10) inst.strbuf = b"x" * 8192 inst.append(b"hello") self.assertEqual(inst.strbuf, b"") self.assertEqual(len(inst.buf), 8197) def test_append_sz_gt_overflow(self): from waitress.buffers import BytesIOBasedBuffer f = io.BytesIO(b"data") inst = self._makeOne(f) buf = BytesIOBasedBuffer() inst.buf = buf inst.overflow = 2 inst.append(b"data2") self.assertEqual(f.getvalue(), b"data") self.assertTrue(inst.overflowed) self.assertNotEqual(inst.buf, buf) def test_get_buf_None_skip_False(self): inst = self._makeOne() inst.strbuf = b"x" * 5 r = inst.get(5) self.assertEqual(r, b"xxxxx") def test_get_buf_None_skip_True(self): inst = self._makeOne() inst.strbuf = b"x" * 5 r = inst.get(5, skip=True) self.assertFalse(inst.buf is None) self.assertEqual(r, b"xxxxx") def test_skip_buf_None(self): inst = self._makeOne() inst.strbuf = b"data" inst.skip(4) self.assertEqual(inst.strbuf, b"") self.assertNotEqual(inst.buf, None) def test_skip_buf_None_allow_prune_True(self): inst = self._makeOne() inst.strbuf = b"data" inst.skip(4, True) self.assertEqual(inst.strbuf, b"") self.assertEqual(inst.buf, None) def test_prune_buf_None(self): inst = self._makeOne() inst.prune() self.assertEqual(inst.strbuf, b"") def test_prune_with_buf(self): inst = self._makeOne() class Buf: def prune(self): self.pruned = True inst.buf = Buf() inst.prune() self.assertEqual(inst.buf.pruned, True) self.buffers_to_close.remove(inst) def test_prune_with_buf_overflow(self): inst = self._makeOne() class DummyBuffer(io.BytesIO): def getfile(self): return self def prune(self): return True def __len__(self): return 5 def close(self): pass buf = DummyBuffer(b"data") inst.buf = buf inst.overflowed = True inst.overflow = 10 inst.prune() self.assertNotEqual(inst.buf, buf) def test_prune_with_buflen_more_than_max_int(self): from waitress.compat import MAXINT inst = self._makeOne() inst.overflowed = True buf = DummyBuffer(length=MAXINT + 1) inst.buf = buf result = inst.prune() # we don't want this to throw an OverflowError on Python 2 (see # https://github.com/Pylons/waitress/issues/47) self.assertEqual(result, None) def test_getfile_buf_None(self): inst = self._makeOne() f = inst.getfile() self.assertTrue(hasattr(f, "read")) def test_getfile_buf_not_None(self): inst = self._makeOne() buf = io.BytesIO() buf.getfile = lambda *x: buf inst.buf = buf f = inst.getfile() self.assertEqual(f, buf) def test_close_nobuf(self): inst = self._makeOne() inst.buf = None self.assertEqual(inst.close(), None) # doesnt raise self.buffers_to_close.remove(inst) def test_close_withbuf(self): class Buffer: def close(self): self.closed = True buf = Buffer() inst = self._makeOne() inst.buf = buf inst.close() self.assertTrue(buf.closed) self.buffers_to_close.remove(inst) class KindaFilelike: def __init__(self, bytes, close=None, tellresults=None): self.bytes = bytes self.tellresults = tellresults if close is not None: self.close = lambda: close class Filelike(KindaFilelike): def seek(self, v, whence=0): self.seeked = v def tell(self): v = self.tellresults.pop(0) return v class DummyBuffer: def __init__(self, length=0): self.length = length def __len__(self): return self.length def append(self, s): self.length = self.length + len(s) def prune(self): pass def close(self): pass