diff options
author | Chris McDonough <chrism@plope.com> | 2012-01-16 03:40:51 -0500 |
---|---|---|
committer | Chris McDonough <chrism@plope.com> | 2012-01-16 03:40:51 -0500 |
commit | 4b6e73772c8ddbd8effc710c421e108648cac1e5 (patch) | |
tree | 685eb64803a1e9bb9feaafc2450575b1e421fe64 | |
parent | e95e78158aa6d533044a3a80236a5db6fc70cdf1 (diff) | |
download | waitress-4b6e73772c8ddbd8effc710c421e108648cac1e5.tar.gz |
Support the WSGI ``wsgi.file_wrapper`` protocol as per
http://www.python.org/dev/peps/pep-0333/#optional-platform-specific-file-handling.
-rw-r--r-- | CHANGES.txt | 54 | ||||
-rw-r--r-- | docs/differences.rst | 2 | ||||
-rw-r--r-- | docs/filewrapper.rst | 52 | ||||
-rw-r--r-- | docs/index.rst | 1 | ||||
-rw-r--r-- | waitress/buffers.py | 40 | ||||
-rw-r--r-- | waitress/channel.py | 108 | ||||
-rw-r--r-- | waitress/parser.py | 5 | ||||
-rw-r--r-- | waitress/receiver.py | 7 | ||||
-rw-r--r-- | waitress/task.py | 15 | ||||
-rw-r--r-- | waitress/tests/fixtureapps/filewrapper.py | 52 | ||||
-rw-r--r-- | waitress/tests/fixtureapps/groundhog1.jpg | bin | 0 -> 45448 bytes | |||
-rw-r--r-- | waitress/tests/test_buffers.py | 80 | ||||
-rw-r--r-- | waitress/tests/test_channel.py | 109 | ||||
-rw-r--r-- | waitress/tests/test_functional.py | 153 | ||||
-rw-r--r-- | waitress/tests/test_parser.py | 17 | ||||
-rw-r--r-- | waitress/tests/test_receiver.py | 9 | ||||
-rw-r--r-- | waitress/tests/test_task.py | 34 |
17 files changed, 680 insertions, 58 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index dcee98c..4b834a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,57 @@ +Next release +------------ + +Features +~~~~~~~~ + +- Support the WSGI ``wsgi.file_wrapper`` protocol as per + http://www.python.org/dev/peps/pep-0333/#optional-platform-specific-file-handling. + Here's a usage example:: + + import os + + here = os.path.dirname(os.path.abspath(__file__)) + + def myapp(environ, start_response): + f = open(os.path.join(here, 'myphoto.jpg'), 'rb') + headers = [('Content-Type', 'image/jpeg')] + start_response( + '200 OK', + headers + ) + return environ['wsgi.file_wrapper'](f, 32768) + + The signature of the file wrapper constructor is ``(filelike_object, + block_size)``. Both arguments must be passed as positional (not keyword) + arguments. The result of creating a file wrapper should be **returned** as + the ``app_iter`` from a WSGI application. + + The object passed as ``filelike_object`` to the wrapper must be a file-like + object which supports *at least* the ``read()`` method, and the ``read()`` + method must support an optional size hint argument. It *should* support + the ``seek()`` and ``tell()`` methods. If it does not, normal iteration + over the filelike object using the provided block_size is used (and copying + is done, negating any benefit of the file wrapper). It *should* support a + ``close()`` method. + + The specified ``block_size`` argument to the file wrapper constructor will + be used only when the ``filelike_object`` doesn't support ``seek`` and/or + ``tell`` methods. Waitress needs to use normal iteration to serve the file + in this degenerate case (as per the WSGI spec), and this block size will be + used as the iteration chunk size. The ``block_size`` argument is optional; + if it is not passed, a default value``32768`` is used. + + Waitress will set a ``Content-Length`` header on the behalf of an + application when a file wrapper with a sufficiently filelike object is used + if the application hasn't already set one. + + The machinery which handles a file wrapper currently doesn't do anything + particularly special using fancy system calls (it doesn't use ``sendfile`` + for example); using it currently just prevents the system from needing to + copy data to a temporary buffer in order to send it to the client. No + copying of data is done when a WSGI app returns a file wrapper that wraps a + sufficiently filelike object. It may do something fancier in the future. + 0.7 (2012-01-11) ---------------- diff --git a/docs/differences.rst b/docs/differences.rst index 1dd7d80..07783fa 100644 --- a/docs/differences.rst +++ b/docs/differences.rst @@ -61,3 +61,5 @@ Differences from ``zope.server`` Content-Length header. - Dont hang a thread up trying to send data to slow clients. + +- Supports ``wsgi.file_wrapper`` protocol. diff --git a/docs/filewrapper.rst b/docs/filewrapper.rst new file mode 100644 index 0000000..fe0037f --- /dev/null +++ b/docs/filewrapper.rst @@ -0,0 +1,52 @@ +Support for ``wsgi.file_wrapper`` +--------------------------------- + +Waitress supports the `WSGI file_wrapper protocol +<http://www.python.org/dev/peps/pep-0333/#optional-platform-specific-file-handling>`_ +. Here's a usage example:: + + import os + + here = os.path.dirname(os.path.abspath(__file__)) + + def myapp(environ, start_response): + f = open(os.path.join(here, 'myphoto.jpg'), 'rb') + headers = [('Content-Type', 'image/jpeg')] + start_response( + '200 OK', + headers + ) + return environ['wsgi.file_wrapper'](f, 32768) + +The file wrapper constructor is accessed via +``environ['wsgi.file_wrapper']``. The signature of the file wrapper +constructor is ``(filelike_object, block_size)``. Both arguments must be +passed as positional (not keyword) arguments. The result of creating a file +wrapper should be **returned** as the ``app_iter`` from a WSGI application. + +The object passed as ``filelike_object`` to the wrapper must be a file-like +object which supports *at least* the ``read()`` method, and the ``read()`` +method must support an optional size hint argument and the ``read()`` method +*must* return **bytes** objects (never unicode). It *should* support the +``seek()`` and ``tell()`` methods. If it does not, normal iteration over the +filelike object using the provided block_size is used (and copying is done, +negating any benefit of the file wrapper). It *should* support a ``close()`` +method. + +The specified ``block_size`` argument to the file wrapper constructor will be +used only when the ``filelike_object`` doesn't support ``seek`` and/or +``tell`` methods. Waitress needs to use normal iteration to serve the file +in this degenerate case (as per the WSGI pec), and this block size will be +used as the iteration chunk size. The ``block_size`` argument is optional; +if it is not passed, a default value``32768`` is used. + +Waitress will set a ``Content-Length`` header on the behalf of an application +when a file wrapper with a sufficiently filelike object is used if the +application hasn't already set one. + +The machinery which handles a file wrapper currently doesn't do anything +particularly special using fancy system calls (it doesn't use ``sendfile`` +for example); using it currently just prevents the system from needing to +copy data to a temporary buffer in order to send it to the client. No +copying of data is done when a WSGI app returns a file wrapper that wraps a +sufficiently filelike object. It may do something fancier in the future. diff --git a/docs/index.rst b/docs/index.rst index 6882731..4843bfb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -184,6 +184,7 @@ Extended Documentation differences.rst api.rst arguments.rst + filewrapper.rst glossary.rst Change History diff --git a/waitress/buffers.py b/waitress/buffers.py index 92891f5..6c0b290 100644 --- a/waitress/buffers.py +++ b/waitress/buffers.py @@ -15,8 +15,6 @@ """ from io import BytesIO -from waitress.compat import thread - # copy_bytes controls the size of temp. strings for shuffling data around. COPY_BYTES = 1 << 18 # 256K @@ -106,7 +104,12 @@ class FileBasedBuffer(object): def getfile(self): return self.file - + def _close(self): + # named _close because ReadOnlyFileBasedBuffer is used as + # wsgi file.wrapper, and its protocol reserves "close" + if hasattr(self.file, 'close'): + self.file.close() + self.remain = 0 class TempfileBasedBuffer(FileBasedBuffer): @@ -131,7 +134,31 @@ class BytesIOBasedBuffer(FileBasedBuffer): def newfile(self): return BytesIO() +class ReadOnlyFileBasedBuffer(FileBasedBuffer): + # used as wsgi.file_wrapper + def __init__(self, file, block_size=32768): + self.file = file + self.block_size = block_size # for __iter__ + + def prepare(self): + if ( hasattr(self.file, 'seek') and + hasattr(self.file, 'tell') ): + start_pos = self.file.tell() + self.file.seek(0, 2) + end_pos = self.file.tell() + self.file.seek(start_pos) + self.remain = end_pos - start_pos + return True + elif hasattr(self.file, 'close'): + # called by task if self.filelike has no seek/tell + self.close = self.file.close + return False + + def __iter__(self): # called by task if self.filelike has no seek/tell + return iter(lambda: self.file.read(self.block_size), b'') + def append(self, s): + raise NotImplementedError class OverflowableBuffer(object): """ @@ -150,7 +177,6 @@ class OverflowableBuffer(object): def __init__(self, overflow): # overflow is the maximum to be stored in a StringIO buffer. self.overflow = overflow - self.lock = thread.allocate_lock() # API def __len__(self): buf = self.buf @@ -240,3 +266,9 @@ class OverflowableBuffer(object): if buf is None: buf = self._create_buffer() return buf.getfile() + + def _close(self): + buf = self.buf + if buf is not None: + buf._close() + diff --git a/waitress/channel.py b/waitress/channel.py index 0797d47..4c15de9 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -11,14 +11,16 @@ # FOR A PARTICULAR PURPOSE. # ############################################################################## -"""Dual-mode channel -""" import asyncore import socket import time import traceback -from waitress.buffers import OverflowableBuffer +from waitress.buffers import ( + OverflowableBuffer, + ReadOnlyFileBasedBuffer, + ) + from waitress.parser import HTTPRequestParser from waitress.compat import thread @@ -34,13 +36,11 @@ from waitress.utilities import ( ) class HTTPChannel(logging_dispatcher, object): - """Channel that switches between asynchronous and synchronous mode. - - Set self.requests = [somerequest] before using a channel in a thread other - than the thread handling the main loop. + """ + Setting self.requests = [somerequest] prevents more requests from being + received until the out buffers have been flushed. - Set self.requests = [] to give the channel back to the thread handling - the main loop. + Setting self.requests = [] allows more requests to be received. """ task_class = WSGITask error_task_class = ErrorTask @@ -53,6 +53,7 @@ class HTTPChannel(logging_dispatcher, object): requests = () # currently pending requests sent_continue = False # used as a latch after sending 100 continue task_lock = thread.allocate_lock() # lock used to push/pop requests + outbuf_lock = thread.allocate_lock() # lock used to access any outbuf force_flush = False # indicates a need to flush the outbuf # @@ -70,15 +71,21 @@ class HTTPChannel(logging_dispatcher, object): self.server = server self.addr = addr self.adj = adj - self.outbuf = OverflowableBuffer(adj.outbuf_overflow) + self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)] self.creation_time = self.last_activity = time.time() asyncore.dispatcher.__init__(self, sock, map=map) + def any_outbuf_has_data(self): + return any(bool(b) for b in self.outbufs) + + def total_outbufs_len(self): + return sum(len(b) for b in self.outbufs) + def writable(self): # if there's data in the out buffer or we've been instructed to close # the channel (possibly by our server maintenance logic), run # handle_write - return bool(self.outbuf) or self.will_close + return self.any_outbuf_has_data() or self.will_close def handle_write(self): # Precondition: there's data in the out buffer to be sent, or @@ -104,7 +111,7 @@ class HTTPChannel(logging_dispatcher, object): # won't get done. flush = self._flush_some_if_lockable self.force_flush = False - elif (len(self.outbuf) >= self.adj.send_bytes): + elif (self.total_outbufs_len() >= self.adj.send_bytes): # 1. There's a running task, so we need to try to lock # the outbuf before sending # 2. Only try to send if the data in the out buffer is larger @@ -122,8 +129,11 @@ class HTTPChannel(logging_dispatcher, object): if self.adj.log_socket_errors: self.logger.exception('Socket error') self.will_close = True + except: + self.logger.exception('Unexpected exception when flushing') + self.will_close = True - if self.close_when_flushed and not self.outbuf: + if self.close_when_flushed and not self.any_outbuf_has_data(): self.close_when_flushed = False self.will_close = True @@ -136,7 +146,8 @@ class HTTPChannel(logging_dispatcher, object): # 2. There's no already currently running task(s). # 3. There's no data in the output buffer that needs to be sent # before we potentially create a new task. - return not (self.will_close or self.requests or self.outbuf) + return not (self.will_close or self.requests or + self.any_outbuf_has_data()) def handle_read(self): try: @@ -152,7 +163,8 @@ class HTTPChannel(logging_dispatcher, object): def received(self, data): """ - Receives input asynchronously and assigns a task to the channel. + Receives input asynchronously and assigns one or more requests to the + channel. """ # Preconditions: there's no task(s) already running request = self.request @@ -171,7 +183,7 @@ class HTTPChannel(logging_dispatcher, object): if not self.sent_continue: # there's no current task, so we don't need to try to # lock the outbuf to append to it. - self.outbuf.append(b'HTTP/1.1 100 Continue\r\n\r\n') + self.outbufs[-1].append(b'HTTP/1.1 100 Continue\r\n\r\n') self.sent_expect_continue = True self._flush_some() request.completed = False @@ -196,34 +208,58 @@ class HTTPChannel(logging_dispatcher, object): def _flush_some_if_lockable(self): # Since our task may be appending to the outbuf, we try to acquire # the lock, but we don't block if we can't. - outbuf = self.outbuf - locked = outbuf.lock.acquire(0) + locked = self.outbuf_lock.acquire(0) if locked: try: self._flush_some() finally: - outbuf.lock.release() + self.outbuf_lock.release() def _flush_some(self): # Send as much data as possible to our client - outbuf = self.outbuf - outbuflen = len(outbuf) + sent = 0 - while outbuflen > 0: - chunk = outbuf.get(self.adj.send_bytes) - num_sent = self.send(chunk) - if num_sent: - outbuf.skip(num_sent, True) - outbuflen -= num_sent - sent += num_sent - else: + dobreak = False + + while True: + + outbuf = self.outbufs[0] + outbuflen = len(outbuf) + if outbuflen <= 0: + if len(self.outbufs) > 1: + toclose = self.outbufs.pop(0) + toclose._close() + continue + else: + dobreak = True + + while outbuflen > 0: + chunk = outbuf.get(self.adj.send_bytes) + num_sent = self.send(chunk) + if num_sent: + outbuf.skip(num_sent, True) + outbuflen -= num_sent + sent += num_sent + else: + dobreak = True + break + + if dobreak: break + if sent: self.last_activity = time.time() return True + return False def handle_close(self): + for outbuf in self.outbufs: + try: + outbuf._close() + except: + self.logger.exception( + 'Unknown exception while trying to close outbuf') self.connected = False asyncore.dispatcher.close(self) @@ -254,8 +290,13 @@ class HTTPChannel(logging_dispatcher, object): if data: # the async mainloop might be popping data off outbuf; we can # block here waiting for it because we're in a thread - with self.outbuf.lock: - self.outbuf.append(data) + with self.outbuf_lock: + if isinstance(data, ReadOnlyFileBasedBuffer): + self.outbufs.append(data) + nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) + self.outbufs.append(nextbuf) + else: + self.outbufs[-1].append(data) # XXX We might eventually need to pull the trigger here (to # instruct select to stop blocking), but it slows things down so # much that I'll hold off for now; "server push" on otherwise @@ -293,9 +334,12 @@ class HTTPChannel(logging_dispatcher, object): # here; otherwise the mainloop gets confused if task.close_on_finish: self.close_when_flushed = True + for request in self.requests: + request._close() self.requests = [] else: - self.requests.pop(0) + request = self.requests.pop(0) + request._close() self.force_flush = True self.server.pull_trigger() diff --git a/waitress/parser.py b/waitress/parser.py index ccfd9ca..d5865fa 100644 --- a/waitress/parser.py +++ b/waitress/parser.py @@ -219,6 +219,11 @@ class HTTPRequestParser(object): else: return BytesIO() + def _close(self): + body_rcv = self.body_rcv + if body_rcv is not None: + body_rcv.getbuf()._close() + def split_uri(uri): # urlsplit handles byte input by returning bytes on py3, so # scheme, netloc, path, query, and fragment are bytes diff --git a/waitress/receiver.py b/waitress/receiver.py index e0fb55d..be3c2f4 100644 --- a/waitress/receiver.py +++ b/waitress/receiver.py @@ -48,6 +48,9 @@ class FixedStreamReceiver(object): def getfile(self): return self.buf.getfile() + def getbuf(self): + return self.buf + class ChunkedReceiver(object): chunk_remainder = 0 @@ -137,3 +140,7 @@ class ChunkedReceiver(object): def getfile(self): return self.buf.getfile() + + def getbuf(self): + return self.buf + diff --git a/waitress/task.py b/waitress/task.py index 9f2ef14..e87363f 100644 --- a/waitress/task.py +++ b/waitress/task.py @@ -16,6 +16,8 @@ import socket import sys import time +from waitress.buffers import ReadOnlyFileBasedBuffer + from waitress.compat import ( tobytes, Queue, @@ -313,8 +315,7 @@ class ErrorTask(Task): self.write(tobytes(body)) class WSGITask(Task): - """A WSGI task accepts a request and writes to a channel. - + """A WSGI task produces a response from a WSGI application. """ environ = None @@ -374,6 +375,14 @@ class WSGITask(Task): app_iter = self.channel.server.application(env, start_response) try: + if app_iter.__class__ is ReadOnlyFileBasedBuffer: + ok = app_iter.prepare() + if ok: + if self.content_length == -1: + self.content_length = len(app_iter) + self.write(b'') # generate headers + self.channel.write_soon(app_iter) + return # By iterating manually at this point, we execute task.write() # multiple times, allowing partial data to be sent. first_chunk_len = None @@ -453,6 +462,8 @@ class WSGITask(Task): environ['wsgi.multiprocess'] = False environ['wsgi.run_once'] = False environ['wsgi.input'] = request.get_body_stream() + environ['wsgi.file_wrapper'] = ReadOnlyFileBasedBuffer self.environ = environ return environ + diff --git a/waitress/tests/fixtureapps/filewrapper.py b/waitress/tests/fixtureapps/filewrapper.py new file mode 100644 index 0000000..4cc9740 --- /dev/null +++ b/waitress/tests/fixtureapps/filewrapper.py @@ -0,0 +1,52 @@ +import os + +here = os.path.dirname(os.path.abspath(__file__)) +fn = os.path.join(here, 'groundhog1.jpg') + +class KindaFilelike(object): + def __init__(self, bytes): + self.bytes = bytes + + def read(self, n): + bytes = self.bytes[:n] + self.bytes = self.bytes[n:] + return bytes + +def app(environ, start_response): + if environ['PATH_INFO'].startswith('/filelike'): + f = open(fn, 'rb') + f.seek(0, 2) + cl = f.tell() + f.seek(0) + if environ['PATH_INFO'] == '/filelike': + headers = [ + ('Content-Length', str(cl)), ('Content-Type', 'image/jpeg') + ] + else: + headers = [('Content-Type', 'image/jpeg')] + else: + data = open(fn, 'rb').read() + f = KindaFilelike(data) + if environ['PATH_INFO'] == '/notfilelike': + headers = [('Content-Length', str(len(data))), + ('Content-Type', 'image/jpeg')] + + else: + headers = [('Content-Type', 'image/jpeg')] + + start_response( + '200 OK', + headers + ) + return environ['wsgi.file_wrapper'](f, 8192) + +if __name__ == '__main__': + import logging + class NullHandler(logging.Handler): + def emit(self, record): + pass + h = NullHandler() + logging.getLogger('waitress').addHandler(h) + from waitress import serve + serve(app, port=61523, _quiet=True) + diff --git a/waitress/tests/fixtureapps/groundhog1.jpg b/waitress/tests/fixtureapps/groundhog1.jpg Binary files differnew file mode 100644 index 0000000..90f610e --- /dev/null +++ b/waitress/tests/fixtureapps/groundhog1.jpg diff --git a/waitress/tests/test_buffers.py b/waitress/tests/test_buffers.py index 19d79df..4f9bf42 100644 --- a/waitress/tests/test_buffers.py +++ b/waitress/tests/test_buffers.py @@ -103,6 +103,12 @@ class TestFileBasedBuffer(unittest.TestCase): inst.prune() self.assertTrue(inst.file is f) + def test__close(self): + f = io.BytesIO() + inst = self._makeOne(f) + inst._close() + self.assertTrue(f.closed) + class TestTempfileBasedBuffer(unittest.TestCase): def _makeOne(self, from_buffer=None): from waitress.buffers import TempfileBasedBuffer @@ -133,6 +139,49 @@ class TestBytesIOBasedBuffer(unittest.TestCase): r = inst.newfile() self.assertTrue(hasattr(r, 'read')) +class TestReadOnlyFileBasedBuffer(unittest.TestCase): + def _makeOne(self, file, block_size=8192): + from waitress.buffers import ReadOnlyFileBasedBuffer + return ReadOnlyFileBasedBuffer(file, block_size) + + def test_prepare_not_seekable_not_closeable(self): + f = KindaFilelike(b'abc') + inst = self._makeOne(f) + result = inst.prepare() + self.assertEqual(result, False) + self.assertEqual(inst.remain, 0) + self.assertFalse(hasattr(inst, 'close')) + + def test_prepare_not_seekable_closeable(self): + f = KindaFilelike(b'abc', close=1) + inst = self._makeOne(f) + result = inst.prepare() + self.assertEqual(result, False) + self.assertEqual(inst.remain, 0) + self.assertEqual(inst.close, f.close) + + def test_prepare_seekable_closeable(self): + f = Filelike(b'abc', close=1, tellresults=[0, 10]) + inst = self._makeOne(f) + result = inst.prepare() + self.assertEqual(result, True) + self.assertEqual(inst.remain, 10) + self.assertEqual(inst.file.seeked, 0) + self.assertFalse(hasattr(inst, 'close')) + + def test___iter__(self): + data = b'a'*10000 + f = io.BytesIO(data) + inst = self._makeOne(f) + r = b'' + for val in inst: + r += val + self.assertEqual(r, data) + + def test_append(self): + inst = self._makeOne(None) + self.assertRaises(NotImplementedError, inst.append, 'a') + class TestOverflowableBuffer(unittest.TestCase): def _makeOne(self, overflow=10): from waitress.buffers import OverflowableBuffer @@ -270,6 +319,35 @@ class TestOverflowableBuffer(unittest.TestCase): inst.buf = buf f = inst.getfile() self.assertEqual(f, buf) + + def test__close_nobuf(self): + inst = self._makeOne() + inst.buf = None + self.assertEqual(inst._close(), None) # doesnt raise + + def test__close_withbuf(self): + class Buffer(object): + def _close(self): + self.closed = True + buf = Buffer() + inst = self._makeOne() + inst.buf = buf + inst._close() + self.assertTrue(buf.closed) - +class KindaFilelike(object): + def __init__(self, bytes, close=None, tellresults=None): + self.bytes = bytes + self.tellresults = tellresults + if close is not None: + self.close = close + +class Filelike(KindaFilelike): + def seek(self, v, whence=0): + self.seeked = v + + def tell(self): + v = self.tellresults.pop(0) + return v + diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py index 9376fde..835681c 100644 --- a/waitress/tests/test_channel.py +++ b/waitress/tests/test_channel.py @@ -1,4 +1,5 @@ import unittest +import io class TestHTTPChannel(unittest.TestCase): def _makeOne(self, sock, addr, adj, map=None): @@ -12,6 +13,7 @@ class TestHTTPChannel(unittest.TestCase): sock = DummySock() map = {} inst = self._makeOne(sock, '127.0.0.1', adj, map=map) + inst.outbuf_lock = DummyLock() return inst, sock, map def test_ctor(self): @@ -21,23 +23,20 @@ class TestHTTPChannel(unittest.TestCase): def test_writable_something_in_outbuf(self): inst, sock, map = self._makeOneWithMap() - inst.outbuf = 'abc' + inst.outbufs[0].append(b'abc') self.assertTrue(inst.writable()) def test_writable_nothing_in_outbuf(self): inst, sock, map = self._makeOneWithMap() - inst.outbuf = '' self.assertFalse(inst.writable()) def test_writable_nothing_in_outbuf_will_close(self): inst, sock, map = self._makeOneWithMap() - inst.outbuf = '' inst.will_close = True self.assertTrue(inst.writable()) def test_handle_write_not_connected(self): inst, sock, map = self._makeOneWithMap() - inst.outbuf = '' inst.connected = False self.assertFalse(inst.handle_write()) @@ -52,7 +51,7 @@ class TestHTTPChannel(unittest.TestCase): def test_handle_write_no_request_with_outbuf(self): inst, sock, map = self._makeOneWithMap() inst.requests = [] - inst.outbuf = DummyBuffer(b'abc') + inst.outbufs = [DummyBuffer(b'abc')] inst.last_activity = 0 result = inst.handle_write() self.assertEqual(result, None) @@ -63,7 +62,8 @@ class TestHTTPChannel(unittest.TestCase): import socket inst, sock, map = self._makeOneWithMap() inst.requests = [] - inst.outbuf = DummyBuffer(b'abc', socket.error) + outbuf = DummyBuffer(b'abc', socket.error) + inst.outbufs = [outbuf] inst.last_activity = 0 inst.logger = DummyLogger() result = inst.handle_write() @@ -71,11 +71,27 @@ class TestHTTPChannel(unittest.TestCase): self.assertEqual(inst.last_activity, 0) self.assertEqual(sock.sent, b'') self.assertEqual(len(inst.logger.exceptions), 1) + self.assertTrue(outbuf.closed) + + def test_handle_write_outbuf_raises_othererror(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [] + outbuf = DummyBuffer(b'abc', IOError) + inst.outbufs = [outbuf] + inst.last_activity = 0 + inst.logger = DummyLogger() + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.last_activity, 0) + self.assertEqual(sock.sent, b'') + self.assertEqual(len(inst.logger.exceptions), 1) + self.assertTrue(outbuf.closed) def test_handle_write_no_requests_no_outbuf_will_close(self): inst, sock, map = self._makeOneWithMap() inst.requests = [] - inst.outbuf = DummyBuffer(b'') + outbuf = DummyBuffer(b'') + inst.outbufs = [outbuf] inst.will_close = True inst.last_activity = 0 result = inst.handle_write() @@ -83,37 +99,39 @@ class TestHTTPChannel(unittest.TestCase): self.assertEqual(inst.connected, False) self.assertEqual(sock.closed, True) self.assertEqual(inst.last_activity, 0) + self.assertTrue(outbuf.closed) def test_handle_write_no_requests_force_flush(self): inst, sock, map = self._makeOneWithMap() inst.requests = [True] - inst.outbuf = DummyBuffer(b'abc') + inst.outbufs = [DummyBuffer(b'abc')] inst.will_close = False inst.force_flush = True inst.last_activity = 0 result = inst.handle_write() self.assertEqual(result, None) self.assertEqual(inst.will_close, False) - self.assertTrue(inst.outbuf.lock.acquired) + self.assertTrue(inst.outbuf_lock.acquired) self.assertEqual(inst.force_flush, False) self.assertEqual(sock.sent, b'abc') def test_handle_write_no_requests_outbuf_gt_send_bytes(self): inst, sock, map = self._makeOneWithMap() inst.requests = [True] - inst.outbuf = DummyBuffer(b'abc') + inst.outbufs = [DummyBuffer(b'abc')] inst.adj.send_bytes = 2 inst.will_close = False inst.last_activity = 0 result = inst.handle_write() self.assertEqual(result, None) self.assertEqual(inst.will_close, False) - self.assertTrue(inst.outbuf.lock.acquired) + self.assertTrue(inst.outbuf_lock.acquired) self.assertEqual(sock.sent, b'abc') def test_handle_write_close_when_flushed(self): inst, sock, map = self._makeOneWithMap() - inst.outbuf = DummyBuffer(b'abc') + outbuf = DummyBuffer(b'abc') + inst.outbufs = [outbuf] inst.will_close = False inst.close_when_flushed = True inst.last_activity = 0 @@ -122,6 +140,7 @@ class TestHTTPChannel(unittest.TestCase): self.assertEqual(inst.will_close, True) self.assertEqual(inst.close_when_flushed, False) self.assertEqual(sock.sent, b'abc') + self.assertTrue(outbuf.closed) def test_readable_no_requests_not_will_close(self): inst, sock, map = self._makeOneWithMap() @@ -169,13 +188,28 @@ class TestHTTPChannel(unittest.TestCase): inst, sock, map = self._makeOneWithMap() wrote = inst.write_soon(b'') self.assertEqual(wrote, 0) - self.assertEqual(len(inst.outbuf), 0) + self.assertEqual(len(inst.outbufs[0]), 0) def test_write_soon_nonempty_byte(self): inst, sock, map = self._makeOneWithMap() wrote = inst.write_soon(b'a') self.assertEqual(wrote, 1) - self.assertEqual(len(inst.outbuf), 1) + self.assertEqual(len(inst.outbufs[0]), 1) + + def test_write_soon_filewrapper(self): + from waitress.buffers import ReadOnlyFileBasedBuffer + f = io.BytesIO(b'abc') + wrapper = ReadOnlyFileBasedBuffer(f, 8192) + wrapper.prepare() + inst, sock, map = self._makeOneWithMap() + outbufs = inst.outbufs + orig_outbuf = outbufs[0] + wrote = inst.write_soon(wrapper) + self.assertEqual(wrote, 3) + self.assertEqual(len(outbufs), 3) + self.assertEqual(outbufs[0], orig_outbuf) + self.assertEqual(outbufs[1], wrapper) + self.assertEqual(outbufs[2].__class__.__name__, 'OverflowableBuffer') def test__flush_some_empty_outbuf(self): inst, sock, map = self._makeOneWithMap() @@ -184,23 +218,44 @@ class TestHTTPChannel(unittest.TestCase): def test__flush_some_full_outbuf_socket_returns_nonzero(self): inst, sock, map = self._makeOneWithMap() - inst.outbuf.append(b'abc') + inst.outbufs[0].append(b'abc') result = inst._flush_some() self.assertEqual(result, True) def test__flush_some_full_outbuf_socket_returns_zero(self): inst, sock, map = self._makeOneWithMap() sock.send = lambda x: False - inst.outbuf.append(b'abc') + inst.outbufs[0].append(b'abc') result = inst._flush_some() self.assertEqual(result, False) + def test_flush_some_multiple_buffers_first_empty(self): + inst, sock, map = self._makeOneWithMap() + sock.send = lambda x: len(x) + buffer = DummyBuffer(b'abc') + inst.outbufs.append(buffer) + result = inst._flush_some() + self.assertEqual(result, True) + self.assertEqual(buffer.skipped, 3) + self.assertEqual(inst.outbufs, [buffer]) + def test_handle_close(self): inst, sock, map = self._makeOneWithMap() inst.handle_close() self.assertEqual(inst.connected, False) self.assertEqual(sock.closed, True) + def test_handle_close_outbuf_raises_on_close(self): + inst, sock, map = self._makeOneWithMap() + def doraise(): + raise NotImplementedError + inst.outbufs[0]._close = doraise + inst.logger = DummyLogger() + inst.handle_close() + self.assertEqual(inst.connected, False) + self.assertEqual(sock.closed, True) + self.assertEqual(len(inst.logger.exceptions), 1) + def test_add_channel(self): inst, sock, map = self._makeOneWithMap() fileno = inst._fileno @@ -300,7 +355,7 @@ class TestHTTPChannel(unittest.TestCase): inst.received(b'GET / HTTP/1.1\n\n') self.assertEqual(inst.request, preq) self.assertEqual(inst.server.tasks, []) - self.assertEqual(inst.outbuf.get(100), b'') + self.assertEqual(inst.outbufs[0].get(100), b'') def test_received_headers_finished_expect_continue(self): inst, sock, map = self._makeOneWithMap() @@ -334,6 +389,7 @@ class TestHTTPChannel(unittest.TestCase): inst.service() self.assertEqual(inst.requests, []) self.assertTrue(request.serviced) + self.assertTrue(request.closed) def test_service_with_one_error_request(self): inst, sock, map = self._makeOneWithMap() @@ -344,6 +400,7 @@ class TestHTTPChannel(unittest.TestCase): inst.service() self.assertEqual(inst.requests, []) self.assertTrue(request.serviced) + self.assertTrue(request.closed) def test_service_with_multiple_requests(self): inst, sock, map = self._makeOneWithMap() @@ -355,6 +412,8 @@ class TestHTTPChannel(unittest.TestCase): self.assertEqual(inst.requests, []) self.assertTrue(request1.serviced) self.assertTrue(request2.serviced) + self.assertTrue(request1.closed) + self.assertTrue(request2.closed) def test_service_with_request_raises(self): inst, sock, map = self._makeOneWithMap() @@ -374,6 +433,7 @@ class TestHTTPChannel(unittest.TestCase): self.assertTrue(inst.last_activity) self.assertFalse(inst.will_close) self.assertEqual(inst.error_task_class.serviced, True) + self.assertTrue(request.closed) def test_service_with_requests_raises_already_wrote_header(self): inst, sock, map = self._makeOneWithMap() @@ -392,6 +452,7 @@ class TestHTTPChannel(unittest.TestCase): self.assertTrue(inst.last_activity) self.assertTrue(inst.close_when_flushed) self.assertEqual(inst.error_task_class.serviced, False) + self.assertTrue(request.closed) def test_service_with_requests_raises_didnt_write_header_expose_tbs(self): inst, sock, map = self._makeOneWithMap() @@ -411,6 +472,7 @@ class TestHTTPChannel(unittest.TestCase): self.assertTrue(inst.force_flush) self.assertTrue(inst.last_activity) self.assertEqual(inst.error_task_class.serviced, True) + self.assertTrue(request.closed) def test_service_with_requests_raises_didnt_write_header(self): inst, sock, map = self._makeOneWithMap() @@ -428,6 +490,7 @@ class TestHTTPChannel(unittest.TestCase): self.assertTrue(inst.force_flush) self.assertTrue(inst.last_activity) self.assertTrue(inst.close_when_flushed) + self.assertTrue(request.closed) def test_cancel_no_requests(self): inst, sock, map = self._makeOneWithMap() @@ -471,12 +534,16 @@ class DummyLock(object): return self.acquirable def release(self): self.released = True + def __exit__(self, type, val, traceback): + self.acquire(True) + def __enter__(self): + pass class DummyBuffer(object): + closed = False def __init__(self, data, toraise=None): self.data = data self.toraise = toraise - self.lock = DummyLock() def get(self, *arg): if self.toraise: @@ -491,6 +558,9 @@ class DummyBuffer(object): def __len__(self): return len(self.data) + def _close(self): + self.closed = True + class DummyAdjustments(object): outbuf_overflow = 1048576 inbuf_overflow = 512000 @@ -535,8 +605,11 @@ class DummyRequest(object): error = None path = '/' version = '1.0' + closed = False def __init__(self): self.headers = {} + def _close(self): + self.closed = True class DummyLogger(object): def __init__(self): diff --git a/waitress/tests/test_functional.py b/waitress/tests/test_functional.py index 1d78b87..40dc4a1 100644 --- a/waitress/tests/test_functional.py +++ b/waitress/tests/test_functional.py @@ -925,6 +925,159 @@ class TestInternalServerError(SubprocessTests, unittest.TestCase): self.sock.send(to_send) self.assertRaises(ConnectionClosed, read_http, fp) +class TestFileWrapper(SubprocessTests, unittest.TestCase): + def setUp(self): + echo = os.path.join(here, 'fixtureapps', 'filewrapper.py') + self.start_subprocess([self.exe, echo]) + + def tearDown(self): + self.stop_subprocess() + + def test_filelike_http11(self): + to_send = "GET /filelike HTTP/1.1\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + for t in range(0, 2): + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.1') + cl = int(headers['content-length']) + self.assertEqual(cl, len(response_body)) + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has not been closed + + def test_filelike_nocl_http11(self): + to_send = "GET /filelike_nocl HTTP/1.1\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + for t in range(0, 2): + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.1') + cl = int(headers['content-length']) + self.assertEqual(cl, len(response_body)) + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has not been closed + + def test_notfilelike_http11(self): + to_send = "GET /notfilelike HTTP/1.1\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + for t in range(0, 2): + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.1') + cl = int(headers['content-length']) + self.assertEqual(cl, len(response_body)) + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has not been closed + + def test_notfilelike_nocl_http11(self): + to_send = "GET /notfilelike_nocl HTTP/1.1\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.1') + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has been closed (no content-length) + self.sock.send(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + + def test_filelike_http10(self): + to_send = "GET /filelike HTTP/1.0\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.0') + cl = int(headers['content-length']) + self.assertEqual(cl, len(response_body)) + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has been closed + self.sock.send(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + + def test_filelike_nocl_http10(self): + to_send = "GET /filelike_nocl HTTP/1.0\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.0') + cl = int(headers['content-length']) + self.assertEqual(cl, len(response_body)) + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has been closed + self.sock.send(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + + def test_notfilelike_http10(self): + to_send = "GET /notfilelike HTTP/1.0\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.0') + cl = int(headers['content-length']) + self.assertEqual(cl, len(response_body)) + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has been closed + self.sock.send(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + + def test_notfilelike_nocl_http10(self): + to_send = "GET /notfilelike_nocl HTTP/1.0\n\n" + to_send = tobytes(to_send) + + self.sock.connect((self.host, self.port)) + + self.sock.send(to_send) + fp = self.sock.makefile('rb', 0) + line, headers, response_body = read_http(fp) + self.assertline(line, '200', 'OK', 'HTTP/1.0') + ct = headers['content-type'] + self.assertEqual(ct, 'image/jpeg') + self.assertTrue(b'\377\330\377' in response_body) + # connection has been closed (no content-length) + self.sock.send(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + def parse_headers(fp): """Parses only RFC2822 headers from a file pointer. """ diff --git a/waitress/tests/test_parser.py b/waitress/tests/test_parser.py index 7e7268b..c10dab3 100644 --- a/waitress/tests/test_parser.py +++ b/waitress/tests/test_parser.py @@ -169,6 +169,16 @@ foo: bar""" self.parser.parse_header(data) self.assertEqual(self.parser.connection_close, True) + def test__close_with_body_rcv(self): + body_rcv = DummyBodyStream() + self.parser.body_rcv = body_rcv + self.parser._close() + self.assertTrue(body_rcv.closed) + + def test__close_with_no_body_rcv(self): + self.parser.body_rcv = None + self.parser._close() # doesn't raise + class Test_split_uri(unittest.TestCase): def _callFUT(self, uri): from waitress.parser import split_uri @@ -351,3 +361,10 @@ Hello. class DummyBodyStream(object): def getfile(self): return self + + def getbuf(self): + return self + + def _close(self): + self.closed = True + diff --git a/waitress/tests/test_receiver.py b/waitress/tests/test_receiver.py index a7b0e93..c641971 100644 --- a/waitress/tests/test_receiver.py +++ b/waitress/tests/test_receiver.py @@ -36,6 +36,11 @@ class TestFixedStreamReceiver(unittest.TestCase): inst = self._makeOne(10, buf) self.assertEqual(inst.getfile(), buf) + def test_getbuf(self): + buf = DummyBuffer() + inst = self._makeOne(10, buf) + self.assertEqual(inst.getbuf(), buf) + class TestChunkedReceiver(unittest.TestCase): def _makeOne(self, buf): from waitress.receiver import ChunkedReceiver @@ -130,6 +135,10 @@ class TestChunkedReceiver(unittest.TestCase): inst = self._makeOne(buf) self.assertEqual(inst.getfile(), buf) + def test_getbuf(self): + buf = DummyBuffer() + inst = self._makeOne(buf) + self.assertEqual(inst.getbuf(), buf) class DummyBuffer(object): def __init__(self): diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py index dfce7e6..e4f1f0a 100644 --- a/waitress/tests/test_task.py +++ b/waitress/tests/test_task.py @@ -1,4 +1,5 @@ import unittest +import io class TestThreadedTaskDispatcher(unittest.TestCase): def _makeOne(self): @@ -463,6 +464,33 @@ class TestWSGITask(unittest.TestCase): inst.execute() self.assertEqual(foo.closed, True) + def test_execute_app_returns_filewrapper_prepare_returns_True(self): + from waitress.buffers import ReadOnlyFileBasedBuffer + f = io.BytesIO(b'abc') + app_iter = ReadOnlyFileBasedBuffer(f, 8192) + def app(environ, start_response): + start_response('200 OK', [('Content-Length', '3')]) + return app_iter + inst = self._makeOne() + inst.channel.server.application = app + inst.execute() + self.assertTrue(inst.channel.written) # header + self.assertEqual(inst.channel.otherdata, [app_iter]) + + def test_execute_app_returns_filewrapper_prepare_returns_True_nocl(self): + from waitress.buffers import ReadOnlyFileBasedBuffer + f = io.BytesIO(b'abc') + app_iter = ReadOnlyFileBasedBuffer(f, 8192) + def app(environ, start_response): + start_response('200 OK', []) + return app_iter + inst = self._makeOne() + inst.channel.server.application = app + inst.execute() + self.assertTrue(inst.channel.written) # header + self.assertEqual(inst.channel.otherdata, [app_iter]) + self.assertEqual(inst.content_length, 3) + def test_get_environment_already_cached(self): inst = self._makeOne() inst.environ = object() @@ -601,8 +629,12 @@ class DummyChannel(object): server = DummyServer() self.server = server self.written = b'' + self.otherdata = [] def write_soon(self, data): - self.written += data + if isinstance(data, bytes): + self.written += data + else: + self.otherdata.append(data) return len(data) class DummyParser(object): |