summaryrefslogtreecommitdiff
path: root/Lib/test/test_sched.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_sched.py')
-rw-r--r--Lib/test/test_sched.py137
1 files changed, 129 insertions, 8 deletions
diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py
index 91b8f0c939..79fa7d3e3d 100644
--- a/Lib/test/test_sched.py
+++ b/Lib/test/test_sched.py
@@ -1,9 +1,42 @@
-#!/usr/bin/env python
-
+import queue
import sched
import time
import unittest
from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
+TIMEOUT = 10
+
+
+class Timer:
+ def __init__(self):
+ self._cond = threading.Condition()
+ self._time = 0
+ self._stop = 0
+
+ def time(self):
+ with self._cond:
+ return self._time
+
+ # increase the time but not beyond the established limit
+ def sleep(self, t):
+ assert t >= 0
+ with self._cond:
+ t += self._time
+ while self._stop < t:
+ self._time = self._stop
+ self._cond.wait()
+ self._time = t
+
+ # advance time limit for user code
+ def advance(self, t):
+ assert t >= 0
+ with self._cond:
+ self._stop += t
+ self._cond.notify_all()
class TestCase(unittest.TestCase):
@@ -26,6 +59,37 @@ class TestCase(unittest.TestCase):
scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_enter_concurrent(self):
+ q = queue.Queue()
+ fun = q.put
+ timer = Timer()
+ scheduler = sched.scheduler(timer.time, timer.sleep)
+ scheduler.enter(1, 1, fun, (1,))
+ scheduler.enter(3, 1, fun, (3,))
+ t = threading.Thread(target=scheduler.run)
+ t.start()
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 1)
+ self.assertTrue(q.empty())
+ for x in [4, 5, 2]:
+ z = scheduler.enter(x - 1, 1, fun, (x,))
+ timer.advance(2)
+ self.assertEqual(q.get(timeout=TIMEOUT), 2)
+ self.assertEqual(q.get(timeout=TIMEOUT), 3)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 4)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 5)
+ self.assertTrue(q.empty())
+ timer.advance(1000)
+ t.join(timeout=TIMEOUT)
+ self.assertFalse(t.is_alive())
+ self.assertTrue(q.empty())
+ self.assertEqual(timer.time(), 5)
+
def test_priority(self):
l = []
fun = lambda x: l.append(x)
@@ -50,6 +114,39 @@ class TestCase(unittest.TestCase):
scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_cancel_concurrent(self):
+ q = queue.Queue()
+ fun = q.put
+ timer = Timer()
+ scheduler = sched.scheduler(timer.time, timer.sleep)
+ now = timer.time()
+ event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
+ event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
+ event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
+ event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
+ event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
+ t = threading.Thread(target=scheduler.run)
+ t.start()
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 1)
+ self.assertTrue(q.empty())
+ scheduler.cancel(event2)
+ scheduler.cancel(event5)
+ timer.advance(1)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 3)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 4)
+ self.assertTrue(q.empty())
+ timer.advance(1000)
+ t.join(timeout=TIMEOUT)
+ self.assertFalse(t.is_alive())
+ self.assertTrue(q.empty())
+ self.assertEqual(timer.time(), 4)
+
def test_empty(self):
l = []
fun = lambda x: l.append(x)
@@ -63,15 +160,39 @@ class TestCase(unittest.TestCase):
def test_queue(self):
l = []
- events = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
- self.assertEqual(scheduler._queue, [])
- for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
- events.append(scheduler.enterabs(x, 1, fun, (x,)))
- self.assertEqual(scheduler._queue.sort(), events.sort())
+ now = time.time()
+ e5 = scheduler.enterabs(now + 0.05, 1, fun)
+ e1 = scheduler.enterabs(now + 0.01, 1, fun)
+ e2 = scheduler.enterabs(now + 0.02, 1, fun)
+ e4 = scheduler.enterabs(now + 0.04, 1, fun)
+ e3 = scheduler.enterabs(now + 0.03, 1, fun)
+ # queue property is supposed to return an order list of
+ # upcoming events
+ self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
+
+ def test_args_kwargs(self):
+ flag = []
+
+ def fun(*a, **b):
+ flag.append(None)
+ self.assertEqual(a, (1,2,3))
+ self.assertEqual(b, {"foo":1})
+
+ scheduler = sched.scheduler(time.time, time.sleep)
+ z = scheduler.enterabs(0.01, 1, fun, argument=(1,2,3), kwargs={"foo":1})
scheduler.run()
- self.assertEqual(scheduler._queue, [])
+ self.assertEqual(flag, [None])
+
+ def test_run_non_blocking(self):
+ l = []
+ fun = lambda x: l.append(x)
+ scheduler = sched.scheduler(time.time, time.sleep)
+ for x in [10, 9, 8, 7, 6]:
+ scheduler.enter(x, 1, fun, (x,))
+ scheduler.run(blocking=False)
+ self.assertEqual(l, [])
def test_main():