summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-24 22:06:38 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-24 22:06:38 +0100
commite60de6aa7507c39651297e5f0e9ea8ff75b564d9 (patch)
tree56f7344cc552fe9f792e2a7400bb44fcd2240181
parent93df5750c17894169787e024f9e986a6ac522bc3 (diff)
downloadaioeventlet-e60de6aa7507c39651297e5f0e9ea8ff75b564d9.tar.gz
Implement _GeventSelector
-rw-r--r--aiogreen.py268
1 files changed, 173 insertions, 95 deletions
diff --git a/aiogreen.py b/aiogreen.py
index c49dd2f..511e010 100644
--- a/aiogreen.py
+++ b/aiogreen.py
@@ -1,5 +1,6 @@
import eventlet.hubs.hub
-import gevent
+import gevent.core
+import gevent.event
import greenlet
import logging
import signal
@@ -83,12 +84,11 @@ class _TpoolExecutor(object):
class _Selector(asyncio.selectors._BaseSelectorImpl):
- def __init__(self, loop, hub):
+ def __init__(self, loop):
super(_Selector, self).__init__()
# fd => events
self._notified = {}
self._loop = loop
- self._hub = hub
# eventlet.event.Event() used by FD notifiers to wake up select()
self._event = None
@@ -98,45 +98,6 @@ class _Selector(asyncio.selectors._BaseSelectorImpl):
self.unregister(key.fd)
super(_Selector, self).close()
- def _add(self, fd, event):
- if event == _EVENT_READ:
- event_type = _HUB_READ
- func = self._notify_read
- else:
- event_type = _HUB_WRITE
- func = self._notify_write
-
- if _EVENTLET15:
- self._hub.add(event_type, fd, func, self._throwback, None)
- else:
- self._hub.add(event_type, fd, func)
-
- def register(self, fileobj, events, data=None):
- key = super(_Selector, self).register(fileobj, events, data)
- if events & _EVENT_READ:
- self._add(key.fd, _EVENT_READ)
- if events & _EVENT_WRITE:
- self._add(key.fd, _EVENT_WRITE)
- return key
-
- def _remove(self, fd, event):
- if event == _EVENT_READ:
- event_type = _HUB_READ
- else:
- event_type = _HUB_WRITE
- try:
- listener = self._hub.listeners[event_type][fd]
- except KeyError:
- pass
- else:
- self._hub.remove(listener)
-
- def unregister(self, fileobj):
- key = super(_Selector, self).unregister(fileobj)
- self._remove(key.fd, _EVENT_READ)
- self._remove(key.fd, _EVENT_WRITE)
- return key
-
def _notify(self, fd, event):
if fd in self._notified:
self._notified[fd] |= event
@@ -165,6 +126,46 @@ class _Selector(asyncio.selectors._BaseSelectorImpl):
ready.append((key, events & key.events))
return ready
+
+class _EventletSelector(_Selector):
+ def __init__(self, hub, loop):
+ super(_EventletSelector, self).__init__(loop=loop)
+ self._hub = hub
+
+ def register(self, fileobj, events, data=None):
+ key = super(_Selector, self).register(fileobj, events, data)
+ for event in (_EVENT_READ, _EVENT_WRITE):
+ if not(events & event):
+ continue
+
+ if event == _EVENT_READ:
+ event_type = _HUB_READ
+ func = self._notify_read
+ else:
+ event_type = _HUB_WRITE
+ func = self._notify_write
+
+ if _EVENTLET15:
+ self._hub.add(event_type, key.fd, func, self._throwback, None)
+ else:
+ self._hub.add(event_type, key.fd, func)
+ return key
+
+ def unregister(self, fileobj):
+ key = super(_Selector, self).unregister(fileobj)
+ for event in (_EVENT_READ, _EVENT_WRITE):
+ if event == _EVENT_READ:
+ event_type = _HUB_READ
+ else:
+ event_type = _HUB_WRITE
+ try:
+ listener = self._hub.listeners[event_type][key.fd]
+ except KeyError:
+ pass
+ else:
+ self._hub.remove(listener)
+ return key
+
def select(self, timeout):
events = self._read_events()
if events:
@@ -190,21 +191,83 @@ class _Selector(asyncio.selectors._BaseSelectorImpl):
self._event = None
-class _EventLoop(asyncio.SelectorEventLoop):
- def __init__(self, hub, selector):
- self._greenthread = None
+class _GeventSelector(_Selector):
+ def __init__(self, loop):
+ super(_GeventSelector, self).__init__(loop=loop)
+ self._gevent_events = {}
- # Store a reference to the hub to ensure
- # that we always use the same hub
- self._hub = hub
+ def _notify(self, fd, event):
+ if fd in self._notified:
+ self._notified[fd] |= event
+ else:
+ self._notified[fd] = event
+ if self._event is not None:
+ # wakeup the select() method
+ self._event.set()
- super(_EventLoop, self).__init__(selector=selector)
+ # FIXME: what is x?
+ def _notify_read(self, event, x):
+ self._notify(event.fd, _EVENT_READ)
- # Force a call to set_debug() to set hub.debug_blocking
- self.set_debug(self.get_debug())
+ def _notify_write(self, event, x):
+ self._notify(event.fd, _EVENT_WRITE)
- if eventlet.patcher.is_monkey_patched('thread'):
- self._default_executor = _TpoolExecutor(self)
+ def register(self, fileobj, events, data=None):
+ key = super(_GeventSelector, self).register(fileobj, events, data)
+ event_dict = {}
+ for event in (_EVENT_READ, _EVENT_WRITE):
+ if not(events & event):
+ continue
+
+ if event == _EVENT_READ:
+ gevent_event = gevent.core.read_event(key.fd, self._notify_read)
+ else:
+ gevent_event = gevent.core.write_event(key.fd, self._notify_write)
+ event_dict[event] = gevent_event
+
+ self._gevent_events[key.fd] = event_dict
+ return key
+
+ def unregister(self, fileobj):
+ key = super(_GeventSelector, self).unregister(fileobj)
+ event_dict = self._gevent_events[key.fd].pop(key.fd, {})
+ for event in (_EVENT_READ, _EVENT_WRITE):
+ try:
+ gevent_event = event_dict[event]
+ except KeyError:
+ continue
+ gevent_event.cancel()
+ return key
+
+ def select(self, timeout):
+ events = self._read_events()
+ if events:
+ return events
+
+ self._event = gevent.event.Event()
+ try:
+ if timeout is not None:
+ def timeout_cb(event):
+ if event.ready():
+ return
+ event.set()
+
+ gevent.spawn_later(timeout, timeout_cb, self._event)
+
+ self._event.wait()
+ # FIXME: cancel the timeout_cb if wait() returns 'ready'?
+ else:
+ # blocking call
+ self._event.wait()
+ return self._read_events()
+ finally:
+ self._event = None
+
+
+class _EventLoop(asyncio.SelectorEventLoop):
+ def __init__(self, selector):
+ self._greenthread = None
+ super(_EventLoop, self).__init__(selector=selector)
def call_soon(self, callback, *args):
handle = super(_EventLoop, self).call_soon(callback, *args)
@@ -222,39 +285,6 @@ class _EventLoop(asyncio.SelectorEventLoop):
self._write_to_self()
return handle
- def set_debug(self, debug):
- super(_EventLoop, self).set_debug(debug)
-
- self._hub.debug_exceptions = debug
-
- # Detect blocking eventlet functions. The feature is implemented with
- # signal.alarm() which is is not available on Windows. Signal handlers
- # can only be set from the main loop. So detecting blocking functions
- # cannot be used on Windows nor from a thread different than the main
- # thread.
- self._hub.debug_blocking = (
- debug
- and (sys.platform != 'win32')
- and isinstance(threading.current_thread(), threading._MainThread))
-
- if (self._hub.debug_blocking
- and hasattr(self, 'slow_callback_duration')):
- self._hub.debug_blocking_resolution = self.slow_callback_duration
-
- def run_forever(self):
- self._greenthread = eventlet.getcurrent()
- try:
- super(_EventLoop, self).run_forever()
- finally:
- if self._hub.debug_blocking:
- # eventlet event loop is still running: cancel the current
- # detection of blocking tasks
- signal.alarm(0)
- self._greenthread = None
-
- def time(self):
- return self._hub.clock()
-
# FIXME: mark as abstract
def link_future(self, future):
pass
@@ -264,10 +294,22 @@ class _EventLoop(asyncio.SelectorEventLoop):
class EventletEventLoop(_EventLoop):
def __init__(self):
- hub = eventlet.hubs.get_hub()
- selector = _Selector(self, hub)
+ # Store a reference to the hub to ensure
+ # that we always use the same hub
+ self._hub = eventlet.hubs.get_hub()
+
+ selector = _EventletSelector(self._hub, self)
super(EventletEventLoop, self).__init__(hub, selector)
+ # Force a call to set_debug() to set hub.debug_blocking
+ self.set_debug(self.get_debug())
+
+ if eventlet.patcher.is_monkey_patched('thread'):
+ self._default_executor = _TpoolExecutor(self)
+
+ def time(self):
+ return self._hub.clock()
+
def link_future(self, future):
"""Wait for a future, a task, or a coroutine object from a greenthread.
@@ -343,12 +385,42 @@ class EventletEventLoop(_EventLoop):
gt.run = wrap_func
return fut
+ def run_forever(self):
+ self._greenthread = eventlet.getcurrent()
+ try:
+ super(_EventLoop, self).run_forever()
+ finally:
+ if self._hub.debug_blocking:
+ # eventlet event loop is still running: cancel the current
+ # detection of blocking tasks
+ signal.alarm(0)
+ self._greenthread = None
+
+ def set_debug(self, debug):
+ super(_EventLoop, self).set_debug(debug)
+
+ self._hub.debug_exceptions = debug
+ # Detect blocking eventlet functions. The feature is implemented with
+ # signal.alarm() which is is not available on Windows. Signal handlers
+ # can only be set from the main loop. So detecting blocking functions
+ # cannot be used on Windows nor from a thread different than the main
+ # thread.
+ self._hub.debug_blocking = (
+ debug
+ and (sys.platform != 'win32')
+ and isinstance(threading.current_thread(), threading._MainThread))
+
+ if (self._hub.debug_blocking
+ and hasattr(self, 'slow_callback_duration')):
+ self._hub.debug_blocking_resolution = self.slow_callback_duration
+
+
+# FIXME: support gevent 1.0
class GeventEventLoop(_EventLoop):
def __init__(self):
- hub = eventlet.hubs.get_hub()
- selector = _Selector(self, hub)
- super(GeventEventLoop, self).__init__(hub, selector)
+ selector = _GeventSelector(self)
+ super(GeventEventLoop, self).__init__(selector=selector)
def link_future(self, future):
"""Wait for a future, a task, or a coroutine object from a greenthread.
@@ -399,16 +471,15 @@ class GeventEventLoop(_EventLoop):
raise RuntimeError("wrap_greenthread: the greenthread already finished")
if isinstance(gt, gevent.Greenlet):
- orig_main = gt.run
+ orig_func = gt._run
def wrap_func(*args, **kw):
try:
- orig_main(*args, **kw)
+ result = orig_func(*args, **kw)
except Exception as exc:
fut.set_exception(exc)
else:
- result = gt.wait()
fut.set_result(result)
- gt.run = wrap_func
+ gt._run = wrap_func
else:
try:
orig_func = gt.run
@@ -425,6 +496,13 @@ class GeventEventLoop(_EventLoop):
gt.run = wrap_func
return fut
+ def run_forever(self):
+ self._greenthread = gevent.getcurrent()
+ try:
+ super(_EventLoop, self).run_forever()
+ finally:
+ self._greenthread = None
+
class EventletEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
_loop_factory = EventletEventLoop