summaryrefslogtreecommitdiff
path: root/Lib/sched.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/sched.py')
-rw-r--r--Lib/sched.py90
1 files changed, 59 insertions, 31 deletions
diff --git a/Lib/sched.py b/Lib/sched.py
index a119892c3f..b9a7ad1afa 100644
--- a/Lib/sched.py
+++ b/Lib/sched.py
@@ -13,12 +13,12 @@ also be used to integrate scheduling with STDWIN events; the delay
function is allowed to modify the queue. Time can be expressed as
integers or floating point numbers, as long as it is consistent.
-Events are specified by tuples (time, priority, action, argument).
+Events are specified by tuples (time, priority, action, argument, kwargs).
As in UNIX, lower priority numbers mean higher priority; in this
way the queue can be maintained as a priority queue. Execution of the
event means calling the action function, passing it the argument
sequence in "argument" (remember that in Python, multiple function
-arguments are be packed in a sequence).
+arguments are be packed in a sequence) and keyword parameters in "kwargs".
The action function may be an instance method so it
has another way to reference private data (besides global variables).
"""
@@ -28,12 +28,21 @@ has another way to reference private data (besides global variables).
# XXX instead of having to define a module or class just to hold
# XXX the global state of your particular time and delay functions.
+import time
import heapq
from collections import namedtuple
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+try:
+ from time import monotonic as _time
+except ImportError:
+ from time import time as _time
__all__ = ["scheduler"]
-class Event(namedtuple('Event', 'time, priority, action, argument')):
+class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')):
def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority)
def __ne__(s, o): return (s.time, s.priority) != (o.time, o.priority)
def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority)
@@ -41,33 +50,41 @@ class Event(namedtuple('Event', 'time, priority, action, argument')):
def __gt__(s, o): return (s.time, s.priority) > (o.time, o.priority)
def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority)
+_sentinel = object()
+
class scheduler:
- def __init__(self, timefunc, delayfunc):
+
+ def __init__(self, timefunc=_time, delayfunc=time.sleep):
"""Initialize a new instance, passing the time and delay
functions"""
self._queue = []
+ self._lock = threading.RLock()
self.timefunc = timefunc
self.delayfunc = delayfunc
- def enterabs(self, time, priority, action, argument):
+ def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
"""Enter a new event in the queue at an absolute time.
Returns an ID for the event which can be used to remove it,
if necessary.
"""
- event = Event(time, priority, action, argument)
- heapq.heappush(self._queue, event)
- return event # The ID
-
- def enter(self, delay, priority, action, argument):
+ if kwargs is _sentinel:
+ kwargs = {}
+ with self._lock:
+ event = Event(time, priority, action, argument, kwargs)
+ heapq.heappush(self._queue, event)
+ return event # The ID
+
+ def enter(self, delay, priority, action, argument=(), kwargs=_sentinel):
"""A variant that specifies the time as a relative time.
This is actually the more commonly used interface.
"""
- time = self.timefunc() + delay
- return self.enterabs(time, priority, action, argument)
+ with self._lock:
+ time = self.timefunc() + delay
+ return self.enterabs(time, priority, action, argument, kwargs)
def cancel(self, event):
"""Remove an event from the queue.
@@ -76,15 +93,20 @@ class scheduler:
If the event is not in the queue, this raises ValueError.
"""
- self._queue.remove(event)
- heapq.heapify(self._queue)
+ with self._lock:
+ self._queue.remove(event)
+ heapq.heapify(self._queue)
def empty(self):
"""Check whether the queue is empty."""
- return not self._queue
+ with self._lock:
+ return not self._queue
- def run(self):
+ def run(self, blocking=True):
"""Execute events until the queue is empty.
+ If blocking is False executes the scheduled events due to
+ expire soonest (if any) and then return the deadline of the
+ next scheduled call in the scheduler.
When there is a positive delay until the first event, the
delay function is called and the event is left in the queue;
@@ -106,35 +128,41 @@ class scheduler:
"""
# localize variable access to minimize overhead
# and to improve thread safety
+ lock = self._lock
q = self._queue
delayfunc = self.delayfunc
timefunc = self.timefunc
pop = heapq.heappop
- while q:
- time, priority, action, argument = checked_event = q[0]
- now = timefunc()
- if now < time:
+ while True:
+ with lock:
+ if not q:
+ break
+ time, priority, action, argument, kwargs = q[0]
+ now = timefunc()
+ if time > now:
+ delay = True
+ else:
+ delay = False
+ pop(q)
+ if delay:
+ if not blocking:
+ return time - now
delayfunc(time - now)
else:
- event = pop(q)
- # Verify that the event was not removed or altered
- # by another thread after we last looked at q[0].
- if event is checked_event:
- action(*argument)
- delayfunc(0) # Let other threads run
- else:
- heapq.heappush(q, event)
+ action(*argument, **kwargs)
+ delayfunc(0) # Let other threads run
@property
def queue(self):
"""An ordered list of upcoming events.
Events are named tuples with fields for:
- time, priority, action, arguments
+ time, priority, action, arguments, kwargs
"""
# Use heapq to sort the queue rather than using 'sorted(self._queue)'.
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
- events = self._queue[:]
- return map(heapq.heappop, [events]*len(events))
+ with self._lock:
+ events = self._queue[:]
+ return list(map(heapq.heappop, [events]*len(events)))