summaryrefslogtreecommitdiff
path: root/Lib/test/test_devpoll.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_devpoll.py')
-rw-r--r--Lib/test/test_devpoll.py94
1 files changed, 94 insertions, 0 deletions
diff --git a/Lib/test/test_devpoll.py b/Lib/test/test_devpoll.py
new file mode 100644
index 0000000000..bef4e1846b
--- /dev/null
+++ b/Lib/test/test_devpoll.py
@@ -0,0 +1,94 @@
+# Test case for the select.devpoll() function
+
+# Initial tests are copied as is from "test_poll.py"
+
+import os, select, random, unittest, sys
+from test.support import TESTFN, run_unittest
+
+try:
+ select.devpoll
+except AttributeError:
+ raise unittest.SkipTest("select.devpoll not defined -- skipping test_devpoll")
+
+
+def find_ready_matching(ready, flag):
+ match = []
+ for fd, mode in ready:
+ if mode & flag:
+ match.append(fd)
+ return match
+
+class DevPollTests(unittest.TestCase):
+
+ def test_devpoll1(self):
+ # Basic functional test of poll object
+ # Create a bunch of pipe and test that poll works with them.
+
+ p = select.devpoll()
+
+ NUM_PIPES = 12
+ MSG = b" This is a test."
+ MSG_LEN = len(MSG)
+ readers = []
+ writers = []
+ r2w = {}
+ w2r = {}
+
+ for i in range(NUM_PIPES):
+ rd, wr = os.pipe()
+ p.register(rd)
+ p.modify(rd, select.POLLIN)
+ p.register(wr, select.POLLOUT)
+ readers.append(rd)
+ writers.append(wr)
+ r2w[rd] = wr
+ w2r[wr] = rd
+
+ bufs = []
+
+ while writers:
+ ready = p.poll()
+ ready_writers = find_ready_matching(ready, select.POLLOUT)
+ if not ready_writers:
+ self.fail("no pipes ready for writing")
+ wr = random.choice(ready_writers)
+ os.write(wr, MSG)
+
+ ready = p.poll()
+ ready_readers = find_ready_matching(ready, select.POLLIN)
+ if not ready_readers:
+ self.fail("no pipes ready for reading")
+ self.assertEqual([w2r[wr]], ready_readers)
+ rd = ready_readers[0]
+ buf = os.read(rd, MSG_LEN)
+ self.assertEqual(len(buf), MSG_LEN)
+ bufs.append(buf)
+ os.close(r2w[rd]) ; os.close(rd)
+ p.unregister(r2w[rd])
+ p.unregister(rd)
+ writers.remove(r2w[rd])
+
+ self.assertEqual(bufs, [MSG] * NUM_PIPES)
+
+ def test_timeout_overflow(self):
+ pollster = select.devpoll()
+ w, r = os.pipe()
+ pollster.register(w)
+
+ pollster.poll(-1)
+ self.assertRaises(OverflowError, pollster.poll, -2)
+ self.assertRaises(OverflowError, pollster.poll, -1 << 31)
+ self.assertRaises(OverflowError, pollster.poll, -1 << 64)
+
+ pollster.poll(0)
+ pollster.poll(1)
+ pollster.poll(1 << 30)
+ self.assertRaises(OverflowError, pollster.poll, 1 << 31)
+ self.assertRaises(OverflowError, pollster.poll, 1 << 63)
+ self.assertRaises(OverflowError, pollster.poll, 1 << 64)
+
+def test_main():
+ run_unittest(DevPollTests)
+
+if __name__ == '__main__':
+ test_main()