blob: 95a3b745ac06a318fc1e0bd78c48208341dfe3c8 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
from __future__ import absolute_import
import celery
from raven.contrib.celery import SentryCeleryHandler
from raven.utils.testutils import InMemoryClient, TestCase
class CeleryTestCase(TestCase):
def setUp(self):
super(CeleryTestCase, self).setUp()
self.celery = celery.Celery(__name__)
self.celery.conf.CELERY_ALWAYS_EAGER = True
self.client = InMemoryClient()
self.handler = SentryCeleryHandler(self.client, ignore_expected=True)
self.handler.install()
self.addCleanup(self.handler.uninstall)
def test_simple(self):
@self.celery.task(name='dummy_task')
def dummy_task(x, y):
return x / y
dummy_task.delay(1, 2)
dummy_task.delay(1, 0)
assert len(self.client.events) == 1
event = self.client.events[0]
exception = event['exception']['values'][-1]
assert event['culprit'] == 'dummy_task'
assert exception['type'] == 'ZeroDivisionError'
def test_ignore_expected(self):
@self.celery.task(name='dummy_task', throws=(ZeroDivisionError,))
def dummy_task(x, y):
return x / y
dummy_task.delay(1, 2)
dummy_task.delay(1, 0)
assert len(self.client.events) == 0
|