1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
import mock
import time
from raven.utils.testutils import TestCase
from raven.base import Client
from raven.transport.threaded_requests import ThreadedRequestsHTTPTransport
from raven.utils.urlparse import urlparse
class DummyThreadedScheme(ThreadedRequestsHTTPTransport):
    """Threaded transport stub that records sends instead of performing them.

    ``send_sync`` is overridden so that each call is appended to
    ``self.events`` after an optional artificial delay (``self.send_delay``
    seconds).  Tests use the delay to check that worker shutdown waits for
    in-flight messages.
    """

    def __init__(self, *args, **kwargs):
        # Name *this* class in super() so the MRO lookup starts right after
        # DummyThreadedScheme; naming the parent class would skip any
        # __init__ the parent itself defines.
        super(DummyThreadedScheme, self).__init__(*args, **kwargs)
        # Tuples of (url, data, headers, success_cb, failure_cb), one per
        # send_sync call, in call order.
        self.events = []
        # Seconds to sleep inside send_sync before recording the event.
        self.send_delay = 0

    def send_sync(self, url, data, headers, success_cb, failure_cb):
        """Record the send request after sleeping ``self.send_delay`` seconds.

        The callbacks are stored with the event but never invoked here.
        """
        # delay sending the message, to allow us to test that the shutdown
        # hook waits correctly
        time.sleep(self.send_delay)
        self.events.append((url, data, headers, success_cb, failure_cb))
class ThreadedTransportTest(TestCase):
    """Tests for sending events through ThreadedRequestsHTTPTransport."""

    def setUp(self):
        """Create a client whose DSN is routed through the threaded transport."""
        self.url = "http://some_username:some_password@localhost:8143/1"
        self.client = Client(
            dsn=self.url,
            transport=ThreadedRequestsHTTPTransport
        )

    @mock.patch('raven.transport.requests.post')
    def test_does_send(self, send):
        """A captured message results in exactly one POST to the store URL."""
        self.client.captureMessage(message='foo')

        # The send happens on a worker thread.  Rather than sleeping for a
        # fixed interval (flaky on slow or loaded machines), poll until the
        # mock has recorded the call, with an upper bound so a genuine
        # failure still surfaces through the assertions below.
        # call_args_list is polled (not call_count) because the mock
        # appends to it after call_count and call_args have been updated.
        deadline = time.time() + 5.0
        while not send.call_args_list and time.time() < deadline:
            time.sleep(0.01)

        self.assertEqual(send.call_count, 1)
        expected_url = 'http://localhost:8143/api/1/store/'
        self.assertEqual(expected_url, send.call_args[0][0])

    def test_shutdown_waits_for_send(self):
        """Worker shutdown blocks until a slow, already-queued send is done."""
        url = urlparse(self.url)
        transport = DummyThreadedScheme()
        # Make the recorded "send" take noticeably longer than the pause
        # below, so it is still pending when shutdown is requested.
        transport.send_delay = 0.5

        data = self.client.build_msg('raven.events.Message', message='foo')
        transport.async_send(url, data, None, None, None)

        # NOTE(review): presumably gives the worker thread time to pick up
        # the queued send before shutdown is triggered -- confirm.
        time.sleep(0.1)

        # this should wait for the message to get sent
        transport.get_worker().main_thread_terminated()

        self.assertEqual(len(transport.events), 1)
|