summaryrefslogtreecommitdiff
path: root/Lib/test/test_asyncio/test_proactor_events.py
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2013-12-14 20:42:22 +0200
committerSerhiy Storchaka <storchaka@gmail.com>2013-12-14 20:42:22 +0200
commit8ad7d762f53e094d9b88127b6cf09e38ea565e96 (patch)
tree6e2b04cdd658ad01c55640ed8cf8b52121f0e31d /Lib/test/test_asyncio/test_proactor_events.py
parent043c30ba7dfbd17596a59f6ac6afe508a2e04fbd (diff)
parenta566549211d48775b6573e74dc55dd0fcde0600b (diff)
downloadcpython-8ad7d762f53e094d9b88127b6cf09e38ea565e96.tar.gz
Issue #19623: Fixed writing to unseekable files in the aifc module.
Diffstat (limited to 'Lib/test/test_asyncio/test_proactor_events.py')
-rw-r--r--Lib/test/test_asyncio/test_proactor_events.py484
1 files changed, 484 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
new file mode 100644
index 0000000000..9964f425d2
--- /dev/null
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -0,0 +1,484 @@
+"""Tests for proactor_events.py"""
+
+import socket
+import unittest
+import unittest.mock
+
+import asyncio
+from asyncio.proactor_events import BaseProactorEventLoop
+from asyncio.proactor_events import _ProactorSocketTransport
+from asyncio.proactor_events import _ProactorWritePipeTransport
+from asyncio.proactor_events import _ProactorDuplexPipeTransport
+from asyncio import test_utils
+
+
+class ProactorSocketTransportTests(unittest.TestCase):
+
+ def setUp(self):
+ self.loop = test_utils.TestLoop()
+ self.proactor = unittest.mock.Mock()
+ self.loop._proactor = self.proactor
+ self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
+ self.sock = unittest.mock.Mock(socket.socket)
+
+ def test_ctor(self):
+ fut = asyncio.Future(loop=self.loop)
+ tr = _ProactorSocketTransport(
+ self.loop, self.sock, self.protocol, fut)
+ test_utils.run_briefly(self.loop)
+ self.assertIsNone(fut.result())
+ self.protocol.connection_made(tr)
+ self.proactor.recv.assert_called_with(self.sock, 4096)
+
+ def test_loop_reading(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._loop_reading()
+ self.loop._proactor.recv.assert_called_with(self.sock, 4096)
+ self.assertFalse(self.protocol.data_received.called)
+ self.assertFalse(self.protocol.eof_received.called)
+
+ def test_loop_reading_data(self):
+ res = asyncio.Future(loop=self.loop)
+ res.set_result(b'data')
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+
+ tr._read_fut = res
+ tr._loop_reading(res)
+ self.loop._proactor.recv.assert_called_with(self.sock, 4096)
+ self.protocol.data_received.assert_called_with(b'data')
+
+ def test_loop_reading_no_data(self):
+ res = asyncio.Future(loop=self.loop)
+ res.set_result(b'')
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+
+ self.assertRaises(AssertionError, tr._loop_reading, res)
+
+ tr.close = unittest.mock.Mock()
+ tr._read_fut = res
+ tr._loop_reading(res)
+ self.assertFalse(self.loop._proactor.recv.called)
+ self.assertTrue(self.protocol.eof_received.called)
+ self.assertTrue(tr.close.called)
+
+ def test_loop_reading_aborted(self):
+ err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._fatal_error = unittest.mock.Mock()
+ tr._loop_reading()
+ tr._fatal_error.assert_called_with(err)
+
+ def test_loop_reading_aborted_closing(self):
+ self.loop._proactor.recv.side_effect = ConnectionAbortedError()
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._closing = True
+ tr._fatal_error = unittest.mock.Mock()
+ tr._loop_reading()
+ self.assertFalse(tr._fatal_error.called)
+
+ def test_loop_reading_aborted_is_fatal(self):
+ self.loop._proactor.recv.side_effect = ConnectionAbortedError()
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._closing = False
+ tr._fatal_error = unittest.mock.Mock()
+ tr._loop_reading()
+ self.assertTrue(tr._fatal_error.called)
+
+ def test_loop_reading_conn_reset_lost(self):
+ err = self.loop._proactor.recv.side_effect = ConnectionResetError()
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._closing = False
+ tr._fatal_error = unittest.mock.Mock()
+ tr._force_close = unittest.mock.Mock()
+ tr._loop_reading()
+ self.assertFalse(tr._fatal_error.called)
+ tr._force_close.assert_called_with(err)
+
+ def test_loop_reading_exception(self):
+ err = self.loop._proactor.recv.side_effect = (OSError())
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._fatal_error = unittest.mock.Mock()
+ tr._loop_reading()
+ tr._fatal_error.assert_called_with(err)
+
+ def test_write(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._loop_writing = unittest.mock.Mock()
+ tr.write(b'data')
+ self.assertEqual(tr._buffer, None)
+ tr._loop_writing.assert_called_with(data=b'data')
+
+ def test_write_no_data(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr.write(b'')
+ self.assertFalse(tr._buffer)
+
+ def test_write_more(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._write_fut = unittest.mock.Mock()
+ tr._loop_writing = unittest.mock.Mock()
+ tr.write(b'data')
+ self.assertEqual(tr._buffer, b'data')
+ self.assertFalse(tr._loop_writing.called)
+
+ def test_loop_writing(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._buffer = bytearray(b'data')
+ tr._loop_writing()
+ self.loop._proactor.send.assert_called_with(self.sock, b'data')
+ self.loop._proactor.send.return_value.add_done_callback.\
+ assert_called_with(tr._loop_writing)
+
+ @unittest.mock.patch('asyncio.proactor_events.logger')
+ def test_loop_writing_err(self, m_log):
+ err = self.loop._proactor.send.side_effect = OSError()
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._fatal_error = unittest.mock.Mock()
+ tr._buffer = [b'da', b'ta']
+ tr._loop_writing()
+ tr._fatal_error.assert_called_with(err)
+ tr._conn_lost = 1
+
+ tr.write(b'data')
+ tr.write(b'data')
+ tr.write(b'data')
+ tr.write(b'data')
+ tr.write(b'data')
+ self.assertEqual(tr._buffer, None)
+ m_log.warning.assert_called_with('socket.send() raised exception.')
+
+ def test_loop_writing_stop(self):
+ fut = asyncio.Future(loop=self.loop)
+ fut.set_result(b'data')
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._write_fut = fut
+ tr._loop_writing(fut)
+ self.assertIsNone(tr._write_fut)
+
+ def test_loop_writing_closing(self):
+ fut = asyncio.Future(loop=self.loop)
+ fut.set_result(1)
+
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._write_fut = fut
+ tr.close()
+ tr._loop_writing(fut)
+ self.assertIsNone(tr._write_fut)
+ test_utils.run_briefly(self.loop)
+ self.protocol.connection_lost.assert_called_with(None)
+
+ def test_abort(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._force_close = unittest.mock.Mock()
+ tr.abort()
+ tr._force_close.assert_called_with(None)
+
+ def test_close(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr.close()
+ test_utils.run_briefly(self.loop)
+ self.protocol.connection_lost.assert_called_with(None)
+ self.assertTrue(tr._closing)
+ self.assertEqual(tr._conn_lost, 1)
+
+ self.protocol.connection_lost.reset_mock()
+ tr.close()
+ test_utils.run_briefly(self.loop)
+ self.assertFalse(self.protocol.connection_lost.called)
+
+ def test_close_write_fut(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._write_fut = unittest.mock.Mock()
+ tr.close()
+ test_utils.run_briefly(self.loop)
+ self.assertFalse(self.protocol.connection_lost.called)
+
+ def test_close_buffer(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._buffer = [b'data']
+ tr.close()
+ test_utils.run_briefly(self.loop)
+ self.assertFalse(self.protocol.connection_lost.called)
+
+ @unittest.mock.patch('asyncio.proactor_events.logger')
+ def test_fatal_error(self, m_logging):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._force_close = unittest.mock.Mock()
+ tr._fatal_error(None)
+ self.assertTrue(tr._force_close.called)
+ self.assertTrue(m_logging.exception.called)
+
+ def test_force_close(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._buffer = [b'data']
+ read_fut = tr._read_fut = unittest.mock.Mock()
+ write_fut = tr._write_fut = unittest.mock.Mock()
+ tr._force_close(None)
+
+ read_fut.cancel.assert_called_with()
+ write_fut.cancel.assert_called_with()
+ test_utils.run_briefly(self.loop)
+ self.protocol.connection_lost.assert_called_with(None)
+ self.assertEqual(None, tr._buffer)
+ self.assertEqual(tr._conn_lost, 1)
+
+ def test_force_close_idempotent(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._closing = True
+ tr._force_close(None)
+ test_utils.run_briefly(self.loop)
+ self.assertFalse(self.protocol.connection_lost.called)
+
+ def test_fatal_error_2(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._buffer = [b'data']
+ tr._force_close(None)
+
+ test_utils.run_briefly(self.loop)
+ self.protocol.connection_lost.assert_called_with(None)
+ self.assertEqual(None, tr._buffer)
+
+ def test_call_connection_lost(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ tr._call_connection_lost(None)
+ self.assertTrue(self.protocol.connection_lost.called)
+ self.assertTrue(self.sock.close.called)
+
+ def test_write_eof(self):
+ tr = _ProactorSocketTransport(
+ self.loop, self.sock, self.protocol)
+ self.assertTrue(tr.can_write_eof())
+ tr.write_eof()
+ self.sock.shutdown.assert_called_with(socket.SHUT_WR)
+ tr.write_eof()
+ self.assertEqual(self.sock.shutdown.call_count, 1)
+ tr.close()
+
+ def test_write_eof_buffer(self):
+ tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
+ f = asyncio.Future(loop=self.loop)
+ tr._loop._proactor.send.return_value = f
+ tr.write(b'data')
+ tr.write_eof()
+ self.assertTrue(tr._eof_written)
+ self.assertFalse(self.sock.shutdown.called)
+ tr._loop._proactor.send.assert_called_with(self.sock, b'data')
+ f.set_result(4)
+ self.loop._run_once()
+ self.sock.shutdown.assert_called_with(socket.SHUT_WR)
+ tr.close()
+
+ def test_write_eof_write_pipe(self):
+ tr = _ProactorWritePipeTransport(
+ self.loop, self.sock, self.protocol)
+ self.assertTrue(tr.can_write_eof())
+ tr.write_eof()
+ self.assertTrue(tr._closing)
+ self.loop._run_once()
+ self.assertTrue(self.sock.close.called)
+ tr.close()
+
+ def test_write_eof_buffer_write_pipe(self):
+ tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
+ f = asyncio.Future(loop=self.loop)
+ tr._loop._proactor.send.return_value = f
+ tr.write(b'data')
+ tr.write_eof()
+ self.assertTrue(tr._closing)
+ self.assertFalse(self.sock.shutdown.called)
+ tr._loop._proactor.send.assert_called_with(self.sock, b'data')
+ f.set_result(4)
+ self.loop._run_once()
+ self.loop._run_once()
+ self.assertTrue(self.sock.close.called)
+ tr.close()
+
+ def test_write_eof_duplex_pipe(self):
+ tr = _ProactorDuplexPipeTransport(
+ self.loop, self.sock, self.protocol)
+ self.assertFalse(tr.can_write_eof())
+ with self.assertRaises(NotImplementedError):
+ tr.write_eof()
+ tr.close()
+
+ def test_pause_resume_reading(self):
+ tr = _ProactorSocketTransport(
+ self.loop, self.sock, self.protocol)
+ futures = []
+ for msg in [b'data1', b'data2', b'data3', b'data4', b'']:
+ f = asyncio.Future(loop=self.loop)
+ f.set_result(msg)
+ futures.append(f)
+ self.loop._proactor.recv.side_effect = futures
+ self.loop._run_once()
+ self.assertFalse(tr._paused)
+ self.loop._run_once()
+ self.protocol.data_received.assert_called_with(b'data1')
+ self.loop._run_once()
+ self.protocol.data_received.assert_called_with(b'data2')
+ tr.pause_reading()
+ self.assertTrue(tr._paused)
+ for i in range(10):
+ self.loop._run_once()
+ self.protocol.data_received.assert_called_with(b'data2')
+ tr.resume_reading()
+ self.assertFalse(tr._paused)
+ self.loop._run_once()
+ self.protocol.data_received.assert_called_with(b'data3')
+ self.loop._run_once()
+ self.protocol.data_received.assert_called_with(b'data4')
+ tr.close()
+
+
+class BaseProactorEventLoopTests(unittest.TestCase):
+
+ def setUp(self):
+ self.sock = unittest.mock.Mock(socket.socket)
+ self.proactor = unittest.mock.Mock()
+
+ self.ssock, self.csock = unittest.mock.Mock(), unittest.mock.Mock()
+
+ class EventLoop(BaseProactorEventLoop):
+ def _socketpair(s):
+ return (self.ssock, self.csock)
+
+ self.loop = EventLoop(self.proactor)
+
+ @unittest.mock.patch.object(BaseProactorEventLoop, 'call_soon')
+ @unittest.mock.patch.object(BaseProactorEventLoop, '_socketpair')
+ def test_ctor(self, socketpair, call_soon):
+ ssock, csock = socketpair.return_value = (
+ unittest.mock.Mock(), unittest.mock.Mock())
+ loop = BaseProactorEventLoop(self.proactor)
+ self.assertIs(loop._ssock, ssock)
+ self.assertIs(loop._csock, csock)
+ self.assertEqual(loop._internal_fds, 1)
+ call_soon.assert_called_with(loop._loop_self_reading)
+
+ def test_close_self_pipe(self):
+ self.loop._close_self_pipe()
+ self.assertEqual(self.loop._internal_fds, 0)
+ self.assertTrue(self.ssock.close.called)
+ self.assertTrue(self.csock.close.called)
+ self.assertIsNone(self.loop._ssock)
+ self.assertIsNone(self.loop._csock)
+
+ def test_close(self):
+ self.loop._close_self_pipe = unittest.mock.Mock()
+ self.loop.close()
+ self.assertTrue(self.loop._close_self_pipe.called)
+ self.assertTrue(self.proactor.close.called)
+ self.assertIsNone(self.loop._proactor)
+
+ self.loop._close_self_pipe.reset_mock()
+ self.loop.close()
+ self.assertFalse(self.loop._close_self_pipe.called)
+
+ def test_sock_recv(self):
+ self.loop.sock_recv(self.sock, 1024)
+ self.proactor.recv.assert_called_with(self.sock, 1024)
+
+ def test_sock_sendall(self):
+ self.loop.sock_sendall(self.sock, b'data')
+ self.proactor.send.assert_called_with(self.sock, b'data')
+
+ def test_sock_connect(self):
+ self.loop.sock_connect(self.sock, 123)
+ self.proactor.connect.assert_called_with(self.sock, 123)
+
+ def test_sock_accept(self):
+ self.loop.sock_accept(self.sock)
+ self.proactor.accept.assert_called_with(self.sock)
+
+ def test_socketpair(self):
+ self.assertRaises(
+ NotImplementedError, BaseProactorEventLoop, self.proactor)
+
+ def test_make_socket_transport(self):
+ tr = self.loop._make_socket_transport(self.sock, unittest.mock.Mock())
+ self.assertIsInstance(tr, _ProactorSocketTransport)
+
+ def test_loop_self_reading(self):
+ self.loop._loop_self_reading()
+ self.proactor.recv.assert_called_with(self.ssock, 4096)
+ self.proactor.recv.return_value.add_done_callback.assert_called_with(
+ self.loop._loop_self_reading)
+
+ def test_loop_self_reading_fut(self):
+ fut = unittest.mock.Mock()
+ self.loop._loop_self_reading(fut)
+ self.assertTrue(fut.result.called)
+ self.proactor.recv.assert_called_with(self.ssock, 4096)
+ self.proactor.recv.return_value.add_done_callback.assert_called_with(
+ self.loop._loop_self_reading)
+
+ def test_loop_self_reading_exception(self):
+ self.loop.close = unittest.mock.Mock()
+ self.proactor.recv.side_effect = OSError()
+ self.assertRaises(OSError, self.loop._loop_self_reading)
+ self.assertTrue(self.loop.close.called)
+
+ def test_write_to_self(self):
+ self.loop._write_to_self()
+ self.csock.send.assert_called_with(b'x')
+
+ def test_process_events(self):
+ self.loop._process_events([])
+
+ @unittest.mock.patch('asyncio.proactor_events.logger')
+ def test_create_server(self, m_log):
+ pf = unittest.mock.Mock()
+ call_soon = self.loop.call_soon = unittest.mock.Mock()
+
+ self.loop._start_serving(pf, self.sock)
+ self.assertTrue(call_soon.called)
+
+ # callback
+ loop = call_soon.call_args[0][0]
+ loop()
+ self.proactor.accept.assert_called_with(self.sock)
+
+ # conn
+ fut = unittest.mock.Mock()
+ fut.result.return_value = (unittest.mock.Mock(), unittest.mock.Mock())
+
+ make_tr = self.loop._make_socket_transport = unittest.mock.Mock()
+ loop(fut)
+ self.assertTrue(fut.result.called)
+ self.assertTrue(make_tr.called)
+
+ # exception
+ fut.result.side_effect = OSError()
+ loop(fut)
+ self.assertTrue(self.sock.close.called)
+ self.assertTrue(m_log.exception.called)
+
+ def test_create_server_cancel(self):
+ pf = unittest.mock.Mock()
+ call_soon = self.loop.call_soon = unittest.mock.Mock()
+
+ self.loop._start_serving(pf, self.sock)
+ loop = call_soon.call_args[0][0]
+
+ # cancelled
+ fut = asyncio.Future(loop=self.loop)
+ fut.cancel()
+ loop(fut)
+ self.assertTrue(self.sock.close.called)
+
+ def test_stop_serving(self):
+ sock = unittest.mock.Mock()
+ self.loop._stop_serving(sock)
+ self.assertTrue(sock.close.called)
+ self.proactor._stop_serving.assert_called_with(sock)
+
+
+if __name__ == '__main__':
+ unittest.main()