diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-11-19 02:10:07 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-11-19 02:10:07 +0100 |
commit | 6d4367e018173557ec11de6805b4800c240ad1ba (patch) | |
tree | 95f44488481474e5e77ac0236af7af8957fae55b | |
parent | 3ac9ba8992890f5a6897f4a0b06b335783e36596 (diff) | |
download | aioeventlet-6d4367e018173557ec11de6805b4800c240ad1ba.tar.gz |
ensure that the greenthread run by the thread queue is a child of run_forever()
greenthread
-rw-r--r-- | aiogreen/__init__.py | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/aiogreen/__init__.py b/aiogreen/__init__.py index 80f5e30..5eff72f 100644 --- a/aiogreen/__init__.py +++ b/aiogreen/__init__.py @@ -49,12 +49,14 @@ class EventLoopPolicy(trollius.AbstractEventLoopPolicy): # FIXME: is there a more efficient way to exchange data between two threads? -class ThreadQueue: +class _ThreadQueue: + """Queue to schedule handles """ def __init__(self, loop): self._loop = loop self._queue = queue.Queue() - # FIXME: only schedule the consumer at the first call - # to consume? + self._greenthread = None + + def start(self): self._greenthread = eventlet.spawn(self._consume) def _consume(self): @@ -70,12 +72,15 @@ class ThreadQueue: break self._loop._call_soon_handle(handle) - def put(self, item): - self._queue.put((False, item)) + def put(self, handle): + self._queue.put((False, handle)) def stop(self): + if self._greenthread is None: + return self._queue.put((True, None)) self._greenthread.wait() + self._greenthread = None class SocketTransport(selector_events._SelectorSocketTransport): @@ -178,14 +183,21 @@ class _Scheduler(object): class EventLoop(BaseEventLoop): def __init__(self): super(EventLoop, self).__init__() + # Store a reference to the hub to ensure # that we always use the same hub self._hub = eventlet.hubs.get_hub() - # Queue used by call_soon_threadsafe() - self._thread_queue = ThreadQueue(self) - self._stop_event = None if self.get_debug(): self._hub.debug_blocking = True + + # Queue used by call_soon_threadsafe() + self._thread_queue = _ThreadQueue(self) + + # Event used to stop the event loop and to check if the event loop is + # running? + self._stop_event = None + + # Scheduler used to schedule a call to the loop._run_once() method self._scheduler = _Scheduler(self) def time(self): @@ -284,6 +296,9 @@ class EventLoop(BaseEventLoop): if self._stop_event is not None: raise RuntimeError("reentrant call to run_forever()") + # Start to thread queue in run_forever() to create a greenthread linked + # to the current greenthread + self._thread_queue.start() self._reschedule() try: self._stop_event = eventlet.event.Event() @@ -293,10 +308,13 @@ class EventLoop(BaseEventLoop): finally: self._stop_event = None self._scheduler.stop() + # Stop the greenthread of the thread queue. + # call_soon_threadsafe() can still be called, handles will be + # stored in the thread queue. + self._thread_queue.stop() def close(self): super(EventLoop, self).close() - self._thread_queue.stop() def run_until_complete(self, future): # FIXME: don't copy/paste Trollius code, but |