diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-11-24 22:06:38 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-11-24 22:06:38 +0100 |
commit | e60de6aa7507c39651297e5f0e9ea8ff75b564d9 (patch) | |
tree | 56f7344cc552fe9f792e2a7400bb44fcd2240181 | |
parent | 93df5750c17894169787e024f9e986a6ac522bc3 (diff) | |
download | aioeventlet-e60de6aa7507c39651297e5f0e9ea8ff75b564d9.tar.gz |
Implement _GeventSelector
-rw-r--r-- | aiogreen.py | 268 |
1 files changed, 173 insertions, 95 deletions
diff --git a/aiogreen.py b/aiogreen.py index c49dd2f..511e010 100644 --- a/aiogreen.py +++ b/aiogreen.py @@ -1,5 +1,6 @@ import eventlet.hubs.hub -import gevent +import gevent.core +import gevent.event import greenlet import logging import signal @@ -83,12 +84,11 @@ class _TpoolExecutor(object): class _Selector(asyncio.selectors._BaseSelectorImpl): - def __init__(self, loop, hub): + def __init__(self, loop): super(_Selector, self).__init__() # fd => events self._notified = {} self._loop = loop - self._hub = hub # eventlet.event.Event() used by FD notifiers to wake up select() self._event = None @@ -98,45 +98,6 @@ class _Selector(asyncio.selectors._BaseSelectorImpl): self.unregister(key.fd) super(_Selector, self).close() - def _add(self, fd, event): - if event == _EVENT_READ: - event_type = _HUB_READ - func = self._notify_read - else: - event_type = _HUB_WRITE - func = self._notify_write - - if _EVENTLET15: - self._hub.add(event_type, fd, func, self._throwback, None) - else: - self._hub.add(event_type, fd, func) - - def register(self, fileobj, events, data=None): - key = super(_Selector, self).register(fileobj, events, data) - if events & _EVENT_READ: - self._add(key.fd, _EVENT_READ) - if events & _EVENT_WRITE: - self._add(key.fd, _EVENT_WRITE) - return key - - def _remove(self, fd, event): - if event == _EVENT_READ: - event_type = _HUB_READ - else: - event_type = _HUB_WRITE - try: - listener = self._hub.listeners[event_type][fd] - except KeyError: - pass - else: - self._hub.remove(listener) - - def unregister(self, fileobj): - key = super(_Selector, self).unregister(fileobj) - self._remove(key.fd, _EVENT_READ) - self._remove(key.fd, _EVENT_WRITE) - return key - def _notify(self, fd, event): if fd in self._notified: self._notified[fd] |= event @@ -165,6 +126,46 @@ class _Selector(asyncio.selectors._BaseSelectorImpl): ready.append((key, events & key.events)) return ready + +class _EventletSelector(_Selector): + def __init__(self, hub, loop): + super(_EventletSelector, self).__init__(loop=loop) + self._hub = hub + + def register(self, fileobj, events, data=None): + key = super(_Selector, self).register(fileobj, events, data) + for event in (_EVENT_READ, _EVENT_WRITE): + if not(events & event): + continue + + if event == _EVENT_READ: + event_type = _HUB_READ + func = self._notify_read + else: + event_type = _HUB_WRITE + func = self._notify_write + + if _EVENTLET15: + self._hub.add(event_type, key.fd, func, self._throwback, None) + else: + self._hub.add(event_type, key.fd, func) + return key + + def unregister(self, fileobj): + key = super(_Selector, self).unregister(fileobj) + for event in (_EVENT_READ, _EVENT_WRITE): + if event == _EVENT_READ: + event_type = _HUB_READ + else: + event_type = _HUB_WRITE + try: + listener = self._hub.listeners[event_type][key.fd] + except KeyError: + pass + else: + self._hub.remove(listener) + return key + def select(self, timeout): events = self._read_events() if events: @@ -190,21 +191,83 @@ class _Selector(asyncio.selectors._BaseSelectorImpl): self._event = None -class _EventLoop(asyncio.SelectorEventLoop): - def __init__(self, hub, selector): - self._greenthread = None +class _GeventSelector(_Selector): + def __init__(self, loop): + super(_GeventSelector, self).__init__(loop=loop) + self._gevent_events = {} - # Store a reference to the hub to ensure - # that we always use the same hub - self._hub = hub + def _notify(self, fd, event): + if fd in self._notified: + self._notified[fd] |= event + else: + self._notified[fd] = event + if self._event is not None: + # wakeup the select() method + self._event.set() - super(_EventLoop, self).__init__(selector=selector) + # FIXME: what is x? + def _notify_read(self, event, x): + self._notify(event.fd, _EVENT_READ) - # Force a call to set_debug() to set hub.debug_blocking - self.set_debug(self.get_debug()) + def _notify_write(self, event, x): + self._notify(event.fd, _EVENT_WRITE) - if eventlet.patcher.is_monkey_patched('thread'): - self._default_executor = _TpoolExecutor(self) + def register(self, fileobj, events, data=None): + key = super(_GeventSelector, self).register(fileobj, events, data) + event_dict = {} + for event in (_EVENT_READ, _EVENT_WRITE): + if not(events & event): + continue + + if event == _EVENT_READ: + gevent_event = gevent.core.read_event(key.fd, self._notify_read) + else: + gevent_event = gevent.core.write_event(key.fd, self._notify_write) + event_dict[event] = gevent_event + + self._gevent_events[key.fd] = event_dict + return key + + def unregister(self, fileobj): + key = super(_GeventSelector, self).unregister(fileobj) + event_dict = self._gevent_events[key.fd].pop(key.fd, {}) + for event in (_EVENT_READ, _EVENT_WRITE): + try: + gevent_event = event_dict[event] + except KeyError: + continue + gevent_event.cancel() + return key + + def select(self, timeout): + events = self._read_events() + if events: + return events + + self._event = gevent.event.Event() + try: + if timeout is not None: + def timeout_cb(event): + if event.ready(): + return + event.set() + + gevent.spawn_later(timeout, timeout_cb, self._event) + + self._event.wait() + # FIXME: cancel the timeout_cb if wait() returns 'ready'? + else: + # blocking call + self._event.wait() + return self._read_events() + finally: + self._event = None + + +class _EventLoop(asyncio.SelectorEventLoop): + def __init__(self, selector): + self._greenthread = None + super(_EventLoop, self).__init__(selector=selector) def call_soon(self, callback, *args): handle = super(_EventLoop, self).call_soon(callback, *args) @@ -222,39 +285,6 @@ class _EventLoop(asyncio.SelectorEventLoop): self._write_to_self() return handle - def set_debug(self, debug): - super(_EventLoop, self).set_debug(debug) - - self._hub.debug_exceptions = debug - - # Detect blocking eventlet functions. The feature is implemented with - # signal.alarm() which is is not available on Windows. Signal handlers - # can only be set from the main loop. So detecting blocking functions - # cannot be used on Windows nor from a thread different than the main - # thread. - self._hub.debug_blocking = ( - debug - and (sys.platform != 'win32') - and isinstance(threading.current_thread(), threading._MainThread)) - - if (self._hub.debug_blocking - and hasattr(self, 'slow_callback_duration')): - self._hub.debug_blocking_resolution = self.slow_callback_duration - - def run_forever(self): - self._greenthread = eventlet.getcurrent() - try: - super(_EventLoop, self).run_forever() - finally: - if self._hub.debug_blocking: - # eventlet event loop is still running: cancel the current - # detection of blocking tasks - signal.alarm(0) - self._greenthread = None - - def time(self): - return self._hub.clock() - # FIXME: mark as abstract def link_future(self, future): pass @@ -264,10 +294,22 @@ class _EventLoop(asyncio.SelectorEventLoop): class EventletEventLoop(_EventLoop): def __init__(self): - hub = eventlet.hubs.get_hub() - selector = _Selector(self, hub) + # Store a reference to the hub to ensure + # that we always use the same hub + self._hub = eventlet.hubs.get_hub() + + selector = _EventletSelector(self._hub, self) super(EventletEventLoop, self).__init__(hub, selector) + # Force a call to set_debug() to set hub.debug_blocking + self.set_debug(self.get_debug()) + + if eventlet.patcher.is_monkey_patched('thread'): + self._default_executor = _TpoolExecutor(self) + + def time(self): + return self._hub.clock() + def link_future(self, future): """Wait for a future, a task, or a coroutine object from a greenthread. @@ -343,12 +385,42 @@ class EventletEventLoop(_EventLoop): gt.run = wrap_func return fut + def run_forever(self): + self._greenthread = eventlet.getcurrent() + try: + super(_EventLoop, self).run_forever() + finally: + if self._hub.debug_blocking: + # eventlet event loop is still running: cancel the current + # detection of blocking tasks + signal.alarm(0) + self._greenthread = None + + def set_debug(self, debug): + super(_EventLoop, self).set_debug(debug) + + self._hub.debug_exceptions = debug + # Detect blocking eventlet functions. The feature is implemented with + # signal.alarm() which is is not available on Windows. Signal handlers + # can only be set from the main loop. So detecting blocking functions + # cannot be used on Windows nor from a thread different than the main + # thread. + self._hub.debug_blocking = ( + debug + and (sys.platform != 'win32') + and isinstance(threading.current_thread(), threading._MainThread)) + + if (self._hub.debug_blocking + and hasattr(self, 'slow_callback_duration')): + self._hub.debug_blocking_resolution = self.slow_callback_duration + + +# FIXME: support gevent 1.0 class GeventEventLoop(_EventLoop): def __init__(self): - hub = eventlet.hubs.get_hub() - selector = _Selector(self, hub) - super(GeventEventLoop, self).__init__(hub, selector) + selector = _GeventSelector(self) + super(GeventEventLoop, self).__init__(selector=selector) def link_future(self, future): """Wait for a future, a task, or a coroutine object from a greenthread. @@ -399,16 +471,15 @@ class GeventEventLoop(_EventLoop): raise RuntimeError("wrap_greenthread: the greenthread already finished") if isinstance(gt, gevent.Greenlet): - orig_main = gt.run + orig_func = gt._run def wrap_func(*args, **kw): try: - orig_main(*args, **kw) + result = orig_func(*args, **kw) except Exception as exc: fut.set_exception(exc) else: - result = gt.wait() fut.set_result(result) - gt.run = wrap_func + gt._run = wrap_func else: try: orig_func = gt.run @@ -425,6 +496,13 @@ class GeventEventLoop(_EventLoop): gt.run = wrap_func return fut + def run_forever(self): + self._greenthread = gevent.getcurrent() + try: + super(_EventLoop, self).run_forever() + finally: + self._greenthread = None + class EventletEventLoopPolicy(asyncio.DefaultEventLoopPolicy): _loop_factory = EventletEventLoop |