diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2013-12-14 20:42:22 +0200 |
---|---|---|
committer | Serhiy Storchaka <storchaka@gmail.com> | 2013-12-14 20:42:22 +0200 |
commit | 8ad7d762f53e094d9b88127b6cf09e38ea565e96 (patch) | |
tree | 6e2b04cdd658ad01c55640ed8cf8b52121f0e31d /Lib/test/test_asyncio/test_proactor_events.py | |
parent | 043c30ba7dfbd17596a59f6ac6afe508a2e04fbd (diff) | |
parent | a566549211d48775b6573e74dc55dd0fcde0600b (diff) | |
download | cpython-8ad7d762f53e094d9b88127b6cf09e38ea565e96.tar.gz |
Issue #19623: Fixed writing to unseekable files in the aifc module.
Diffstat (limited to 'Lib/test/test_asyncio/test_proactor_events.py')
-rw-r--r-- | Lib/test/test_asyncio/test_proactor_events.py | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py new file mode 100644 index 0000000000..9964f425d2 --- /dev/null +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -0,0 +1,484 @@ +"""Tests for proactor_events.py""" + +import socket +import unittest +import unittest.mock + +import asyncio +from asyncio.proactor_events import BaseProactorEventLoop +from asyncio.proactor_events import _ProactorSocketTransport +from asyncio.proactor_events import _ProactorWritePipeTransport +from asyncio.proactor_events import _ProactorDuplexPipeTransport +from asyncio import test_utils + + +class ProactorSocketTransportTests(unittest.TestCase): + + def setUp(self): + self.loop = test_utils.TestLoop() + self.proactor = unittest.mock.Mock() + self.loop._proactor = self.proactor + self.protocol = test_utils.make_test_protocol(asyncio.Protocol) + self.sock = unittest.mock.Mock(socket.socket) + + def test_ctor(self): + fut = asyncio.Future(loop=self.loop) + tr = _ProactorSocketTransport( + self.loop, self.sock, self.protocol, fut) + test_utils.run_briefly(self.loop) + self.assertIsNone(fut.result()) + self.protocol.connection_made(tr) + self.proactor.recv.assert_called_with(self.sock, 4096) + + def test_loop_reading(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._loop_reading() + self.loop._proactor.recv.assert_called_with(self.sock, 4096) + self.assertFalse(self.protocol.data_received.called) + self.assertFalse(self.protocol.eof_received.called) + + def test_loop_reading_data(self): + res = asyncio.Future(loop=self.loop) + res.set_result(b'data') + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + + tr._read_fut = res + tr._loop_reading(res) + self.loop._proactor.recv.assert_called_with(self.sock, 4096) + self.protocol.data_received.assert_called_with(b'data') + + def test_loop_reading_no_data(self): + res = asyncio.Future(loop=self.loop) + res.set_result(b'') + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + + self.assertRaises(AssertionError, tr._loop_reading, res) + + tr.close = unittest.mock.Mock() + tr._read_fut = res + tr._loop_reading(res) + self.assertFalse(self.loop._proactor.recv.called) + self.assertTrue(self.protocol.eof_received.called) + self.assertTrue(tr.close.called) + + def test_loop_reading_aborted(self): + err = self.loop._proactor.recv.side_effect = ConnectionAbortedError() + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._fatal_error = unittest.mock.Mock() + tr._loop_reading() + tr._fatal_error.assert_called_with(err) + + def test_loop_reading_aborted_closing(self): + self.loop._proactor.recv.side_effect = ConnectionAbortedError() + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._closing = True + tr._fatal_error = unittest.mock.Mock() + tr._loop_reading() + self.assertFalse(tr._fatal_error.called) + + def test_loop_reading_aborted_is_fatal(self): + self.loop._proactor.recv.side_effect = ConnectionAbortedError() + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._closing = False + tr._fatal_error = unittest.mock.Mock() + tr._loop_reading() + self.assertTrue(tr._fatal_error.called) + + def test_loop_reading_conn_reset_lost(self): + err = self.loop._proactor.recv.side_effect = ConnectionResetError() + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._closing = False + tr._fatal_error = unittest.mock.Mock() + tr._force_close = unittest.mock.Mock() + tr._loop_reading() + self.assertFalse(tr._fatal_error.called) + tr._force_close.assert_called_with(err) + + def test_loop_reading_exception(self): + err = self.loop._proactor.recv.side_effect = (OSError()) + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._fatal_error = unittest.mock.Mock() + tr._loop_reading() + tr._fatal_error.assert_called_with(err) + + def test_write(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._loop_writing = unittest.mock.Mock() + tr.write(b'data') + self.assertEqual(tr._buffer, None) + tr._loop_writing.assert_called_with(data=b'data') + + def test_write_no_data(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr.write(b'') + self.assertFalse(tr._buffer) + + def test_write_more(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._write_fut = unittest.mock.Mock() + tr._loop_writing = unittest.mock.Mock() + tr.write(b'data') + self.assertEqual(tr._buffer, b'data') + self.assertFalse(tr._loop_writing.called) + + def test_loop_writing(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._buffer = bytearray(b'data') + tr._loop_writing() + self.loop._proactor.send.assert_called_with(self.sock, b'data') + self.loop._proactor.send.return_value.add_done_callback.\ + assert_called_with(tr._loop_writing) + + @unittest.mock.patch('asyncio.proactor_events.logger') + def test_loop_writing_err(self, m_log): + err = self.loop._proactor.send.side_effect = OSError() + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._fatal_error = unittest.mock.Mock() + tr._buffer = [b'da', b'ta'] + tr._loop_writing() + tr._fatal_error.assert_called_with(err) + tr._conn_lost = 1 + + tr.write(b'data') + tr.write(b'data') + tr.write(b'data') + tr.write(b'data') + tr.write(b'data') + self.assertEqual(tr._buffer, None) + m_log.warning.assert_called_with('socket.send() raised exception.') + + def test_loop_writing_stop(self): + fut = asyncio.Future(loop=self.loop) + fut.set_result(b'data') + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._write_fut = fut + tr._loop_writing(fut) + self.assertIsNone(tr._write_fut) + + def test_loop_writing_closing(self): + fut = asyncio.Future(loop=self.loop) + fut.set_result(1) + + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._write_fut = fut + tr.close() + tr._loop_writing(fut) + self.assertIsNone(tr._write_fut) + test_utils.run_briefly(self.loop) + self.protocol.connection_lost.assert_called_with(None) + + def test_abort(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._force_close = unittest.mock.Mock() + tr.abort() + tr._force_close.assert_called_with(None) + + def test_close(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr.close() + test_utils.run_briefly(self.loop) + self.protocol.connection_lost.assert_called_with(None) + self.assertTrue(tr._closing) + self.assertEqual(tr._conn_lost, 1) + + self.protocol.connection_lost.reset_mock() + tr.close() + test_utils.run_briefly(self.loop) + self.assertFalse(self.protocol.connection_lost.called) + + def test_close_write_fut(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._write_fut = unittest.mock.Mock() + tr.close() + test_utils.run_briefly(self.loop) + self.assertFalse(self.protocol.connection_lost.called) + + def test_close_buffer(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._buffer = [b'data'] + tr.close() + test_utils.run_briefly(self.loop) + self.assertFalse(self.protocol.connection_lost.called) + + @unittest.mock.patch('asyncio.proactor_events.logger') + def test_fatal_error(self, m_logging): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._force_close = unittest.mock.Mock() + tr._fatal_error(None) + self.assertTrue(tr._force_close.called) + self.assertTrue(m_logging.exception.called) + + def test_force_close(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._buffer = [b'data'] + read_fut = tr._read_fut = unittest.mock.Mock() + write_fut = tr._write_fut = unittest.mock.Mock() + tr._force_close(None) + + read_fut.cancel.assert_called_with() + write_fut.cancel.assert_called_with() + test_utils.run_briefly(self.loop) + self.protocol.connection_lost.assert_called_with(None) + self.assertEqual(None, tr._buffer) + self.assertEqual(tr._conn_lost, 1) + + def test_force_close_idempotent(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._closing = True + tr._force_close(None) + test_utils.run_briefly(self.loop) + self.assertFalse(self.protocol.connection_lost.called) + + def test_fatal_error_2(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._buffer = [b'data'] + tr._force_close(None) + + test_utils.run_briefly(self.loop) + self.protocol.connection_lost.assert_called_with(None) + self.assertEqual(None, tr._buffer) + + def test_call_connection_lost(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + tr._call_connection_lost(None) + self.assertTrue(self.protocol.connection_lost.called) + self.assertTrue(self.sock.close.called) + + def test_write_eof(self): + tr = _ProactorSocketTransport( + self.loop, self.sock, self.protocol) + self.assertTrue(tr.can_write_eof()) + tr.write_eof() + self.sock.shutdown.assert_called_with(socket.SHUT_WR) + tr.write_eof() + self.assertEqual(self.sock.shutdown.call_count, 1) + tr.close() + + def test_write_eof_buffer(self): + tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol) + f = asyncio.Future(loop=self.loop) + tr._loop._proactor.send.return_value = f + tr.write(b'data') + tr.write_eof() + self.assertTrue(tr._eof_written) + self.assertFalse(self.sock.shutdown.called) + tr._loop._proactor.send.assert_called_with(self.sock, b'data') + f.set_result(4) + self.loop._run_once() + self.sock.shutdown.assert_called_with(socket.SHUT_WR) + tr.close() + + def test_write_eof_write_pipe(self): + tr = _ProactorWritePipeTransport( + self.loop, self.sock, self.protocol) + self.assertTrue(tr.can_write_eof()) + tr.write_eof() + self.assertTrue(tr._closing) + self.loop._run_once() + self.assertTrue(self.sock.close.called) + tr.close() + + def test_write_eof_buffer_write_pipe(self): + tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol) + f = asyncio.Future(loop=self.loop) + tr._loop._proactor.send.return_value = f + tr.write(b'data') + tr.write_eof() + self.assertTrue(tr._closing) + self.assertFalse(self.sock.shutdown.called) + tr._loop._proactor.send.assert_called_with(self.sock, b'data') + f.set_result(4) + self.loop._run_once() + self.loop._run_once() + self.assertTrue(self.sock.close.called) + tr.close() + + def test_write_eof_duplex_pipe(self): + tr = _ProactorDuplexPipeTransport( + self.loop, self.sock, self.protocol) + self.assertFalse(tr.can_write_eof()) + with self.assertRaises(NotImplementedError): + tr.write_eof() + tr.close() + + def test_pause_resume_reading(self): + tr = _ProactorSocketTransport( + self.loop, self.sock, self.protocol) + futures = [] + for msg in [b'data1', b'data2', b'data3', b'data4', b'']: + f = asyncio.Future(loop=self.loop) + f.set_result(msg) + futures.append(f) + self.loop._proactor.recv.side_effect = futures + self.loop._run_once() + self.assertFalse(tr._paused) + self.loop._run_once() + self.protocol.data_received.assert_called_with(b'data1') + self.loop._run_once() + self.protocol.data_received.assert_called_with(b'data2') + tr.pause_reading() + self.assertTrue(tr._paused) + for i in range(10): + self.loop._run_once() + self.protocol.data_received.assert_called_with(b'data2') + tr.resume_reading() + self.assertFalse(tr._paused) + self.loop._run_once() + self.protocol.data_received.assert_called_with(b'data3') + self.loop._run_once() + self.protocol.data_received.assert_called_with(b'data4') + tr.close() + + +class BaseProactorEventLoopTests(unittest.TestCase): + + def setUp(self): + self.sock = unittest.mock.Mock(socket.socket) + self.proactor = unittest.mock.Mock() + + self.ssock, self.csock = unittest.mock.Mock(), unittest.mock.Mock() + + class EventLoop(BaseProactorEventLoop): + def _socketpair(s): + return (self.ssock, self.csock) + + self.loop = EventLoop(self.proactor) + + @unittest.mock.patch.object(BaseProactorEventLoop, 'call_soon') + @unittest.mock.patch.object(BaseProactorEventLoop, '_socketpair') + def test_ctor(self, socketpair, call_soon): + ssock, csock = socketpair.return_value = ( + unittest.mock.Mock(), unittest.mock.Mock()) + loop = BaseProactorEventLoop(self.proactor) + self.assertIs(loop._ssock, ssock) + self.assertIs(loop._csock, csock) + self.assertEqual(loop._internal_fds, 1) + call_soon.assert_called_with(loop._loop_self_reading) + + def test_close_self_pipe(self): + self.loop._close_self_pipe() + self.assertEqual(self.loop._internal_fds, 0) + self.assertTrue(self.ssock.close.called) + self.assertTrue(self.csock.close.called) + self.assertIsNone(self.loop._ssock) + self.assertIsNone(self.loop._csock) + + def test_close(self): + self.loop._close_self_pipe = unittest.mock.Mock() + self.loop.close() + self.assertTrue(self.loop._close_self_pipe.called) + self.assertTrue(self.proactor.close.called) + self.assertIsNone(self.loop._proactor) + + self.loop._close_self_pipe.reset_mock() + self.loop.close() + self.assertFalse(self.loop._close_self_pipe.called) + + def test_sock_recv(self): + self.loop.sock_recv(self.sock, 1024) + self.proactor.recv.assert_called_with(self.sock, 1024) + + def test_sock_sendall(self): + self.loop.sock_sendall(self.sock, b'data') + self.proactor.send.assert_called_with(self.sock, b'data') + + def test_sock_connect(self): + self.loop.sock_connect(self.sock, 123) + self.proactor.connect.assert_called_with(self.sock, 123) + + def test_sock_accept(self): + self.loop.sock_accept(self.sock) + self.proactor.accept.assert_called_with(self.sock) + + def test_socketpair(self): + self.assertRaises( + NotImplementedError, BaseProactorEventLoop, self.proactor) + + def test_make_socket_transport(self): + tr = self.loop._make_socket_transport(self.sock, unittest.mock.Mock()) + self.assertIsInstance(tr, _ProactorSocketTransport) + + def test_loop_self_reading(self): + self.loop._loop_self_reading() + self.proactor.recv.assert_called_with(self.ssock, 4096) + self.proactor.recv.return_value.add_done_callback.assert_called_with( + self.loop._loop_self_reading) + + def test_loop_self_reading_fut(self): + fut = unittest.mock.Mock() + self.loop._loop_self_reading(fut) + self.assertTrue(fut.result.called) + self.proactor.recv.assert_called_with(self.ssock, 4096) + self.proactor.recv.return_value.add_done_callback.assert_called_with( + self.loop._loop_self_reading) + + def test_loop_self_reading_exception(self): + self.loop.close = unittest.mock.Mock() + self.proactor.recv.side_effect = OSError() + self.assertRaises(OSError, self.loop._loop_self_reading) + self.assertTrue(self.loop.close.called) + + def test_write_to_self(self): + self.loop._write_to_self() + self.csock.send.assert_called_with(b'x') + + def test_process_events(self): + self.loop._process_events([]) + + @unittest.mock.patch('asyncio.proactor_events.logger') + def test_create_server(self, m_log): + pf = unittest.mock.Mock() + call_soon = self.loop.call_soon = unittest.mock.Mock() + + self.loop._start_serving(pf, self.sock) + self.assertTrue(call_soon.called) + + # callback + loop = call_soon.call_args[0][0] + loop() + self.proactor.accept.assert_called_with(self.sock) + + # conn + fut = unittest.mock.Mock() + fut.result.return_value = (unittest.mock.Mock(), unittest.mock.Mock()) + + make_tr = self.loop._make_socket_transport = unittest.mock.Mock() + loop(fut) + self.assertTrue(fut.result.called) + self.assertTrue(make_tr.called) + + # exception + fut.result.side_effect = OSError() + loop(fut) + self.assertTrue(self.sock.close.called) + self.assertTrue(m_log.exception.called) + + def test_create_server_cancel(self): + pf = unittest.mock.Mock() + call_soon = self.loop.call_soon = unittest.mock.Mock() + + self.loop._start_serving(pf, self.sock) + loop = call_soon.call_args[0][0] + + # cancelled + fut = asyncio.Future(loop=self.loop) + fut.cancel() + loop(fut) + self.assertTrue(self.sock.close.called) + + def test_stop_serving(self): + sock = unittest.mock.Mock() + self.loop._stop_serving(sock) + self.assertTrue(sock.close.called) + self.proactor._stop_serving.assert_called_with(sock) + + +if __name__ == '__main__': + unittest.main() |