diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-11-19 00:06:55 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-11-19 00:06:55 +0100 |
commit | dc285d328c116e8c1521f684e54c026a4027030f (patch) | |
tree | 1d76242292b26c666362de3064fb5083413e15bc | |
parent | 51a79679f8b12f7fb91abffe7455ec1e411dd6e3 (diff) | |
download | aioeventlet-dc285d328c116e8c1521f684e54c026a4027030f.tar.gz |
rewrite _run_once() to respect asyncio contract
* call_soon() now schedules a call to _run_once() and queue the callback
in the _ready list
* call_at() now schedules a call to _run_once() later using a timer, and queue
the callback in the _scheduled heap queue
* _run_once() is interrupted as soon as stop() is called
Calling loop.stop() now ensure that all callbacks scheduled after will *not* be executed
-rw-r--r-- | README | 1 | ||||
-rw-r--r-- | aiogreen/__init__.py | 171 |
2 files changed, 130 insertions, 42 deletions
@@ -21,7 +21,6 @@ Not supported: Todo: -* call_soon(func1); stop(); call_soon(func2) must not executed func2 * Support eventlet without monkey-patching * Test with Python 2 and 3 * Test with Trollius and asyncio diff --git a/aiogreen/__init__.py b/aiogreen/__init__.py index 5106e77..4edf2bd 100644 --- a/aiogreen/__init__.py +++ b/aiogreen/__init__.py @@ -7,6 +7,7 @@ from trollius.base_events import BaseEventLoop import errno import eventlet.greenio import eventlet.hubs.hub +import heapq import socket import sys import trollius @@ -77,19 +78,6 @@ class ThreadQueue: self._greenthread.wait() -class TimerHandle(trollius.Handle): - def __init__(self, callback, args, loop): - super(TimerHandle, self).__init__(callback, args, loop) - self._timer = None - - def _run(self): - super(TimerHandle, self)._run() - - def cancel(self): - super(TimerHandle, self).cancel() - self._timer.cancel() - - class SocketTransport(selector_events._SelectorSocketTransport): def __repr__(self): # override repr because _SelectorSocketTransport depends on @@ -103,10 +91,12 @@ class EventLoop(BaseEventLoop): self._pool = eventlet.GreenPool() # Queue used by call_soon_threadsafe() self._queue = ThreadQueue(self) - self._run = None + self._stop_event = None if self.get_debug(): hub = hubs.get_hub() hub.debug_blocking = True + self._run_once_scheduled = False + self._run_once_timer = None def time(self): # FIXME: is it safe to store the hub in an attribute of the event loop? @@ -120,7 +110,111 @@ class EventLoop(BaseEventLoop): handle._run() def _call_soon_handle(self, handle): - self._pool.spawn(self._call, handle) + self._ready.append(handle) + self._schedule() + + def is_running(self): + return (self._stop_event is not None) + + def _run_once_soon(self): + if not self._run_once_scheduled: + # _unschedule() was called, the event loop is no more running + assert self._stop_event is None + return + + self._unschedule_timer() + self._run_once() + + def _run_once(self): + run = self._stop_event + # if run is None, the event loop is not running + assert run is not None + + # FIXME: copy optimization from trollius to remove cancelled timers + + # Handle 'later' callbacks that are ready. + end_time = self.time() + self._clock_resolution + while self._scheduled: + handle = self._scheduled[0] + if handle._when >= end_time: + break + handle = heapq.heappop(self._scheduled) + handle._scheduled = False + if handle._cancelled: + continue + self._ready.append(handle) + + ntodo = len(self._ready) + for i in range(ntodo): + if run.ready(): + break + handle = self._ready.popleft() + if handle._cancelled: + continue + # FIXME: what happens if this method is interrupted, + # and _schedule() is called? + handle._run() + + self._run_once_scheduled = False + self._run_once_timer = None + + self._reschedule() + + def _unschedule_timer(self): + if not self._run_once_timer: + return + when, timer = self._run_once_timer + self._run_once_timer = None + timer.cancel() + + def _schedule(self): + """Schedule a call to _run_once().""" + if self._run_once_scheduled: + return + # FIXME: is it thread-safe and greenthread-safe? + self._run_once_scheduled = True + self._unschedule_timer() + self._pool.spawn(self._run_once_soon) + + # FIXME: unschedule the greenthread if the loop is interrupted or something like that? + + def _unschedule(self): + self._run_once_scheduled = False + # FIXME: cancel greenthread scheduled by _schedule() + + def _schedule_timer(self, when): + # FIXME: is this function greenthread-safe? + if self._run_once_scheduled: + return + + delay = when - self.time() + if delay <= 0: + self._schedule() + return + + if self._run_once_timer is not None: + if self._run_once_timer[0] <= when: + # a timer will ring earlier + return + # a timer was scheduled later + self._run_once_timer[1].cancel() + self._run_once_timer = None + + # inline spawn_after() to get the timer object, to be able + # to cancel directly the timer + hub = hubs.get_hub() + greenthread = eventlet.greenthread.GreenThread(hub.greenlet) + print("schedule in %s seconds" % delay, when) + timer = hub.schedule_call_global(delay, greenthread.switch, + self._run_once, (), {}) + self._run_once_timer = (when, timer) + + def _reschedule(self): + if self._ready: + self._schedule() + elif self._scheduled: + handle = self._scheduled[0] + self._schedule_timer(handle._when) def call_soon(self, callback, *args): handle = trollius.Handle(callback, args, self) @@ -132,50 +226,45 @@ class EventLoop(BaseEventLoop): self._queue.put(handle) return handle - def call_later(self, delay, callback, *args): - if 0: - handle = TimerHandle(callback, args, self) - - # inline spawn_after() to get the timer object, to be able - # to cancel directly the timer - hub = hubs.get_hub() - greenthread = eventlet.greenthread.GreenThread(hub.greenlet) - timer = hub.schedule_call_global(delay, greenthread.switch, - handle._run) + def call_at(self, when, callback, *args): + timer = trollius.TimerHandle(when, callback, args, self) + heapq.heappush(self._scheduled, timer) + self._schedule_timer(when) + return timer - handle._timer = timer - return handle - else: - handle = trollius.Handle(callback, args, self) - greenthread = eventlet.spawn_after(delay, self._call, handle) - return handle + def call_later(self, delay, callback, *args): + when = self.time() + delay + return self.call_at(when, callback, *args) - def call_at(self, when, callback, *args): - delay = when - self.time() - return self.call_later(delay, callback, *args) + def _timer_handle_cancelled(self, handle): + super(EventLoop, self)._timer_handle_cancelled(handle) + # FIXME: reschedule the _run_once() timer # FIXME: run_in_executor(): use eventlet.tpool as the default executor? # It avoids the dependency to concurrent.futures, but later it would be # better to use concurrent.futures. So... What is the best? def stop(self): - if self._run is None: + if self._stop_event is None: # not running or stop already scheduled return - self._run.send("stop") - self._run = None + self._stop_event.send("stop") + self._stop_event = None def run_forever(self): - if self._run is not None: + if self._stop_event is not None: raise RuntimeError("reentrant call to run_forever()") + self._reschedule() try: - self._run = eventlet.event.Event() + self._stop_event = eventlet.event.Event() # use a local copy because stop() clears the attribute - run = self._run + run = self._stop_event run.wait() finally: - self._run = None + self._stop_event = None + self._unschedule() + self._unschedule_timer() def close(self): super(EventLoop, self).close() |