diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-12-15 09:31:34 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-12-15 09:31:34 +0100 |
commit | e00b2456bad2565ceb3509d6f6ae3a6977a99db7 (patch) | |
tree | 7d0949dcbc9d0d6222be6b47c82c8c10ad8de044 /tests/test_thread.py | |
parent | 1b5a35ac98a02a487768d39499a426e961fe4ef4 (diff) | |
download | aioeventlet-e00b2456bad2565ceb3509d6f6ae3a6977a99db7.tar.gz |
Remove tests duplicated in aiotest
Diffstat (limited to 'tests/test_thread.py')
-rw-r--r-- | tests/test_thread.py | 101 |
1 files changed, 0 insertions, 101 deletions
diff --git a/tests/test_thread.py b/tests/test_thread.py deleted file mode 100644 index 4c4185b..0000000 --- a/tests/test_thread.py +++ /dev/null @@ -1,101 +0,0 @@ -import eventlet -import tests -threading = eventlet.patcher.original('threading') -try: - import asyncio -except ImportError: - import trollius as asyncio - -try: - get_ident = threading.get_ident # Python 3 -except AttributeError: - get_ident = threading._get_ident # Python 2 - -class ThreadTests(tests.TestCase): - def test_ident(self): - result = {'ident': None} - - def work(): - result['ident'] = get_ident() - - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - - # ensure that work() was executed in a different thread - work_ident = result['ident'] - self.assertIsNotNone(work_ident) - self.assertNotEqual(work_ident, get_ident()) - - def test_run_twice(self): - result = [] - - def work(): - result.append("run") - - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - self.assertEqual(result, ["run"]) - - # ensure that run_in_executor() can be called twice - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - self.assertEqual(result, ["run", "run"]) - - def test_policy(self): - result = {'loop': 'not set'} # sentinel, different than None - - def work(): - try: - result['loop'] = asyncio.get_event_loop() - except AssertionError as exc: - result['loop'] = exc - - # get_event_loop() must return None in a different thread - fut = self.loop.run_in_executor(None, work) - self.loop.run_until_complete(fut) - self.assertIsInstance(result['loop'], AssertionError) - - def test_run_in_thread(self): - class LoopThread(threading.Thread): - def __init__(self, event): - super(LoopThread, self).__init__() - self.loop = None - self.event = event - - def run(self): - self.loop = asyncio.new_event_loop() - try: - self.loop.set_debug(True) - asyncio.set_event_loop(self.loop) - - self.event.set() - self.loop.run_forever() - finally: - self.loop.close() - asyncio.set_event_loop(None) - - result = [] - - # start an event loop in a thread - event = threading.Event() - thread = LoopThread(event) - thread.start() - event.wait() - loop = thread.loop - - def func(loop): - result.append(threading.current_thread().ident) - loop.stop() - - # call func() in a different thread using the event loop - tid = thread.ident - loop.call_soon_threadsafe(func, loop) - - # stop the event loop - thread.join() - self.assertEqual(result, [tid]) - - -if __name__ == '__main__': - import unittest - unittest.main() |