diff options
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | aiogreen.py | 236 |
2 files changed, 17 insertions, 221 deletions
@@ -35,6 +35,8 @@ Version 0.2 (development version) * Support eventlet with monkey-patching * Rewrite the code handling file descriptors to ensure that the listener is only called once per loop iteration, to respect asyncio specification. +* Simplify the loop iteration: remove custom code to reuse instead the + asyncio/trollius code (_run_once) * sock_connect() is now asynchronous * Add a suite of automated unit tests * Fix EventLoop.stop(): don't stop immediatly, but schedule stopping the event diff --git a/aiogreen.py b/aiogreen.py index d3e02ed..6ad901e 100644 --- a/aiogreen.py +++ b/aiogreen.py @@ -15,8 +15,7 @@ try: import asyncio from asyncio import selector_events from asyncio import selectors - from asyncio.base_events import BaseEventLoop - from asyncio.base_events import _check_resolved_address + from asyncio import base_events _FUTURE_CLASSES = (asyncio.Future,) @@ -38,8 +37,7 @@ except ImportError: import trollius as asyncio from trollius import selector_events from trollius import selectors - from trollius.base_events import BaseEventLoop - from trollius.base_events import _check_resolved_address + from trollius import base_events if hasattr(asyncio.tasks, '_FUTURE_CLASSES'): # Trollius 1.0.0 @@ -72,10 +70,6 @@ _WRITE = eventlet.hubs.hub.WRITE # Eventlet 0.15 or newer? _EVENTLET15 = hasattr(eventlet.hubs.hub.noop, 'mark_as_closed') -# tulip >= 3.4.2 and trollius >= 1.0.2 implement an optimization -# for cancelled timer handles -_OPTIMIZE_CANCELLED_TIMERS = hasattr(asyncio.TimerHandle, '_scheduled') - # Error numbers catched by Python 3.3 BlockingIOError exception _BLOCKING_IO_ERRNOS = set(( errno.EAGAIN, @@ -171,95 +165,6 @@ def noop(*args, **kw): pass -class _Scheduler(object): - """Schedule a call to loop._run_once(). - - - schedule() calls loop._run_once() as soon as possible. - If schedule_at() was called, replace this previous scheduled call. - - schedule_at(when) calls it at the requested timestamp. - If schedule() was called, do nothing. - - stop() cancels the scheduled call. - - The scheduler is protected by an eventlet semaphore. - """ - - def __init__(self, loop): - self._loop = loop - self._greenthread = None - self._timer = None - self._lock = eventlet.semaphore.Semaphore() - - def schedule(self): - with self._lock: - self._schedule_unlocked() - - def _schedule_unlocked(self): - if self._greenthread is not None: - # already scheduled - return - - # the greenthread will be called before the next timer, - # cancel the timer if any - self._unschedule_timer_unlocked() - - # it's safe to call spawn_n() with the lock: - # it doesn't call _run_once() immediatly - self._greenthread = eventlet.spawn_n(self._loop._run_once) - - def _unschedule_unlocked(self): - if (self._greenthread is not None - # If the greenthread is running, there is not need to cancel it. - # Only cancel the greenthread if it didn't start yet. - and not self._greenthread - and not self._greenthread.dead): - self._greenthread.run = noop - self._greenthread = None - - def schedule_timer(self, when): - with self._lock: - if self._greenthread is not None: - # already scheduled - return - - delay = when - self._loop.time() - if delay <= 0: - self._schedule_unlocked() - return - - if self._timer is not None: - if self._timer[0] <= when: - # the existing timer will be triggered earlier, - # nothing to do - return - - # the existing timer was scheduled later, replace it - self._timer[1].cancel() - self._timer = None - - hub = self._loop._hub - greenthread = eventlet.greenthread.GreenThread(hub.greenlet) - # it is safe to call schedule_call_global() with the lock: - # it does not switch to _run_once() immediatly (it creates a timer - # and adds it a the "next timers"). - greentimer = hub.schedule_call_global(delay, - greenthread.switch, - self._loop._run_once, (), {}) - self._timer = (when, greentimer) - - def _unschedule_timer_unlocked(self): - if self._timer is None: - return - - when, greentimer = self._timer - self._timer = None - greentimer.cancel() - - def stop(self): - with self._lock: - self._unschedule_unlocked() - self._unschedule_timer_unlocked() - - class _TpoolExecutor(object): def __init__(self, loop): import eventlet.tpool @@ -311,7 +216,6 @@ class _Selector(object): if self._event is not None and not self._event.ready(): # wakeup the select() method self._event.send("ready") - self._loop._scheduler.schedule() def notify_read(self, fd): self._notify(_READ, fd) @@ -344,18 +248,22 @@ class _Selector(object): self._event = eventlet.event.Event() try: - # FIXME: don't use polling - endtime = self._loop.time() + timeout - while self._loop.time() <= endtime: - if self._event.ready(): - break - eventlet.sleep(0.010) + if timeout is not None: + # FIXME: don't use polling + endtime = self._loop.time() + timeout + while self._loop.time() <= endtime: + if self._event.ready(): + break + eventlet.sleep(0.010) + else: + # blocking call + self._event.wait() return self._read_events() finally: self._event = None -class EventLoop(BaseEventLoop): +class EventLoop(base_events.BaseEventLoop): def __init__(self): super(EventLoop, self).__init__() self._selector = _Selector(self) @@ -369,13 +277,6 @@ class EventLoop(BaseEventLoop): # Queue used by call_soon_threadsafe() self._thread_queue = _ThreadQueue(self) - # Event used to stop the event loop and to check if the event loop is - # running? - self._stop_event = None - - # Scheduler used to schedule a call to the loop._run_once() method - self._scheduler = _Scheduler(self) - if eventlet.patcher.is_monkey_patched('thread'): self._default_executor = _TpoolExecutor(self) @@ -389,61 +290,11 @@ class EventLoop(BaseEventLoop): def _call_soon_handle(self, handle): self._ready.append(handle) - self._scheduler.schedule() - - def is_running(self): - return (self._stop_event is not None) def _process_events(self, events): for fd, handle in events: self._ready.append(handle) - def _run_once(self): - assert self.is_running() - - if _OPTIMIZE_CANCELLED_TIMERS: - # FIXME: copy optimization from asyncio to remove cancelled timers - pass - - events = self._selector.select(0) - self._process_events(events) - - # Handle 'later' callbacks that are ready. - end_time = self.time() + self._clock_resolution - while self._scheduled: - handle = self._scheduled[0] - if handle._when >= end_time: - break - handle = heapq.heappop(self._scheduled) - if _OPTIMIZE_CANCELLED_TIMERS: - handle._scheduled = False - if handle._cancelled: - continue - self._ready.append(handle) - - # use a local copy because stop() clears the attribute - stop_event = self._stop_event - - ntodo = len(self._ready) - for i in range(ntodo): - if stop_event.ready(): - # stop() has been called - break - handle = self._ready.popleft() - if handle._cancelled: - continue - handle._run() - - self._scheduler.stop() - self._reschedule() - - def _reschedule(self): - if self._ready: - self._scheduler.schedule() - elif self._scheduled: - handle = self._scheduled[0] - self._scheduler.schedule_timer(handle._when) - def call_soon(self, callback, *args): handle = asyncio.Handle(callback, args, self) self._call_soon_handle(handle) @@ -457,52 +308,19 @@ class EventLoop(BaseEventLoop): def call_at(self, when, callback, *args): timer = asyncio.TimerHandle(when, callback, args, self) heapq.heappush(self._scheduled, timer) - self._scheduler.schedule_timer(when) return timer def call_later(self, delay, callback, *args): when = self.time() + delay return self.call_at(when, callback, *args) - def _timer_handle_cancelled(self, handle): - super(EventLoop, self)._timer_handle_cancelled(handle) - # FIXME: optimization, reschedule _run_once() if the cancelled timer - # was the next timer - - def _stop(self): - if self._stop_event is None: - # not running - return - if self._stop_event.ready(): - # stop already scheduled - return - self._stop_event.send("stop") - - def stop(self): - self.call_soon(self._stop) - def run_forever(self): - if self._stop_event is not None: - raise RuntimeError("Cannot close a running event loop") - # Start to thread queue in run_forever() to create a greenthread linked # to the current greenthread self._thread_queue.start() - self._reschedule() try: - self._stop_event = eventlet.event.Event() - # use a local copy because stop() clears the attribute - stop_event = self._stop_event - - # sleep until the stop() method is called - stop_event.wait() + super(EventLoop, self).run_forever() finally: - # First ensure that _run_once() will not be called - self._scheduler.stop() - - # The event loop stopped - self._stop_event = None - # Stop the greenthread of the thread queue. # call_soon_threadsafe() can still be called, handles will be # stored in the thread queue. @@ -512,30 +330,6 @@ class EventLoop(BaseEventLoop): super(EventLoop, self).close() self._thread_queue.close() - # FIXME: don't copy/paste asyncio code, but fix asyncio to call self.stop? - def run_until_complete(self, future): - # only available since tulip >= 3.4.2 and trollius >= 0.4 - if hasattr(self, '_check_closed'): - self._check_closed() - - new_task = not isinstance(future, _FUTURE_CLASSES) - future = asyncio.async(future, loop=self) - if new_task: - # An exception is raised if the future didn't complete, so there - # is no need to log the "destroy pending task" message - future._log_destroy_pending = False - - def stop(fut): - self.stop() - - future.add_done_callback(stop) - self.run_forever() - future.remove_done_callback(stop) - if not future.done(): - raise RuntimeError('Event loop stopped before Future completed.') - - return future.result() - def _add_fd(self, event_type, fd, callback, args): fd = selectors._fileobj_to_fd(fd) handle = asyncio.Handle(callback, args, self) @@ -597,7 +391,7 @@ class EventLoop(BaseEventLoop): raise ValueError("the socket must be non-blocking") fut = asyncio.Future(loop=self) try: - _check_resolved_address(sock, address) + base_events._check_resolved_address(sock, address) except ValueError as err: fut.set_exception(err) else: |