summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README2
-rw-r--r--aiogreen.py236
2 files changed, 17 insertions, 221 deletions
diff --git a/README b/README
index 5997beb..59065d5 100644
--- a/README
+++ b/README
@@ -35,6 +35,8 @@ Version 0.2 (development version)
* Support eventlet with monkey-patching
* Rewrite the code handling file descriptors to ensure that the listener is
only called once per loop iteration, to respect asyncio specification.
+* Simplify the loop iteration: remove custom code to reuse instead the
+ asyncio/trollius code (_run_once)
* sock_connect() is now asynchronous
* Add a suite of automated unit tests
* Fix EventLoop.stop(): don't stop immediatly, but schedule stopping the event
diff --git a/aiogreen.py b/aiogreen.py
index d3e02ed..6ad901e 100644
--- a/aiogreen.py
+++ b/aiogreen.py
@@ -15,8 +15,7 @@ try:
import asyncio
from asyncio import selector_events
from asyncio import selectors
- from asyncio.base_events import BaseEventLoop
- from asyncio.base_events import _check_resolved_address
+ from asyncio import base_events
_FUTURE_CLASSES = (asyncio.Future,)
@@ -38,8 +37,7 @@ except ImportError:
import trollius as asyncio
from trollius import selector_events
from trollius import selectors
- from trollius.base_events import BaseEventLoop
- from trollius.base_events import _check_resolved_address
+ from trollius import base_events
if hasattr(asyncio.tasks, '_FUTURE_CLASSES'):
# Trollius 1.0.0
@@ -72,10 +70,6 @@ _WRITE = eventlet.hubs.hub.WRITE
# Eventlet 0.15 or newer?
_EVENTLET15 = hasattr(eventlet.hubs.hub.noop, 'mark_as_closed')
-# tulip >= 3.4.2 and trollius >= 1.0.2 implement an optimization
-# for cancelled timer handles
-_OPTIMIZE_CANCELLED_TIMERS = hasattr(asyncio.TimerHandle, '_scheduled')
-
# Error numbers catched by Python 3.3 BlockingIOError exception
_BLOCKING_IO_ERRNOS = set((
errno.EAGAIN,
@@ -171,95 +165,6 @@ def noop(*args, **kw):
pass
-class _Scheduler(object):
- """Schedule a call to loop._run_once().
-
- - schedule() calls loop._run_once() as soon as possible.
- If schedule_at() was called, replace this previous scheduled call.
- - schedule_at(when) calls it at the requested timestamp.
- If schedule() was called, do nothing.
- - stop() cancels the scheduled call.
-
- The scheduler is protected by an eventlet semaphore.
- """
-
- def __init__(self, loop):
- self._loop = loop
- self._greenthread = None
- self._timer = None
- self._lock = eventlet.semaphore.Semaphore()
-
- def schedule(self):
- with self._lock:
- self._schedule_unlocked()
-
- def _schedule_unlocked(self):
- if self._greenthread is not None:
- # already scheduled
- return
-
- # the greenthread will be called before the next timer,
- # cancel the timer if any
- self._unschedule_timer_unlocked()
-
- # it's safe to call spawn_n() with the lock:
- # it doesn't call _run_once() immediatly
- self._greenthread = eventlet.spawn_n(self._loop._run_once)
-
- def _unschedule_unlocked(self):
- if (self._greenthread is not None
- # If the greenthread is running, there is not need to cancel it.
- # Only cancel the greenthread if it didn't start yet.
- and not self._greenthread
- and not self._greenthread.dead):
- self._greenthread.run = noop
- self._greenthread = None
-
- def schedule_timer(self, when):
- with self._lock:
- if self._greenthread is not None:
- # already scheduled
- return
-
- delay = when - self._loop.time()
- if delay <= 0:
- self._schedule_unlocked()
- return
-
- if self._timer is not None:
- if self._timer[0] <= when:
- # the existing timer will be triggered earlier,
- # nothing to do
- return
-
- # the existing timer was scheduled later, replace it
- self._timer[1].cancel()
- self._timer = None
-
- hub = self._loop._hub
- greenthread = eventlet.greenthread.GreenThread(hub.greenlet)
- # it is safe to call schedule_call_global() with the lock:
- # it does not switch to _run_once() immediatly (it creates a timer
- # and adds it a the "next timers").
- greentimer = hub.schedule_call_global(delay,
- greenthread.switch,
- self._loop._run_once, (), {})
- self._timer = (when, greentimer)
-
- def _unschedule_timer_unlocked(self):
- if self._timer is None:
- return
-
- when, greentimer = self._timer
- self._timer = None
- greentimer.cancel()
-
- def stop(self):
- with self._lock:
- self._unschedule_unlocked()
- self._unschedule_timer_unlocked()
-
-
class _TpoolExecutor(object):
def __init__(self, loop):
import eventlet.tpool
@@ -311,7 +216,6 @@ class _Selector(object):
if self._event is not None and not self._event.ready():
# wakeup the select() method
self._event.send("ready")
- self._loop._scheduler.schedule()
def notify_read(self, fd):
self._notify(_READ, fd)
@@ -344,18 +248,22 @@ class _Selector(object):
self._event = eventlet.event.Event()
try:
- # FIXME: don't use polling
- endtime = self._loop.time() + timeout
- while self._loop.time() <= endtime:
- if self._event.ready():
- break
- eventlet.sleep(0.010)
+ if timeout is not None:
+ # FIXME: don't use polling
+ endtime = self._loop.time() + timeout
+ while self._loop.time() <= endtime:
+ if self._event.ready():
+ break
+ eventlet.sleep(0.010)
+ else:
+ # blocking call
+ self._event.wait()
return self._read_events()
finally:
self._event = None
-class EventLoop(BaseEventLoop):
+class EventLoop(base_events.BaseEventLoop):
def __init__(self):
super(EventLoop, self).__init__()
self._selector = _Selector(self)
@@ -369,13 +277,6 @@ class EventLoop(BaseEventLoop):
# Queue used by call_soon_threadsafe()
self._thread_queue = _ThreadQueue(self)
- # Event used to stop the event loop and to check if the event loop is
- # running?
- self._stop_event = None
-
- # Scheduler used to schedule a call to the loop._run_once() method
- self._scheduler = _Scheduler(self)
-
if eventlet.patcher.is_monkey_patched('thread'):
self._default_executor = _TpoolExecutor(self)
@@ -389,61 +290,11 @@ class EventLoop(BaseEventLoop):
def _call_soon_handle(self, handle):
self._ready.append(handle)
- self._scheduler.schedule()
-
- def is_running(self):
- return (self._stop_event is not None)
def _process_events(self, events):
for fd, handle in events:
self._ready.append(handle)
- def _run_once(self):
- assert self.is_running()
-
- if _OPTIMIZE_CANCELLED_TIMERS:
- # FIXME: copy optimization from asyncio to remove cancelled timers
- pass
-
- events = self._selector.select(0)
- self._process_events(events)
-
- # Handle 'later' callbacks that are ready.
- end_time = self.time() + self._clock_resolution
- while self._scheduled:
- handle = self._scheduled[0]
- if handle._when >= end_time:
- break
- handle = heapq.heappop(self._scheduled)
- if _OPTIMIZE_CANCELLED_TIMERS:
- handle._scheduled = False
- if handle._cancelled:
- continue
- self._ready.append(handle)
-
- # use a local copy because stop() clears the attribute
- stop_event = self._stop_event
-
- ntodo = len(self._ready)
- for i in range(ntodo):
- if stop_event.ready():
- # stop() has been called
- break
- handle = self._ready.popleft()
- if handle._cancelled:
- continue
- handle._run()
-
- self._scheduler.stop()
- self._reschedule()
-
- def _reschedule(self):
- if self._ready:
- self._scheduler.schedule()
- elif self._scheduled:
- handle = self._scheduled[0]
- self._scheduler.schedule_timer(handle._when)
-
def call_soon(self, callback, *args):
handle = asyncio.Handle(callback, args, self)
self._call_soon_handle(handle)
@@ -457,52 +308,19 @@ class EventLoop(BaseEventLoop):
def call_at(self, when, callback, *args):
timer = asyncio.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, timer)
- self._scheduler.schedule_timer(when)
return timer
def call_later(self, delay, callback, *args):
when = self.time() + delay
return self.call_at(when, callback, *args)
- def _timer_handle_cancelled(self, handle):
- super(EventLoop, self)._timer_handle_cancelled(handle)
- # FIXME: optimization, reschedule _run_once() if the cancelled timer
- # was the next timer
-
- def _stop(self):
- if self._stop_event is None:
- # not running
- return
- if self._stop_event.ready():
- # stop already scheduled
- return
- self._stop_event.send("stop")
-
- def stop(self):
- self.call_soon(self._stop)
-
def run_forever(self):
- if self._stop_event is not None:
- raise RuntimeError("Cannot close a running event loop")
-
# Start to thread queue in run_forever() to create a greenthread linked
# to the current greenthread
self._thread_queue.start()
- self._reschedule()
try:
- self._stop_event = eventlet.event.Event()
- # use a local copy because stop() clears the attribute
- stop_event = self._stop_event
-
- # sleep until the stop() method is called
- stop_event.wait()
+ super(EventLoop, self).run_forever()
finally:
- # First ensure that _run_once() will not be called
- self._scheduler.stop()
-
- # The event loop stopped
- self._stop_event = None
-
# Stop the greenthread of the thread queue.
# call_soon_threadsafe() can still be called, handles will be
# stored in the thread queue.
@@ -512,30 +330,6 @@ class EventLoop(BaseEventLoop):
super(EventLoop, self).close()
self._thread_queue.close()
- # FIXME: don't copy/paste asyncio code, but fix asyncio to call self.stop?
- def run_until_complete(self, future):
- # only available since tulip >= 3.4.2 and trollius >= 0.4
- if hasattr(self, '_check_closed'):
- self._check_closed()
-
- new_task = not isinstance(future, _FUTURE_CLASSES)
- future = asyncio.async(future, loop=self)
- if new_task:
- # An exception is raised if the future didn't complete, so there
- # is no need to log the "destroy pending task" message
- future._log_destroy_pending = False
-
- def stop(fut):
- self.stop()
-
- future.add_done_callback(stop)
- self.run_forever()
- future.remove_done_callback(stop)
- if not future.done():
- raise RuntimeError('Event loop stopped before Future completed.')
-
- return future.result()
-
def _add_fd(self, event_type, fd, callback, args):
fd = selectors._fileobj_to_fd(fd)
handle = asyncio.Handle(callback, args, self)
@@ -597,7 +391,7 @@ class EventLoop(BaseEventLoop):
raise ValueError("the socket must be non-blocking")
fut = asyncio.Future(loop=self)
try:
- _check_resolved_address(sock, address)
+ base_events._check_resolved_address(sock, address)
except ValueError as err:
fut.set_exception(err)
else: