diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 116 |
1 files changed, 105 insertions, 11 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index db074cab15..1944a04573 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -203,6 +203,9 @@ class MockUnseekableIO: def tell(self, *args): raise self.UnsupportedOperation("not seekable") + def truncate(self, *args): + raise self.UnsupportedOperation("not seekable") + class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): UnsupportedOperation = io.UnsupportedOperation @@ -361,6 +364,107 @@ class IOTest(unittest.TestCase): self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) self.assertRaises(exc, fp.seek, -1, self.SEEK_END) + def test_optional_abilities(self): + # Test for OSError when optional APIs are not supported + # The purpose of this test is to try fileno(), reading, writing and + # seeking operations with various objects that indicate they do not + # support these operations. + + def pipe_reader(): + [r, w] = os.pipe() + os.close(w) # So that read() is harmless + return self.FileIO(r, "r") + + def pipe_writer(): + [r, w] = os.pipe() + self.addCleanup(os.close, r) + # Guarantee that we can write into the pipe without blocking + thread = threading.Thread(target=os.read, args=(r, 100)) + thread.start() + self.addCleanup(thread.join) + return self.FileIO(w, "w") + + def buffered_reader(): + return self.BufferedReader(self.MockUnseekableIO()) + + def buffered_writer(): + return self.BufferedWriter(self.MockUnseekableIO()) + + def buffered_random(): + return self.BufferedRandom(self.BytesIO()) + + def buffered_rw_pair(): + return self.BufferedRWPair(self.MockUnseekableIO(), + self.MockUnseekableIO()) + + def text_reader(): + class UnseekableReader(self.MockUnseekableIO): + writable = self.BufferedIOBase.writable + write = self.BufferedIOBase.write + return self.TextIOWrapper(UnseekableReader(), "ascii") + + def text_writer(): + class UnseekableWriter(self.MockUnseekableIO): + readable = self.BufferedIOBase.readable + read = self.BufferedIOBase.read + return self.TextIOWrapper(UnseekableWriter(), "ascii") + + tests = ( + (pipe_reader, "fr"), (pipe_writer, "fw"), + (buffered_reader, "r"), (buffered_writer, "w"), + (buffered_random, "rws"), (buffered_rw_pair, "rw"), + (text_reader, "r"), (text_writer, "w"), + (self.BytesIO, "rws"), (self.StringIO, "rws"), + ) + for [test, abilities] in tests: + if test is pipe_writer and not threading: + continue # Skip subtest that uses a background thread + with self.subTest(test), test() as obj: + readable = "r" in abilities + self.assertEqual(obj.readable(), readable) + writable = "w" in abilities + self.assertEqual(obj.writable(), writable) + seekable = "s" in abilities + self.assertEqual(obj.seekable(), seekable) + + if isinstance(obj, self.TextIOBase): + data = "3" + elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): + data = b"3" + else: + self.fail("Unknown base class") + + if "f" in abilities: + obj.fileno() + else: + self.assertRaises(OSError, obj.fileno) + + if readable: + obj.read(1) + obj.read() + else: + self.assertRaises(OSError, obj.read, 1) + self.assertRaises(OSError, obj.read) + + if writable: + obj.write(data) + else: + self.assertRaises(OSError, obj.write, data) + + if seekable: + obj.tell() + obj.seek(0) + else: + self.assertRaises(OSError, obj.tell) + self.assertRaises(OSError, obj.seek, 0) + + if writable and seekable: + obj.truncate() + obj.truncate(0) + else: + self.assertRaises(OSError, obj.truncate) + self.assertRaises(OSError, obj.truncate, 0) + def test_open_handles_NUL_chars(self): fn_with_NUL = 'foo\0bar' self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') @@ -751,12 +855,6 @@ class CommonBufferedTests: self.assertEqual(42, bufio.fileno()) - @unittest.skip('test having existential crisis') - def test_no_fileno(self): - # XXX will we always have fileno() function? If so, kill - # this test. Else, write it. - pass - def test_invalid_args(self): rawio = self.MockRawIO() bufio = self.tp(rawio) @@ -784,13 +882,9 @@ class CommonBufferedTests: super().flush() rawio = self.MockRawIO() bufio = MyBufferedIO(rawio) - writable = bufio.writable() del bufio support.gc_collect() - if writable: - self.assertEqual(record, [1, 2, 3]) - else: - self.assertEqual(record, [1, 2]) + self.assertEqual(record, [1, 2, 3]) def test_context_manager(self): # Test usability as a context manager |