1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
import mock
import os
import time
from tempfile import mkstemp
from raven.utils.testutils import TestCase
from raven.base import Client
from raven.transport.threaded import ThreadedHTTPTransport
from raven.utils.urlparse import urlparse
class DummyThreadedScheme(ThreadedHTTPTransport):
    """Threaded transport double that records sends instead of performing them.

    ``send_sync`` is overridden to sleep for ``send_delay`` seconds and then
    append its arguments to ``events``, so tests can inspect what was
    processed and exercise code that has to wait for a slow send.
    """

    def __init__(self, *args, **kwargs):
        """Set up an empty event log and a zero send delay.

        All positional and keyword arguments are forwarded unchanged to the
        parent initialiser.
        """
        # Name this class (not its parent) in super() so the MRO lookup
        # starts right after DummyThreadedScheme; naming the parent would
        # skip any initialiser defined on ThreadedHTTPTransport itself.
        super(DummyThreadedScheme, self).__init__(*args, **kwargs)
        self.events = []
        self.send_delay = 0

    def send_sync(self, url, data, headers, success_cb, failure_cb):
        """Record the call in ``events`` after sleeping ``send_delay`` seconds.

        The callbacks are stored with the event but never invoked.
        """
        # delay sending the message, to allow us to test that the shutdown
        # hook waits correctly
        time.sleep(self.send_delay)
        self.events.append((url, data, headers, success_cb, failure_cb))
class LoggingThreadedScheme(ThreadedHTTPTransport):
    """Threaded transport double that appends each event to a file.

    Every call to ``send_sync`` writes one line of the form
    ``"<pid> <message>"`` to ``filename``, which lets a test tell apart
    events emitted by different processes.
    """

    def __init__(self, filename, *args, **kwargs):
        """Remember the path of the log file.

        Remaining positional and keyword arguments are forwarded to the
        parent initialiser.
        """
        super(LoggingThreadedScheme, self).__init__(*args, **kwargs)
        self.filename = filename

    def send_sync(self, url, data, headers, success_cb, failure_cb):
        """Append the current process id and ``data['message']`` to the file.

        ``url``, ``headers`` and both callbacks are accepted but unused.
        """
        with open(self.filename, 'a') as sink:
            pid = os.getpid()
            entry = "{0} {1}\n".format(pid, data['message'])
            sink.write(entry)
class ThreadedTransportTest(TestCase):
    """Tests for the threaded HTTP transport: sending, shutdown and forking."""

    def setUp(self):
        """Create a client configured to use ``ThreadedHTTPTransport``."""
        self.url = "http://some_username:some_password@localhost:8143/1"
        self.client = Client(
            dsn=self.url,
            transport=ThreadedHTTPTransport
        )

    @mock.patch('raven.transport.http.HTTPTransport.send')
    def test_does_send(self, send):
        """Capturing a message results in exactly one ``send`` call."""
        self.client.captureMessage(message='foo')

        # Give the background worker a moment to pick the event up.
        time.sleep(0.1)

        # TODO: This test could be more precise by ensuring it's sending the
        # same params that are sent to the ThreadedHTTPTransport.send() method
        self.assertEqual(send.call_count, 1)

    def test_shutdown_waits_for_send(self):
        """The shutdown hook blocks until a slow in-flight send completes."""
        url = urlparse(self.url)
        transport = DummyThreadedScheme()
        transport.send_delay = 0.5

        data = self.client.build_msg('raven.events.Message', message='foo')
        transport.async_send(url, data, None, None, None)

        time.sleep(0.1)

        # this should wait for the message to get sent
        transport.get_worker().main_thread_terminated()

        self.assertEqual(len(transport.events), 1)

    def test_fork_spawns_anew(self):
        """A transport first used after a fork still delivers in the child."""
        url = urlparse(self.url)
        transport = DummyThreadedScheme()
        transport.send_delay = 0.5

        data = self.client.build_msg('raven.events.Message', message='foo')

        pid = os.fork()
        if pid == 0:
            # Child process. Assume failure until every step has succeeded so
            # that an exception or a failed assertion yields a non-zero exit
            # status instead of escaping into the test runner.
            exit_code = 1
            try:
                time.sleep(0.1)

                transport.async_send(url, data, None, None, None)

                # this should wait for the message to get sent
                transport.get_worker().main_thread_terminated()

                self.assertEqual(len(transport.events), 1)
                exit_code = 0
            finally:
                # Use os._exit here so that py.test gets not confused about
                # what the hell we're doing here.
                os._exit(exit_code)
        else:
            # A wait status of 0 means the child exited normally with code 0;
            # anything else means the checks in the child did not pass.
            _, status = os.waitpid(pid, 0)
            self.assertEqual(status, 0, "child process reported a failure")

    def test_fork_with_active_worker(self):
        """Both parent and child can log after forking with a live worker."""
        # Test threaded transport when forking with an active worker.
        # Forking a process doesn't clone the worker thread - make sure
        # logging from both processes still works.
        event1 = self.client.build_msg('raven.events.Message', message='parent')
        event2 = self.client.build_msg('raven.events.Message', message='child')
        url = urlparse(self.url)
        fd, filename = mkstemp()
        try:
            os.close(fd)
            transport = LoggingThreadedScheme(filename)

            # Log from the parent process - starts the worker thread
            transport.async_send(url, event1, None, None, None)
            childpid = os.fork()

            if childpid == 0:
                # Always leave the child through os._exit: otherwise an
                # exception here would run the outer ``finally`` (removing
                # the log file) and continue the test run in the child.
                exit_code = 1
                try:
                    # Log from the child process
                    transport.async_send(url, event2, None, None, None)

                    # Ensure threaded worker has finished
                    transport.get_worker().stop()
                    exit_code = 0
                finally:
                    os._exit(exit_code)

            # Wait for the child process to finish
            _, status = os.waitpid(childpid, 0)
            assert status == 0, "child process reported a failure"
            assert os.path.isfile(filename)

            # Ensure threaded worker has finished
            transport.get_worker().stop()

            # Each line is "<pid> <message>"; map pid -> message.
            with open(filename, 'r') as logfile:
                events = dict(line.strip().split() for line in logfile)

            # Check parent and child both logged successfully
            assert events == {
                str(os.getpid()): 'parent',
                str(childpid): 'child',
            }
        finally:
            os.remove(filename)
|