diff options
author | Armin Ronacher <armin.ronacher@active-4.com> | 2015-12-10 20:09:56 +0100 |
---|---|---|
committer | Armin Ronacher <armin.ronacher@active-4.com> | 2015-12-10 20:09:56 +0100 |
commit | 6f0ad8232ad3bddb95c6d5b7ce5890ce409e8db6 (patch) | |
tree | c9526a555224feb0c11613b34b2fe931eb5fda82 | |
parent | 2fc64e8056f8117c93633b26dfa76945523040e6 (diff) | |
download | raven-6f0ad8232ad3bddb95c6d5b7ce5890ce409e8db6.tar.gz |
Spawn new thread for different pid
This spawns a thread again first time on queue if the process was forked
by the time the enqueueing happened.
-rw-r--r-- | raven/transport/threaded.py | 17 | ||||
-rw-r--r-- | tests/transport/threaded/tests.py | 23 |
2 files changed, 37 insertions, 3 deletions
diff --git a/raven/transport/threaded.py b/raven/transport/threaded.py index d56c59a..b46e786 100644 --- a/raven/transport/threaded.py +++ b/raven/transport/threaded.py @@ -30,18 +30,26 @@ class AsyncWorker(object): self._queue = Queue(-1) self._lock = threading.Lock() self._thread = None + self._thread_for_pid = None self.options = { 'shutdown_timeout': shutdown_timeout, } self.start() def is_alive(self): - return self._thread.is_alive() + if self._thread_for_pid != os.getpid(): + return False + return self._thread and self._thread.is_alive() + + def _ensure_thread(self): + if self.is_alive(): + return + self.start() def main_thread_terminated(self): self._lock.acquire() try: - if not self._thread: + if not self.is_alive(): # thread not started or already stopped - nothing to do return @@ -107,10 +115,11 @@ class AsyncWorker(object): """ self._lock.acquire() try: - if not self._thread: + if not self.is_alive(): self._thread = threading.Thread(target=self._target) self._thread.setDaemon(True) self._thread.start() + self._thread_for_pid = os.getpid() finally: self._lock.release() atexit.register(self.main_thread_terminated) @@ -125,10 +134,12 @@ class AsyncWorker(object): self._queue.put_nowait(self._terminator) self._thread.join(timeout=timeout) self._thread = None + self._thread_for_pid = None finally: self._lock.release() def queue(self, callback, *args, **kwargs): + self._ensure_thread() self._queue.put_nowait((callback, args, kwargs)) def _target(self): diff --git a/tests/transport/threaded/tests.py b/tests/transport/threaded/tests.py index cbd74f0..f63dc71 100644 --- a/tests/transport/threaded/tests.py +++ b/tests/transport/threaded/tests.py @@ -64,6 +64,29 @@ class ThreadedTransportTest(TestCase): self.assertEqual(len(transport.events), 1) + def test_fork_spawns_anew(self): + url = urlparse(self.url) + transport = DummyThreadedScheme(url) + transport.send_delay = 0.5 + + data = self.client.build_msg('raven.events.Message', message='foo') + + pid = os.fork() + if pid == 0: + time.sleep(0.1) + + transport.async_send(data, None, None, None) + + # this should wait for the message to get sent + transport.get_worker().main_thread_terminated() + + self.assertEqual(len(transport.events), 1) + # Use os._exit here so that py.test gets not confused about + # what the hell we're doing here. + os._exit(0) + else: + os.waitpid(pid, 0) + def test_fork_with_active_worker(self): # Test threaded transport when forking with an active worker. # Forking a process doesn't clone the worker thread - make sure |