summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-19 02:10:07 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-19 02:10:07 +0100
commit6d4367e018173557ec11de6805b4800c240ad1ba (patch)
tree95f44488481474e5e77ac0236af7af8957fae55b
parent3ac9ba8992890f5a6897f4a0b06b335783e36596 (diff)
downloadaioeventlet-6d4367e018173557ec11de6805b4800c240ad1ba.tar.gz
ensure that the greenthread run by the thread queue is a child of run_forever()
greenthread
-rw-r--r--aiogreen/__init__.py36
1 files changed, 27 insertions, 9 deletions
diff --git a/aiogreen/__init__.py b/aiogreen/__init__.py
index 80f5e30..5eff72f 100644
--- a/aiogreen/__init__.py
+++ b/aiogreen/__init__.py
@@ -49,12 +49,14 @@ class EventLoopPolicy(trollius.AbstractEventLoopPolicy):
# FIXME: is there a more efficient way to exchange data between two threads?
-class ThreadQueue:
+class _ThreadQueue:
+ """Queue to schedule handles """
def __init__(self, loop):
self._loop = loop
self._queue = queue.Queue()
- # FIXME: only schedule the consumer at the first call
- # to consume?
+ self._greenthread = None
+
+ def start(self):
self._greenthread = eventlet.spawn(self._consume)
def _consume(self):
@@ -70,12 +72,15 @@ class ThreadQueue:
break
self._loop._call_soon_handle(handle)
- def put(self, item):
- self._queue.put((False, item))
+ def put(self, handle):
+ self._queue.put((False, handle))
def stop(self):
+ if self._greenthread is None:
+ return
self._queue.put((True, None))
self._greenthread.wait()
+ self._greenthread = None
class SocketTransport(selector_events._SelectorSocketTransport):
@@ -178,14 +183,21 @@ class _Scheduler(object):
class EventLoop(BaseEventLoop):
def __init__(self):
super(EventLoop, self).__init__()
+
# Store a reference to the hub to ensure
# that we always use the same hub
self._hub = eventlet.hubs.get_hub()
- # Queue used by call_soon_threadsafe()
- self._thread_queue = ThreadQueue(self)
- self._stop_event = None
if self.get_debug():
self._hub.debug_blocking = True
+
+ # Queue used by call_soon_threadsafe()
+ self._thread_queue = _ThreadQueue(self)
+
+ # Event used to stop the event loop and to check if the event loop is
+ # running?
+ self._stop_event = None
+
+ # Scheduler used to schedule a call to the loop._run_once() method
self._scheduler = _Scheduler(self)
def time(self):
@@ -284,6 +296,9 @@ class EventLoop(BaseEventLoop):
if self._stop_event is not None:
raise RuntimeError("reentrant call to run_forever()")
+ # Start to thread queue in run_forever() to create a greenthread linked
+ # to the current greenthread
+ self._thread_queue.start()
self._reschedule()
try:
self._stop_event = eventlet.event.Event()
@@ -293,10 +308,13 @@ class EventLoop(BaseEventLoop):
finally:
self._stop_event = None
self._scheduler.stop()
+ # Stop the greenthread of the thread queue.
+ # call_soon_threadsafe() can still be called, handles will be
+ # stored in the thread queue.
+ self._thread_queue.stop()
def close(self):
super(EventLoop, self).close()
- self._thread_queue.stop()
def run_until_complete(self, future):
# FIXME: don't copy/paste Trollius code, but