summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-19 00:06:55 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-19 00:06:55 +0100
commitdc285d328c116e8c1521f684e54c026a4027030f (patch)
tree1d76242292b26c666362de3064fb5083413e15bc
parent51a79679f8b12f7fb91abffe7455ec1e411dd6e3 (diff)
downloadaioeventlet-dc285d328c116e8c1521f684e54c026a4027030f.tar.gz
rewrite _run_once() to respect asyncio contract
* call_soon() now schedules a call to _run_once() and queue the callback in the _ready list * call_at() now schedules a call to _run_once() later using a timer, and queue the callback in the _scheduled heap queue * _run_once() is interrupted as soon as stop() is called Calling loop.stop() now ensure that all callbacks scheduled after will *not* be executed
-rw-r--r--README1
-rw-r--r--aiogreen/__init__.py171
2 files changed, 130 insertions, 42 deletions
diff --git a/README b/README
index 53aa001..c702e8a 100644
--- a/README
+++ b/README
@@ -21,7 +21,6 @@ Not supported:
Todo:
-* call_soon(func1); stop(); call_soon(func2) must not executed func2
* Support eventlet without monkey-patching
* Test with Python 2 and 3
* Test with Trollius and asyncio
diff --git a/aiogreen/__init__.py b/aiogreen/__init__.py
index 5106e77..4edf2bd 100644
--- a/aiogreen/__init__.py
+++ b/aiogreen/__init__.py
@@ -7,6 +7,7 @@ from trollius.base_events import BaseEventLoop
import errno
import eventlet.greenio
import eventlet.hubs.hub
+import heapq
import socket
import sys
import trollius
@@ -77,19 +78,6 @@ class ThreadQueue:
self._greenthread.wait()
-class TimerHandle(trollius.Handle):
- def __init__(self, callback, args, loop):
- super(TimerHandle, self).__init__(callback, args, loop)
- self._timer = None
-
- def _run(self):
- super(TimerHandle, self)._run()
-
- def cancel(self):
- super(TimerHandle, self).cancel()
- self._timer.cancel()
-
-
class SocketTransport(selector_events._SelectorSocketTransport):
def __repr__(self):
# override repr because _SelectorSocketTransport depends on
@@ -103,10 +91,12 @@ class EventLoop(BaseEventLoop):
self._pool = eventlet.GreenPool()
# Queue used by call_soon_threadsafe()
self._queue = ThreadQueue(self)
- self._run = None
+ self._stop_event = None
if self.get_debug():
hub = hubs.get_hub()
hub.debug_blocking = True
+ self._run_once_scheduled = False
+ self._run_once_timer = None
def time(self):
# FIXME: is it safe to store the hub in an attribute of the event loop?
@@ -120,7 +110,111 @@ class EventLoop(BaseEventLoop):
handle._run()
def _call_soon_handle(self, handle):
- self._pool.spawn(self._call, handle)
+ self._ready.append(handle)
+ self._schedule()
+
+ def is_running(self):
+ return (self._stop_event is not None)
+
+ def _run_once_soon(self):
+ if not self._run_once_scheduled:
+ # _unschedule() was called, the event loop is no more running
+ assert self._stop_event is None
+ return
+
+ self._unschedule_timer()
+ self._run_once()
+
+ def _run_once(self):
+ run = self._stop_event
+ # if run is None, the event loop is not running
+ assert run is not None
+
+ # FIXME: copy optimization from trollius to remove cancelled timers
+
+ # Handle 'later' callbacks that are ready.
+ end_time = self.time() + self._clock_resolution
+ while self._scheduled:
+ handle = self._scheduled[0]
+ if handle._when >= end_time:
+ break
+ handle = heapq.heappop(self._scheduled)
+ handle._scheduled = False
+ if handle._cancelled:
+ continue
+ self._ready.append(handle)
+
+ ntodo = len(self._ready)
+ for i in range(ntodo):
+ if run.ready():
+ break
+ handle = self._ready.popleft()
+ if handle._cancelled:
+ continue
+ # FIXME: what happens if this method is interrupted,
+ # and _schedule() is called?
+ handle._run()
+
+ self._run_once_scheduled = False
+ self._run_once_timer = None
+
+ self._reschedule()
+
+ def _unschedule_timer(self):
+ if not self._run_once_timer:
+ return
+ when, timer = self._run_once_timer
+ self._run_once_timer = None
+ timer.cancel()
+
+ def _schedule(self):
+ """Schedule a call to _run_once()."""
+ if self._run_once_scheduled:
+ return
+ # FIXME: is it thread-safe and greenthread-safe?
+ self._run_once_scheduled = True
+ self._unschedule_timer()
+ self._pool.spawn(self._run_once_soon)
+
+ # FIXME: unschedule the greenthread if the loop is interrupted or something like that?
+
+ def _unschedule(self):
+ self._run_once_scheduled = False
+ # FIXME: cancel greenthread scheduled by _schedule()
+
+ def _schedule_timer(self, when):
+ # FIXME: is this function greenthread-safe?
+ if self._run_once_scheduled:
+ return
+
+ delay = when - self.time()
+ if delay <= 0:
+ self._schedule()
+ return
+
+ if self._run_once_timer is not None:
+ if self._run_once_timer[0] <= when:
+ # a timer will ring earlier
+ return
+ # a timer was scheduled later
+ self._run_once_timer[1].cancel()
+ self._run_once_timer = None
+
+ # inline spawn_after() to get the timer object, to be able
+ # to cancel directly the timer
+ hub = hubs.get_hub()
+ greenthread = eventlet.greenthread.GreenThread(hub.greenlet)
+ print("schedule in %s seconds" % delay, when)
+ timer = hub.schedule_call_global(delay, greenthread.switch,
+ self._run_once, (), {})
+ self._run_once_timer = (when, timer)
+
+ def _reschedule(self):
+ if self._ready:
+ self._schedule()
+ elif self._scheduled:
+ handle = self._scheduled[0]
+ self._schedule_timer(handle._when)
def call_soon(self, callback, *args):
handle = trollius.Handle(callback, args, self)
@@ -132,50 +226,45 @@ class EventLoop(BaseEventLoop):
self._queue.put(handle)
return handle
- def call_later(self, delay, callback, *args):
- if 0:
- handle = TimerHandle(callback, args, self)
-
- # inline spawn_after() to get the timer object, to be able
- # to cancel directly the timer
- hub = hubs.get_hub()
- greenthread = eventlet.greenthread.GreenThread(hub.greenlet)
- timer = hub.schedule_call_global(delay, greenthread.switch,
- handle._run)
+ def call_at(self, when, callback, *args):
+ timer = trollius.TimerHandle(when, callback, args, self)
+ heapq.heappush(self._scheduled, timer)
+ self._schedule_timer(when)
+ return timer
- handle._timer = timer
- return handle
- else:
- handle = trollius.Handle(callback, args, self)
- greenthread = eventlet.spawn_after(delay, self._call, handle)
- return handle
+ def call_later(self, delay, callback, *args):
+ when = self.time() + delay
+ return self.call_at(when, callback, *args)
- def call_at(self, when, callback, *args):
- delay = when - self.time()
- return self.call_later(delay, callback, *args)
+ def _timer_handle_cancelled(self, handle):
+ super(EventLoop, self)._timer_handle_cancelled(handle)
+ # FIXME: reschedule the _run_once() timer
# FIXME: run_in_executor(): use eventlet.tpool as the default executor?
# It avoids the dependency to concurrent.futures, but later it would be
# better to use concurrent.futures. So... What is the best?
def stop(self):
- if self._run is None:
+ if self._stop_event is None:
# not running or stop already scheduled
return
- self._run.send("stop")
- self._run = None
+ self._stop_event.send("stop")
+ self._stop_event = None
def run_forever(self):
- if self._run is not None:
+ if self._stop_event is not None:
raise RuntimeError("reentrant call to run_forever()")
+ self._reschedule()
try:
- self._run = eventlet.event.Event()
+ self._stop_event = eventlet.event.Event()
# use a local copy because stop() clears the attribute
- run = self._run
+ run = self._stop_event
run.wait()
finally:
- self._run = None
+ self._stop_event = None
+ self._unschedule()
+ self._unschedule_timer()
def close(self):
super(EventLoop, self).close()