diff options
Diffstat (limited to 'test/test_sendfile.py')
-rw-r--r-- | test/test_sendfile.py | 67 |
1 files changed, 39 insertions, 28 deletions
diff --git a/test/test_sendfile.py b/test/test_sendfile.py index 01ab5fc..ce223b1 100644 --- a/test/test_sendfile.py +++ b/test/test_sendfile.py @@ -1,12 +1,9 @@ #!/usr/bin/env python -# -# $Id$ -# # ====================================================================== # This software is distributed under the MIT license reproduced below: # -# Copyright (C) 2009-2012 Giampaolo Rodola' <g.rodola@gmail.com> +# Copyright (C) 2009-2014 Giampaolo Rodola' <g.rodola@gmail.com> # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby @@ -35,22 +32,23 @@ during tests. from __future__ import with_statement -import unittest +import asynchat +import asyncore +import atexit +import errno +import optparse import os -import sys import socket -import asyncore -import asynchat +import sys import threading -import errno import time -import atexit +import unittest import warnings -import optparse import sendfile -PY3 = sys.version_info >= (3,) +PY3 = sys.version_info >= (3, ) + def b(x): if PY3: @@ -72,12 +70,14 @@ except TypeError: except Exception: SUPPORT_HEADER_TRAILER = True + def safe_remove(file): try: os.remove(file) except OSError: pass + def has_large_file_support(): # taken from Python's Lib/test/test_largefile.py with open(TESTFN, 'wb', buffering=0) as f: @@ -150,7 +150,6 @@ class Server(asyncore.dispatcher, threading.Thread): self.handler_instance = None self._active = False self._active_lock = threading.Lock() - # --- public API @property @@ -174,7 +173,6 @@ class Server(asyncore.dispatcher, threading.Thread): while not getattr(self.handler_instance, "closed", True): time.sleep(0.001) self.stop() - # --- internals def run(self): @@ -201,7 +199,8 @@ class Server(asyncore.dispatcher, threading.Thread): raise -def sendfile_wrapper(sock, file, offset, nbytes=BUFFER_LEN, header="", trailer=""): +def sendfile_wrapper(sock, file, offset, nbytes=BUFFER_LEN, header="", + trailer=""): """A higher level wrapper representing how an application is supposed to use sendfile(). """ @@ -289,7 +288,8 @@ class TestSendfile(unittest.TestCase): def test_header(self): total_sent = 0 header = b("x") * 512 - sent = sendfile.sendfile(self.sockno, self.fileno, 0, header=header) + sent = sendfile.sendfile(self.sockno, self.fileno, 0, + header=header) total_sent += sent offset = BUFFER_LEN while 1: @@ -358,7 +358,6 @@ class TestSendfile(unittest.TestCase): err = sys.exc_info()[1] if err.errno not in (errno.EBUSY, errno.EAGAIN): raise - # --- corner cases def test_offset_overflow(self): @@ -485,13 +484,15 @@ class TestLargeFile(unittest.TestCase): sys.stdout.flush() def create_file(self): - if os.path.isfile(TESTFN3) and os.path.getsize(TESTFN3) >= BIGFILE_SIZE: + if (os.path.isfile(TESTFN3) and + os.path.getsize(TESTFN3) >= BIGFILE_SIZE): return f = open(TESTFN3, 'wb') chunk_len = 65536 chunk = b('x' * chunk_len) total = 0 - timer = RepeatedTimer(1, lambda: self.print_percent(total, BIGFILE_SIZE)) + timer = RepeatedTimer(1, lambda: self.print_percent(total, + BIGFILE_SIZE)) timer.start() try: while 1: @@ -538,6 +539,13 @@ class TestLargeFile(unittest.TestCase): self.assertEqual(file_size, data_len) +def cleanup(): + safe_remove(TESTFN) + safe_remove(TESTFN2) + +atexit.register(cleanup) + + def test_main(): parser = optparse.OptionParser() parser.add_option('-k', '--keepfile', action="store_true", default=False, @@ -546,22 +554,25 @@ def test_main(): if not options.keepfile: atexit.register(lambda: safe_remove(TESTFN3)) - def cleanup(): - safe_remove(TESTFN) - safe_remove(TESTFN2) - - atexit.register(cleanup) - test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(TestSendfile)) if has_large_file_support(): test_suite.addTest(unittest.makeSuite(TestLargeFile)) else: - atexit.register(warnings.warn, "large files unsupported", RuntimeWarning) + atexit.register(warnings.warn, "large files unsupported", + RuntimeWarning) cleanup() with open(TESTFN, "wb") as f: f.write(DATA) - unittest.TextTestRunner(verbosity=2).run(test_suite) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + if __name__ == '__main__': - test_main() + try: + if not test_main(): + sys.exit(1) + except (KeyboardInterrupt, SystemExit): + # this will make the threaded server exit immediately + asyncore.socket_map.clear() + raise |