diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2011-02-10 09:55:00 +0000 |
---|---|---|
committer | Giampaolo Rodola <g.rodola@gmail.com> | 2011-02-10 09:55:00 +0000 |
commit | fc3b4ab21d63209e2ee9f8c82b90d18748be9dbe (patch) | |
tree | 34be8c3bf8f93167bfaf556a30cf3afbdcf8c214 | |
parent | 1bc64043b3fcc427a5ec7a416835d9b1a5cda6c9 (diff) | |
download | pysendfile-fc3b4ab21d63209e2ee9f8c82b90d18748be9dbe.tar.gz |
rewrite benchmark.py script so that it no longer uses a threaded asyncore server but a simpler one using multiprocessing module
-rw-r--r-- | test/benchmark.py | 236 |
1 files changed, 77 insertions, 159 deletions
diff --git a/test/benchmark.py b/test/benchmark.py index e984f08..1740099 100644 --- a/test/benchmark.py +++ b/test/benchmark.py @@ -8,12 +8,13 @@ performances in terms of CPU time spent and bytes transmitted per second. This is what I get on my Linux 2.6.35-22 box, Intel core duo 3.1 GHz: === send() === -cpu: 10.98 usec/pass -speed: 918.86 Mb/sec +cpu: 6.60 usec/pass +speed: 1527.67 Mb/sec === sendfile() === -cpu: 4.94 usec/pass -speed: 1939.39 Mb/sec +cpu: 3.47 usec/pass +speed: 2882.97 Mb/sec + Working with python 2.x only. """ @@ -21,16 +22,13 @@ Working with python 2.x only. import socket import sys import os -import asyncore -import asynchat -import threading import errno import timeit import time import atexit -from errno import EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EWOULDBLOCK +from multiprocessing import Process -import sendfile +from sendfile import sendfile HOST = "127.0.0.1" @@ -40,129 +38,16 @@ BIGFILE_SIZE = 1024 * 1024 * 1024 # 1 GB BUFFER_SIZE = 65536 -class Handler(asynchat.async_chat): - - def __init__(self, conn, server): - asynchat.async_chat.__init__(self, conn) - self.server = server - self.file = open(BIGFILE, "rb") - self.sockfd = self.socket.fileno() - self.filefd = self.file.fileno() - self.offset = 0 - self.handle_write() - - def writable(self): - return 1 - - def handle_write(self): - if self.server.use_sendfile: - self.do_sendfile() - else: - self.do_send() - - def do_send(self): - chunk = self.file.read(BUFFER_SIZE) - if not chunk: - self.handle_close() - try: - self.socket.sendall(chunk) - except socket.error, err: - if err.errno == EWOULDBLOCK: - return - if err.errno in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): - return self.handle_close() - raise - - def do_sendfile(self): - try: - sent, _ = sendfile.sendfile(self.sockfd, self.filefd, self.offset, - BUFFER_SIZE) - except OSError, err: - if err.errno == errno.ECONNRESET: - # can occur on Linux - self.handle_close() - elif err.errno == errno.EAGAIN: - # can occur on BSD, meaning we have to retry send data - return - else: - raise - else: - self.offset += sent - if sent == 0: - self.handle_close() - - def handle_close(self): - if self.server.keep_sending: - self.file.seek(0) - self.offset = 0 - else: - self.file.close() - self.close() - - def handle_error(self): - raise - - -class Server(asyncore.dispatcher, threading.Thread): - - handler = Handler - use_sendfile = True - keep_sending = True - - def __init__(self): - threading.Thread.__init__(self) - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((HOST, PORT)) - self.listen(5) - self.host, self.port = self.socket.getsockname()[:2] - self.handler_instance = None - self._active = False - self._active_lock = threading.Lock() - - # --- public API - - @property - def running(self): - return self._active - - def start(self): - assert not self.running - self.__flag = threading.Event() - threading.Thread.start(self) - self.__flag.wait() - - def stop(self): - assert self.running - self._active = False - self.join() - - # --- internals - - def run(self): - self._active = True - self.__flag.set() - while self._active and asyncore.socket_map: - self._active_lock.acquire() - asyncore.loop(timeout=0.001, count=1) - self._active_lock.release() - asyncore.close_all() - - def handle_accept(self): - conn, addr = self.accept() - self.handler_instance = self.handler(conn, self) - - def handle_connect(self): - self.close() - handle_read = handle_connect - - def writable(self): - return 0 - - def handle_error(self): - raise - +def create_file(filename, size): + f = open(filename, 'wb') + bytes = 0 + while 1: + data = "x" * 1024 + bytes += len(data) + f.write(data) + if bytes >= size: + break + f.close() class Client: @@ -188,56 +73,90 @@ class Client: return bytes_recv -def create_file(filename, size): - f = open(filename, 'wb') - bytes = 0 - while 1: - data = "x" * 1024 - bytes += len(data) - f.write(data) - if bytes >= size: - break - f.close() - def start_server(use_sendfile=True, keep_sending=False): - server = Server() - server.use_sendfile = use_sendfile - server.keep_sending = keep_sending - server.start() - return server + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((HOST, PORT)) + sock.listen(1) + conn, addr = sock.accept() + file = open(BIGFILE, 'rb') + if not use_sendfile: + while 1: + chunk = file.read(BUFFER_SIZE) + if not chunk: + # EOF + if keep_sending: + file.seek(0) + continue + else: + break + conn.sendall(chunk) + else: + offset = 0 + sockno = conn.fileno() + fileno = file.fileno() + while 1: + try: + sent = sendfile(sockno, fileno, offset, BUFFER_SIZE) + except OSError, err: + if err.errno in (errno.EAGAIN, errno.EBUSY): + continue + raise + else: + if sent == 0: + # EOF + if keep_sending: + offset = 0 + continue + else: + break + offset += sent + file.close() + conn.close() def main(): - atexit.register(lambda: os.remove(BIGFILE)) - if not os.path.exists(BIGFILE) or os.path.getsize(BIGFILE) < BIGFILE_SIZE: print "creating big file . . ." create_file(BIGFILE, BIGFILE_SIZE) print "starting benchmark . . .\n" + #atexit.register(lambda: os.remove(BIGFILE)) + # CPU time: use sendfile() - server = start_server(use_sendfile=1) + server = Process(target=start_server, kwargs={"use_sendfile":True}) + server.start() + time.sleep(0.1) t1 = timeit.Timer(setup="from __main__ import Client; client = Client()", stmt="client.retr()").timeit(number=1) - server.stop() + server.terminate() # CPU time: use send() - server = start_server(use_sendfile=0) + server = Process(target=start_server, kwargs={"use_sendfile":False}) + server.start() + time.sleep(0.1) t2 = timeit.Timer(setup="from __main__ import Client; client = Client()", stmt="client.retr()").timeit(number=1) - server.stop() + server.terminate() # MB/sec: use sendfile() - server = start_server(use_sendfile=1, keep_sending=1) + server = Process(target=start_server, kwargs={"use_sendfile":True, + "keep_sending":True}) + server.start() + time.sleep(0.1) client = Client() bytes1 = client.retr_for_1_sec() - server.stop() + server.terminate() # MB/sec: use send() - server = start_server(use_sendfile=0, keep_sending=1) + # MB/sec: use sendfile() + server = Process(target=start_server, kwargs={"use_sendfile":False, + "keep_sending":True}) + server.start() + time.sleep(0.1) client = Client() bytes2 = client.retr_for_1_sec() - server.stop() + server.terminate() print "=== send() ===" print "cpu: %.2f usec/pass" % (1000000 * t2 / 100000) @@ -246,7 +165,6 @@ def main(): print "=== sendfile() ===" print "cpu: %.2f usec/pass" % (1000000 * t1 / 100000) print "speed: %s Mb/sec" % round(bytes1 / 1024.0 / 1024.0, 2) - print if __name__ == '__main__': main() |