summaryrefslogtreecommitdiff
path: root/tests/transport/threaded/tests.py
blob: 2e96929a2bd52abe1374e542731da1e541acd0a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import mock
import os
import time
from tempfile import mkstemp

from raven.utils.testutils import TestCase

from raven.base import Client
from raven.transport.threaded import ThreadedHTTPTransport
from raven.utils.urlparse import urlparse


class DummyThreadedScheme(ThreadedHTTPTransport):
    """Threaded transport stub that records sends instead of delivering them.

    Attributes:
        events: list of ``(url, data, headers, success_cb, failure_cb)``
            tuples, one per completed ``send_sync`` call.
        send_delay: number of seconds ``send_sync`` sleeps before recording
            the event (defaults to 0).
    """

    def __init__(self, *args, **kwargs):
        # Name this class (not its parent) in super() so that the MRO lookup
        # starts at ThreadedHTTPTransport instead of skipping over it; this
        # also matches how LoggingThreadedScheme initialises itself.
        super(DummyThreadedScheme, self).__init__(*args, **kwargs)
        self.events = []
        self.send_delay = 0

    def send_sync(self, url, data, headers, success_cb, failure_cb):
        """Sleep for ``send_delay`` seconds, then record the call arguments."""
        # delay sending the message, to allow us to test that the shutdown
        # hook waits correctly
        time.sleep(self.send_delay)

        self.events.append((url, data, headers, success_cb, failure_cb))


class LoggingThreadedScheme(ThreadedHTTPTransport):
    """Threaded transport stub that appends every event to a text file.

    Each ``send_sync`` call writes one line of the form
    ``"<pid> <message>"`` to ``filename``, where ``<pid>`` is the id of the
    process performing the write.
    """

    def __init__(self, filename, *args, **kwargs):
        super(LoggingThreadedScheme, self).__init__(*args, **kwargs)
        # Path of the file that send_sync appends to.
        self.filename = filename

    def send_sync(self, url, data, headers, success_cb, failure_cb):
        """Append the current pid and ``data['message']`` as one line."""
        with open(self.filename, 'a') as handle:
            pid, message = os.getpid(), data['message']
            handle.write("{0} {1}\n".format(pid, message))


class ThreadedTransportTest(TestCase):
    """Tests for the threaded HTTP transport: sending, shutdown and forking."""

    def setUp(self):
        self.url = "http://some_username:some_password@localhost:8143/1"
        self.client = Client(
            dsn=self.url,
            transport=ThreadedHTTPTransport
        )

    def _assert_clean_exit(self, status):
        """Fail unless ``status`` (from ``os.waitpid``) is a normal exit 0.

        Forked children report their outcome only through their exit code,
        so without this check a failure in the child would go unnoticed.
        """
        self.assertTrue(
            os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0,
            "forked child did not exit cleanly (wait status {0!r})".format(
                status)
        )

    @mock.patch('raven.transport.http.HTTPTransport.send')
    def test_does_send(self, send):
        self.client.captureMessage(message='foo')

        # Give the worker thread time to pick the message up.
        time.sleep(0.1)

        # TODO: This test could be more precise by ensuring it's sending the
        # same params that are sent to the ThreadedHTTPTransport.send() method
        self.assertEqual(send.call_count, 1)

    def test_shutdown_waits_for_send(self):
        url = urlparse(self.url)
        transport = DummyThreadedScheme()
        transport.send_delay = 0.5

        data = self.client.build_msg('raven.events.Message', message='foo')
        transport.async_send(url, data, None, None, None)

        time.sleep(0.1)

        # this should wait for the message to get sent
        transport.get_worker().main_thread_terminated()

        self.assertEqual(len(transport.events), 1)

    def test_fork_spawns_anew(self):
        url = urlparse(self.url)
        transport = DummyThreadedScheme()
        transport.send_delay = 0.5

        data = self.client.build_msg('raven.events.Message', message='foo')

        pid = os.fork()
        if pid == 0:
            # Child process. It must always leave through os._exit so that it
            # never unwinds back into the test runner as a second process;
            # the exit code tells the parent whether the checks passed.
            exit_code = 1
            try:
                time.sleep(0.1)

                transport.async_send(url, data, None, None, None)

                # this should wait for the message to get sent
                transport.get_worker().main_thread_terminated()

                self.assertEqual(len(transport.events), 1)
                exit_code = 0
            finally:
                os._exit(exit_code)
        else:
            # Parent process: reap the child and propagate its verdict.
            _, status = os.waitpid(pid, 0)
            self._assert_clean_exit(status)

    def test_fork_with_active_worker(self):
        # Test threaded transport when forking with an active worker.
        # Forking a process doesn't clone the worker thread - make sure
        # logging from both processes still works.
        event1 = self.client.build_msg('raven.events.Message', message='parent')
        event2 = self.client.build_msg('raven.events.Message', message='child')
        url = urlparse(self.url)
        fd, filename = mkstemp()
        try:
            os.close(fd)
            transport = LoggingThreadedScheme(filename)

            # Log from the parent process - starts the worker thread
            transport.async_send(url, event1, None, None, None)
            childpid = os.fork()

            if childpid == 0:
                # Child process. Always leave through os._exit: otherwise an
                # error here would run the enclosing ``finally`` (deleting
                # the log file under the parent) and continue executing the
                # test runner in the child.
                exit_code = 1
                try:
                    # Log from the child process
                    transport.async_send(url, event2, None, None, None)

                    # Ensure threaded worker has finished
                    transport.get_worker().stop()
                    exit_code = 0
                finally:
                    os._exit(exit_code)

            # Wait for the child process to finish
            _, status = os.waitpid(childpid, 0)

            # Ensure threaded worker has finished (done before asserting so
            # the parent's worker is stopped even when a check fails)
            transport.get_worker().stop()

            self._assert_clean_exit(status)
            self.assertTrue(os.path.isfile(filename))

            # Each log line is "<pid> <message>"; map pid -> message.
            with open(filename, 'r') as logfile:
                events = dict(line.split() for line in logfile)

            # Check parent and child both logged successfully
            self.assertEqual(events, {
                str(os.getpid()): 'parent',
                str(childpid): 'child',
            })
        finally:
            os.remove(filename)