summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-12-15 09:31:34 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-12-15 09:31:34 +0100
commite00b2456bad2565ceb3509d6f6ae3a6977a99db7 (patch)
tree7d0949dcbc9d0d6222be6b47c82c8c10ad8de044
parent1b5a35ac98a02a487768d39499a426e961fe4ef4 (diff)
downloadaioeventlet-e00b2456bad2565ceb3509d6f6ae3a6977a99db7.tar.gz
Remove tests duplicated in aiotest
-rw-r--r--tests/test_add_reader.py81
-rw-r--r--tests/test_callback.py38
-rw-r--r--tests/test_coroutine.py62
-rw-r--r--tests/test_network.py84
-rw-r--r--tests/test_thread.py101
-rw-r--r--tests/test_timer.py48
6 files changed, 0 insertions, 414 deletions
diff --git a/tests/test_add_reader.py b/tests/test_add_reader.py
deleted file mode 100644
index 12de90b..0000000
--- a/tests/test_add_reader.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from __future__ import absolute_import
-from aioeventlet import socketpair
-import eventlet
-import tests
-socket = eventlet.patcher.original('socket')
-
-
-class AddReaderTests(tests.TestCase):
- def test_add_reader(self):
- result = {'received': None}
- rsock, wsock = socketpair()
- self.addCleanup(rsock.close)
- self.addCleanup(wsock.close)
-
- def reader():
- data = rsock.recv(100)
- result['received'] = data
- self.loop.remove_reader(rsock)
- self.loop.stop()
-
- def writer():
- self.loop.remove_writer(wsock)
- self.loop.call_soon(wsock.send, b'abc')
-
- self.loop.add_reader(rsock, reader)
- self.loop.add_writer(wsock, writer)
-
- self.loop.run_forever()
- self.assertEqual(result['received'], b'abc')
-
- def check_add_replace(self, event):
- selector = self.loop._selector
- if event == 'reader':
- add_sock = self.loop.add_reader
- remove_sock = self.loop.remove_reader
- def get_handle(fileobj):
- return selector.get_key(fileobj).data[0]
- else:
- add_sock = self.loop.add_writer
- remove_sock = self.loop.remove_writer
- def get_handle(fileobj):
- return selector.get_key(fileobj).data[1]
-
- sock = socket.socket()
- self.addCleanup(sock.close)
-
- def func():
- pass
-
- def func2():
- pass
-
- self.assertRaises(KeyError, get_handle, sock)
-
- add_sock(sock, func)
- handle1 = get_handle(sock)
- self.assertFalse(handle1._cancelled)
-
- add_sock(sock, func2)
- handle2 = get_handle(sock)
- self.assertIsNot(handle1, handle2)
- self.assertTrue(handle1._cancelled)
- self.assertFalse(handle2._cancelled)
-
- removed = remove_sock(sock)
- self.assertTrue(removed)
- self.assertTrue(handle2._cancelled)
-
- removed = remove_sock(sock)
- self.assertFalse(removed)
-
- def test_add_reader_replace(self):
- self.check_add_replace("reader")
-
- def test_add_writer_replace(self):
- self.check_add_replace("writer")
-
-
-if __name__ == '__main__':
- import unittest
- unittest.main()
diff --git a/tests/test_callback.py b/tests/test_callback.py
deleted file mode 100644
index 58f787b..0000000
--- a/tests/test_callback.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import tests
-
-class CallbackTests(tests.TestCase):
- def test_hello_world(self):
- result = []
-
- def hello_world(loop):
- result.append('Hello World')
- loop.stop()
-
- self.loop.call_soon(hello_world, self.loop)
- self.loop.run_forever()
- self.assertEqual(result, ['Hello World'])
-
- def test_soon_stop_soon(self):
- result = []
-
- def hello():
- result.append("Hello")
-
- def world():
- result.append("World")
- self.loop.stop()
-
- self.loop.call_soon(hello)
- self.loop.stop()
- self.loop.call_soon(world)
-
- self.loop.run_forever()
- self.assertEqual(result, ["Hello"])
-
- self.loop.run_forever()
- self.assertEqual(result, ["Hello", "World"])
-
-
-if __name__ == '__main__':
- import unittest
- unittest.main()
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py
deleted file mode 100644
index 5bb0af1..0000000
--- a/tests/test_coroutine.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import tests
-
-try:
- import asyncio
-
- exec('''if 1:
- def hello_world(result, delay):
- result.append("Hello")
- # retrieve the event loop from the policy
- yield from asyncio.sleep(delay)
- result.append('World')
- return "."
-
- def waiter(result):
- loop = asyncio.get_event_loop()
- fut = asyncio.Future(loop=loop)
- loop.call_soon(fut.set_result, "Future")
-
- value = yield from fut
- result.append(value)
-
- value = yield from hello_world(result, 0.001)
- result.append(value)
- ''')
-except ImportError:
- import trollius as asyncio
- from trollius import From, Return
-
- def hello_world(result, delay):
- result.append("Hello")
- # retrieve the event loop from the policy
- yield From(asyncio.sleep(delay))
- result.append('World')
- raise Return(".")
-
- def waiter(result):
- loop = asyncio.get_event_loop()
- fut = asyncio.Future(loop=loop)
- loop.call_soon(fut.set_result, "Future")
-
- value = yield From(fut)
- result.append(value)
-
- value = yield From(hello_world(result, 0.001))
- result.append(value)
-
-
-class CallbackTests(tests.TestCase):
- def test_hello_world(self):
- result = []
- self.loop.run_until_complete(hello_world(result, 0.001))
- self.assertEqual(result, ['Hello', 'World'])
-
- def test_waiter(self):
- result = []
- self.loop.run_until_complete(waiter(result))
- self.assertEqual(result, ['Future', 'Hello', 'World', '.'])
-
-
-if __name__ == '__main__':
- import unittest
- unittest.main()
diff --git a/tests/test_network.py b/tests/test_network.py
deleted file mode 100644
index be0bebd..0000000
--- a/tests/test_network.py
+++ /dev/null
@@ -1,84 +0,0 @@
-import eventlet
-import tests
-socket = eventlet.patcher.original('socket')
-threading = eventlet.patcher.original('threading')
-
-class TcpEchoClientProtocol(tests.asyncio.Protocol):
- def __init__(self, message, loop):
- self.message = message
- self.loop = loop
- self.state = 'new'
- self.received = None
-
- def connection_made(self, transport):
- self.state = 'ping'
- transport.write(self.message)
-
- def data_received(self, data):
- self.state = 'pong'
- self.received = data
-
- def connection_lost(self, exc):
- self.state = 'closed'
- self.loop.stop()
-
-
-class TcpServer(threading.Thread):
- def __init__(self, host, port, event):
- super(TcpServer, self).__init__()
- self.host = host
- self.port = port
- self.event = event
- self.sock = None
- self.client = None
-
- def run(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock = sock
- try:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((self.host, self.port))
- sock.listen(1)
-
- self.event.set()
- client, addr = sock.accept()
- self.client = client
- try:
- message = client.recv(100)
- client.sendall(message)
- finally:
- client.close()
- self.client = None
- finally:
- sock.close()
- self.sock = None
-
- def stop(self):
- self.join()
-
-
-class NetworkTests(tests.TestCase):
- def test_tcp_hello(self):
- port = 8888
- host = '127.0.0.1'
- message = b'Hello World!'
-
- event = threading.Event()
- server = TcpServer(host, port, event)
- server.start()
- self.addCleanup(server.stop)
- event.wait()
-
- proto = TcpEchoClientProtocol(message, self.loop)
- coro = self.loop.create_connection(lambda: proto, host, port)
- self.loop.run_until_complete(coro)
- self.assertNotEqual(proto.state, 'new')
-
- self.loop.run_forever()
- self.assertEqual(proto.state, 'closed')
- self.assertEqual(proto.received, message)
-
-
-if __name__ == '__main__':
- import unittest
- unittest.main()
diff --git a/tests/test_thread.py b/tests/test_thread.py
deleted file mode 100644
index 4c4185b..0000000
--- a/tests/test_thread.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import eventlet
-import tests
-threading = eventlet.patcher.original('threading')
-try:
- import asyncio
-except ImportError:
- import trollius as asyncio
-
-try:
- get_ident = threading.get_ident # Python 3
-except AttributeError:
- get_ident = threading._get_ident # Python 2
-
-class ThreadTests(tests.TestCase):
- def test_ident(self):
- result = {'ident': None}
-
- def work():
- result['ident'] = get_ident()
-
- fut = self.loop.run_in_executor(None, work)
- self.loop.run_until_complete(fut)
-
- # ensure that work() was executed in a different thread
- work_ident = result['ident']
- self.assertIsNotNone(work_ident)
- self.assertNotEqual(work_ident, get_ident())
-
- def test_run_twice(self):
- result = []
-
- def work():
- result.append("run")
-
- fut = self.loop.run_in_executor(None, work)
- self.loop.run_until_complete(fut)
- self.assertEqual(result, ["run"])
-
- # ensure that run_in_executor() can be called twice
- fut = self.loop.run_in_executor(None, work)
- self.loop.run_until_complete(fut)
- self.assertEqual(result, ["run", "run"])
-
- def test_policy(self):
- result = {'loop': 'not set'} # sentinel, different than None
-
- def work():
- try:
- result['loop'] = asyncio.get_event_loop()
- except AssertionError as exc:
- result['loop'] = exc
-
- # get_event_loop() must return None in a different thread
- fut = self.loop.run_in_executor(None, work)
- self.loop.run_until_complete(fut)
- self.assertIsInstance(result['loop'], AssertionError)
-
- def test_run_in_thread(self):
- class LoopThread(threading.Thread):
- def __init__(self, event):
- super(LoopThread, self).__init__()
- self.loop = None
- self.event = event
-
- def run(self):
- self.loop = asyncio.new_event_loop()
- try:
- self.loop.set_debug(True)
- asyncio.set_event_loop(self.loop)
-
- self.event.set()
- self.loop.run_forever()
- finally:
- self.loop.close()
- asyncio.set_event_loop(None)
-
- result = []
-
- # start an event loop in a thread
- event = threading.Event()
- thread = LoopThread(event)
- thread.start()
- event.wait()
- loop = thread.loop
-
- def func(loop):
- result.append(threading.current_thread().ident)
- loop.stop()
-
- # call func() in a different thread using the event loop
- tid = thread.ident
- loop.call_soon_threadsafe(func, loop)
-
- # stop the event loop
- thread.join()
- self.assertEqual(result, [tid])
-
-
-if __name__ == '__main__':
- import unittest
- unittest.main()
diff --git a/tests/test_timer.py b/tests/test_timer.py
deleted file mode 100644
index f2b26d6..0000000
--- a/tests/test_timer.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import datetime
-import eventlet
-import tests
-
-class TimerTests(tests.TestCase):
- def test_display_date(self):
- result = []
- delay = 0.050
- count = 3
-
- def display_date(end_time, loop):
- result.append(datetime.datetime.now())
- if (loop.time() + delay) < end_time:
- loop.call_later(delay, display_date, end_time, loop)
- else:
- loop.stop()
-
- end_time = self.loop.time() + delay * count
- self.loop.call_soon(display_date, end_time, self.loop)
- self.loop.run_forever()
-
- self.assertEqual(len(result), count, result)
-
- def test_later_stop_later(self):
- result = []
-
- def hello():
- result.append("Hello")
-
- def world(loop):
- result.append("World")
- loop.stop()
-
- self.loop.call_later(0.001, hello)
- self.loop.call_later(0.025, self.loop.stop)
- self.loop.call_later(0.050, world, self.loop)
- self.loop.run_forever()
-
- eventlet.sleep(0.100)
- self.assertEqual(result, ["Hello"])
-
- self.loop.run_forever()
- self.assertEqual(result, ["Hello", "World"])
-
-
-if __name__ == '__main__':
- import unittest
- unittest.main()