From e00b2456bad2565ceb3509d6f6ae3a6977a99db7 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 15 Dec 2014 09:31:34 +0100 Subject: Remove tests duplicated in aiotest --- tests/test_add_reader.py | 81 ------------------------------------- tests/test_callback.py | 38 ------------------ tests/test_coroutine.py | 62 ----------------------------- tests/test_network.py | 84 --------------------------------------- tests/test_thread.py | 101 ----------------------------------------------- tests/test_timer.py | 48 ---------------------- 6 files changed, 414 deletions(-) delete mode 100644 tests/test_add_reader.py delete mode 100644 tests/test_callback.py delete mode 100644 tests/test_coroutine.py delete mode 100644 tests/test_network.py delete mode 100644 tests/test_thread.py delete mode 100644 tests/test_timer.py diff --git a/tests/test_add_reader.py b/tests/test_add_reader.py deleted file mode 100644 index 12de90b..0000000 --- a/tests/test_add_reader.py +++ /dev/null @@ -1,81 +0,0 @@ -from __future__ import absolute_import -from aioeventlet import socketpair -import eventlet -import tests -socket = eventlet.patcher.original('socket') - - -class AddReaderTests(tests.TestCase): - def test_add_reader(self): - result = {'received': None} - rsock, wsock = socketpair() - self.addCleanup(rsock.close) - self.addCleanup(wsock.close) - - def reader(): - data = rsock.recv(100) - result['received'] = data - self.loop.remove_reader(rsock) - self.loop.stop() - - def writer(): - self.loop.remove_writer(wsock) - self.loop.call_soon(wsock.send, b'abc') - - self.loop.add_reader(rsock, reader) - self.loop.add_writer(wsock, writer) - - self.loop.run_forever() - self.assertEqual(result['received'], b'abc') - - def check_add_replace(self, event): - selector = self.loop._selector - if event == 'reader': - add_sock = self.loop.add_reader - remove_sock = self.loop.remove_reader - def get_handle(fileobj): - return selector.get_key(fileobj).data[0] - else: - add_sock = self.loop.add_writer - remove_sock = self.loop.remove_writer - def get_handle(fileobj): - return selector.get_key(fileobj).data[1] - - sock = socket.socket() - self.addCleanup(sock.close) - - def func(): - pass - - def func2(): - pass - - self.assertRaises(KeyError, get_handle, sock) - - add_sock(sock, func) - handle1 = get_handle(sock) - self.assertFalse(handle1._cancelled) - - add_sock(sock, func2) - handle2 = get_handle(sock) - self.assertIsNot(handle1, handle2) - self.assertTrue(handle1._cancelled) - self.assertFalse(handle2._cancelled) - - removed = remove_sock(sock) - self.assertTrue(removed) - self.assertTrue(handle2._cancelled) - - removed = remove_sock(sock) - self.assertFalse(removed) - - def test_add_reader_replace(self): - self.check_add_replace("reader") - - def test_add_writer_replace(self): - self.check_add_replace("writer") - - -if __name__ == '__main__': - import unittest - unittest.main() diff --git a/tests/test_callback.py b/tests/test_callback.py deleted file mode 100644 index 58f787b..0000000 --- a/tests/test_callback.py +++ /dev/null @@ -1,38 +0,0 @@ -import tests - -class CallbackTests(tests.TestCase): - def test_hello_world(self): - result = [] - - def hello_world(loop): - result.append('Hello World') - loop.stop() - - self.loop.call_soon(hello_world, self.loop) - self.loop.run_forever() - self.assertEqual(result, ['Hello World']) - - def test_soon_stop_soon(self): - result = [] - - def hello(): - result.append("Hello") - - def world(): - result.append("World") - self.loop.stop() - - self.loop.call_soon(hello) - self.loop.stop() - self.loop.call_soon(world) - - self.loop.run_forever() - self.assertEqual(result, ["Hello"]) - - self.loop.run_forever() - self.assertEqual(result, ["Hello", "World"]) - - -if __name__ == '__main__': - import unittest - unittest.main() diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py deleted file mode 100644 index 5bb0af1..0000000 --- a/tests/test_coroutine.py +++ /dev/null @@ -1,62 +0,0 @@ -import tests - -try: - import asyncio - - exec('''if 1: - def hello_world(result, delay): - result.append("Hello") - # retrieve the event loop from the policy - yield from asyncio.sleep(delay) - result.append('World') - return "." - - def waiter(result): - loop = asyncio.get_event_loop() - fut = asyncio.Future(loop=loop) - loop.call_soon(fut.set_result, "Future") - - value = yield from fut - result.append(value) - - value = yield from hello_world(result, 0.001) - result.append(value) - ''') -except ImportError: - import trollius as asyncio - from trollius import From, Return - - def hello_world(result, delay): - result.append("Hello") - # retrieve the event loop from the policy - yield From(asyncio.sleep(delay)) - result.append('World') - raise Return(".") - - def waiter(result): - loop = asyncio.get_event_loop() - fut = asyncio.Future(loop=loop) - loop.call_soon(fut.set_result, "Future") - - value = yield From(fut) - result.append(value) - - value = yield From(hello_world(result, 0.001)) - result.append(value) - - -class CallbackTests(tests.TestCase): - def test_hello_world(self): - result = [] - self.loop.run_until_complete(hello_world(result, 0.001)) - self.assertEqual(result, ['Hello', 'World']) - - def test_waiter(self): - result = [] - self.loop.run_until_complete(waiter(result)) - self.assertEqual(result, ['Future', 'Hello', 'World', '.']) - - -if __name__ == '__main__': - import unittest - unittest.main() diff --git a/tests/test_network.py b/tests/test_network.py deleted file mode 100644 index be0bebd..0000000 --- a/tests/test_network.py +++ /dev/null @@ -1,84 +0,0 @@ -import eventlet -import tests -socket = eventlet.patcher.original('socket') -threading = eventlet.patcher.original('threading') - -class TcpEchoClientProtocol(tests.asyncio.Protocol): - def __init__(self, message, loop): - self.message = message - self.loop = loop - self.state = 'new' - self.received = None - - def connection_made(self, transport): - self.state = 'ping' - transport.write(self.message) - - def data_received(self, data): - self.state = 'pong' - self.received = data - - def connection_lost(self, exc): - self.state = 'closed' - self.loop.stop() - - -class TcpServer(threading.Thread): - def __init__(self, host, port, event): - super(TcpServer, self).__init__() - self.host = host - self.port = port - self.event = event - self.sock = None - self.client = None - - def run(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock = sock - try: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((self.host, self.port)) - sock.listen(1) - - self.event.set() - client, addr = sock.accept() - self.client = client - try: - message = client.recv(100) - client.sendall(message) - finally: - client.close() - self.client = None - finally: - sock.close() - self.sock = None - - def stop(self): - self.join() - - -class NetworkTests(tests.TestCase): - def test_tcp_hello(self): - port = 8888 - host = '127.0.0.1' - message = b'Hello World!' - - event = threading.Event() - server = TcpServer(host, port, event) - server.start() - self.addCleanup(server.stop) - event.wait() - - proto = TcpEchoClientProtocol(message, self.loop) - coro = self.loop.create_connection(lambda: proto, host, port) - self.loop.run_until_complete(coro) - self.assertNotEqual(proto.state, 'new') - - self.loop.run_forever() - self.assertEqual(proto.state, 'closed') - self.assertEqual(proto.received, message) - - -if __name__ == '__main__': - import unittest - unittest.main() diff --git a/tests/test_thread.py b/tests/test_thread.py deleted file mode 100644 index 4c4185b..0000000 --- a/tests/test_thread.py +++ /dev/null @@ -1,101 +0,0 @@ -import eventlet -import tests -threading = eventlet.patcher.original('threading') -try: - import asyncio -except ImportError: - import trollius as asyncio - -try: - get_ident = threading.get_ident # Python 3 -except AttributeError: - get_ident = threading._get_ident # Python 2 - -class ThreadTests(tests.TestCase): - def test_ident(self): - result = {'ident': None} - - def work(): - result['ident'] = get_ident() - - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - - # ensure that work() was executed in a different thread - work_ident = result['ident'] - self.assertIsNotNone(work_ident) - self.assertNotEqual(work_ident, get_ident()) - - def test_run_twice(self): - result = [] - - def work(): - result.append("run") - - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - self.assertEqual(result, ["run"]) - - # ensure that run_in_executor() can be called twice - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - self.assertEqual(result, ["run", "run"]) - - def test_policy(self): - result = {'loop': 'not set'} # sentinel, different than None - - def work(): - try: - result['loop'] = asyncio.get_event_loop() - except AssertionError as exc: - result['loop'] = exc - - # get_event_loop() must return None in a different thread - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - self.assertIsInstance(result['loop'], AssertionError) - - def test_run_in_thread(self): - class LoopThread(threading.Thread): - def __init__(self, event): - super(LoopThread, self).__init__() - self.loop = None - self.event = event - - def run(self): - self.loop = asyncio.new_event_loop() - try: - self.loop.set_debug(True) - asyncio.set_event_loop(self.loop) - - self.event.set() - self.loop.run_forever() - finally: - self.loop.close() - asyncio.set_event_loop(None) - - result = [] - - # start an event loop in a thread - event = threading.Event() - thread = LoopThread(event) - thread.start() - event.wait() - loop = thread.loop - - def func(loop): - result.append(threading.current_thread().ident) - loop.stop() - - # call func() in a different thread using the event loop - tid = thread.ident - loop.call_soon_threadsafe(func, loop) - - # stop the event loop - thread.join() - self.assertEqual(result, [tid]) - - -if __name__ == '__main__': - import unittest - unittest.main() diff --git a/tests/test_timer.py b/tests/test_timer.py deleted file mode 100644 index f2b26d6..0000000 --- a/tests/test_timer.py +++ /dev/null @@ -1,48 +0,0 @@ -import datetime -import eventlet -import tests - -class TimerTests(tests.TestCase): - def test_display_date(self): - result = [] - delay = 0.050 - count = 3 - - def display_date(end_time, loop): - result.append(datetime.datetime.now()) - if (loop.time() + delay) < end_time: - loop.call_later(delay, display_date, end_time, loop) - else: - loop.stop() - - end_time = self.loop.time() + delay * count - self.loop.call_soon(display_date, end_time, self.loop) - self.loop.run_forever() - - self.assertEqual(len(result), count, result) - - def test_later_stop_later(self): - result = [] - - def hello(): - result.append("Hello") - - def world(loop): - result.append("World") - loop.stop() - - self.loop.call_later(0.001, hello) - self.loop.call_later(0.025, self.loop.stop) - self.loop.call_later(0.050, world, self.loop) - self.loop.run_forever() - - eventlet.sleep(0.100) - self.assertEqual(result, ["Hello"]) - - self.loop.run_forever() - self.assertEqual(result, ["Hello", "World"]) - - -if __name__ == '__main__': - import unittest - unittest.main() -- cgit v1.2.1