summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArmin Ronacher <armin.ronacher@active-4.com>2015-12-10 20:09:56 +0100
committerArmin Ronacher <armin.ronacher@active-4.com>2015-12-10 20:09:56 +0100
commit6f0ad8232ad3bddb95c6d5b7ce5890ce409e8db6 (patch)
treec9526a555224feb0c11613b34b2fe931eb5fda82
parent2fc64e8056f8117c93633b26dfa76945523040e6 (diff)
downloadraven-6f0ad8232ad3bddb95c6d5b7ce5890ce409e8db6.tar.gz
Spawn new thread for different pid
This spawns a thread again first time on queue if the process was forked by the time the enqueueing happened.
-rw-r--r--raven/transport/threaded.py17
-rw-r--r--tests/transport/threaded/tests.py23
2 files changed, 37 insertions, 3 deletions
diff --git a/raven/transport/threaded.py b/raven/transport/threaded.py
index d56c59a..b46e786 100644
--- a/raven/transport/threaded.py
+++ b/raven/transport/threaded.py
@@ -30,18 +30,26 @@ class AsyncWorker(object):
self._queue = Queue(-1)
self._lock = threading.Lock()
self._thread = None
+ self._thread_for_pid = None
self.options = {
'shutdown_timeout': shutdown_timeout,
}
self.start()
def is_alive(self):
- return self._thread.is_alive()
+ if self._thread_for_pid != os.getpid():
+ return False
+ return self._thread and self._thread.is_alive()
+
+ def _ensure_thread(self):
+ if self.is_alive():
+ return
+ self.start()
def main_thread_terminated(self):
self._lock.acquire()
try:
- if not self._thread:
+ if not self.is_alive():
# thread not started or already stopped - nothing to do
return
@@ -107,10 +115,11 @@ class AsyncWorker(object):
"""
self._lock.acquire()
try:
- if not self._thread:
+ if not self.is_alive():
self._thread = threading.Thread(target=self._target)
self._thread.setDaemon(True)
self._thread.start()
+ self._thread_for_pid = os.getpid()
finally:
self._lock.release()
atexit.register(self.main_thread_terminated)
@@ -125,10 +134,12 @@ class AsyncWorker(object):
self._queue.put_nowait(self._terminator)
self._thread.join(timeout=timeout)
self._thread = None
+ self._thread_for_pid = None
finally:
self._lock.release()
def queue(self, callback, *args, **kwargs):
+ self._ensure_thread()
self._queue.put_nowait((callback, args, kwargs))
def _target(self):
diff --git a/tests/transport/threaded/tests.py b/tests/transport/threaded/tests.py
index cbd74f0..f63dc71 100644
--- a/tests/transport/threaded/tests.py
+++ b/tests/transport/threaded/tests.py
@@ -64,6 +64,29 @@ class ThreadedTransportTest(TestCase):
self.assertEqual(len(transport.events), 1)
+ def test_fork_spawns_anew(self):
+ url = urlparse(self.url)
+ transport = DummyThreadedScheme(url)
+ transport.send_delay = 0.5
+
+ data = self.client.build_msg('raven.events.Message', message='foo')
+
+ pid = os.fork()
+ if pid == 0:
+ time.sleep(0.1)
+
+ transport.async_send(data, None, None, None)
+
+ # this should wait for the message to get sent
+ transport.get_worker().main_thread_terminated()
+
+ self.assertEqual(len(transport.events), 1)
+ # Use os._exit here so that py.test gets not confused about
+ # what the hell we're doing here.
+ os._exit(0)
+ else:
+ os.waitpid(pid, 0)
+
def test_fork_with_active_worker(self):
# Test threaded transport when forking with an active worker.
# Forking a process doesn't clone the worker thread - make sure