diff options
Diffstat (limited to 'Lib/test/test_telnetlib.py')
-rw-r--r-- | Lib/test/test_telnetlib.py | 101 |
1 files changed, 22 insertions, 79 deletions
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py index c9f2ccb462..ba33064623 100644 --- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -1,10 +1,9 @@ import socket -import select +import selectors import telnetlib import time import contextlib -import unittest from unittest import TestCase from test import support threading = support.import_module('threading') @@ -112,40 +111,32 @@ class TelnetAlike(telnetlib.Telnet): self._messages += out.getvalue() return -def mock_select(*s_args): - block = False - for l in s_args: - for fob in l: - if isinstance(fob, TelnetAlike): - block = fob.sock.block - if block: - return [[], [], []] - else: - return s_args - -class MockPoller(object): - test_case = None # Set during TestCase setUp. +class MockSelector(selectors.BaseSelector): def __init__(self): - self._file_objs = [] + super().__init__() + self.keys = {} + + def register(self, fileobj, events, data=None): + key = selectors.SelectorKey(fileobj, 0, events, data) + self.keys[fileobj] = key + return key - def register(self, fd, eventmask): - self.test_case.assertTrue(hasattr(fd, 'fileno'), fd) - self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI) - self._file_objs.append(fd) + def unregister(self, fileobj): + key = self.keys.pop(fileobj) + return key - def poll(self, timeout=None): + def select(self, timeout=None): block = False - for fob in self._file_objs: - if isinstance(fob, TelnetAlike): - block = fob.sock.block + for fileobj in self.keys: + if isinstance(fileobj, TelnetAlike): + block = fileobj.sock.block + break if block: return [] else: - return zip(self._file_objs, [select.POLLIN]*len(self._file_objs)) + return [(key, key.events) for key in self.keys.values()] - def unregister(self, fd): - self._file_objs.remove(fd) @contextlib.contextmanager def test_socket(reads): @@ -159,7 +150,7 @@ def test_socket(reads): socket.create_connection = old_conn return -def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): +def test_telnet(reads=(), cls=TelnetAlike): ''' return a telnetlib.Telnet object that uses a SocketStub with reads queued up to be read ''' for x in reads: @@ -167,29 +158,14 @@ def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): with test_socket(reads): telnet = cls('dummy', 0) telnet._messages = '' # debuglevel output - if use_poll is not None: - if use_poll and not telnet._has_poll: - raise unittest.SkipTest('select.poll() required.') - telnet._has_poll = use_poll return telnet - class ExpectAndReadTestCase(TestCase): def setUp(self): - self.old_select = select.select - select.select = mock_select - self.old_poll = False - if hasattr(select, 'poll'): - self.old_poll = select.poll - select.poll = MockPoller - MockPoller.test_case = self - + self.old_selector = telnetlib._TelnetSelector + telnetlib._TelnetSelector = MockSelector def tearDown(self): - if self.old_poll: - MockPoller.test_case = None - select.poll = self.old_poll - select.select = self.old_select - + telnetlib._TelnetSelector = self.old_selector class ReadTests(ExpectAndReadTestCase): def test_read_until(self): @@ -208,22 +184,6 @@ class ReadTests(ExpectAndReadTestCase): data = telnet.read_until(b'match') self.assertEqual(data, expect) - def test_read_until_with_poll(self): - """Use select.poll() to implement telnet.read_until().""" - want = [b'x' * 10, b'match', b'y' * 10] - telnet = test_telnet(want, use_poll=True) - select.select = lambda *_: self.fail('unexpected select() call.') - data = telnet.read_until(b'match') - self.assertEqual(data, b''.join(want[:-1])) - - def test_read_until_with_select(self): - """Use select.select() to implement telnet.read_until().""" - want = [b'x' * 10, b'match', b'y' * 10] - telnet = test_telnet(want, use_poll=False) - if self.old_poll: - select.poll = lambda *_: self.fail('unexpected poll() call.') - data = telnet.read_until(b'match') - self.assertEqual(data, b''.join(want[:-1])) def test_read_all(self): """ @@ -427,23 +387,6 @@ class ExpectTests(ExpectAndReadTestCase): (_,_,data) = telnet.expect([b'match']) self.assertEqual(data, b''.join(want[:-1])) - def test_expect_with_poll(self): - """Use select.poll() to implement telnet.expect().""" - want = [b'x' * 10, b'match', b'y' * 10] - telnet = test_telnet(want, use_poll=True) - select.select = lambda *_: self.fail('unexpected select() call.') - (_,_,data) = telnet.expect([b'match']) - self.assertEqual(data, b''.join(want[:-1])) - - def test_expect_with_select(self): - """Use select.select() to implement telnet.expect().""" - want = [b'x' * 10, b'match', b'y' * 10] - telnet = test_telnet(want, use_poll=False) - if self.old_poll: - select.poll = lambda *_: self.fail('unexpected poll() call.') - (_,_,data) = telnet.expect([b'match']) - self.assertEqual(data, b''.join(want[:-1])) - def test_main(verbose=None): support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests, |