# path: root/tests/test_timeouts.py
# blob: 1f392a38ccfa4d5f78ef6e388ec5787a0f84da05
import time

from rq import Queue, SimpleWorker
from rq.timeouts import TimerDeathPenalty
from rq.registry import FailedJobRegistry, FinishedJobRegistry
from tests import RQTestCase


class TimerBasedWorker(SimpleWorker):
    """SimpleWorker subclass whose ``death_penalty_class`` is TimerDeathPenalty."""

    # Class-level override; the timeout test in this module asserts that a
    # constructed worker still reports TimerDeathPenalty here.
    death_penalty_class = TimerDeathPenalty


def thread_friendly_sleep_func(seconds):
    """Busy-wait for roughly ``seconds`` seconds, then return ``None``.

    The wait is a pure-Python polling loop rather than a ``time.sleep`` call.
    NOTE(review): presumably this is so a timeout exception raised from
    another thread can interrupt the job while it runs -- confirm against
    ``rq.timeouts.TimerDeathPenalty``.

    Args:
        seconds: Minimum time to spin for. A negative value returns
            immediately; zero returns as soon as the clock advances.
    """
    # Use the monotonic clock: a duration measurement must not be shortened
    # or prolonged by wall-clock adjustments (NTP steps, manual changes).
    deadline = time.monotonic() + seconds
    while time.monotonic() <= deadline:
        pass


class TestTimeouts(RQTestCase):
    def test_timer_death_penalty(self):
        """Check job timeouts on a worker that uses TimerDeathPenalty."""
        queue = Queue(connection=self.testconn)
        queue.empty()
        finished = FinishedJobRegistry(connection=self.testconn)
        failed = FailedJobRegistry(connection=self.testconn)

        # The class-level death_penalty_class must be visible on the instance.
        worker = TimerBasedWorker([queue], connection=self.testconn)
        self.assertIsNotNone(worker)
        self.assertEqual(worker.death_penalty_class, TimerDeathPenalty)

        def run_sleep_job(sleep_seconds, timeout):
            # Enqueue one busy-wait job and let the worker drain the queue.
            enqueued = queue.enqueue(
                thread_friendly_sleep_func,
                args=(sleep_seconds,),
                job_timeout=timeout,
            )
            worker.work(burst=True)
            return enqueued

        # Job shorter than its timeout: expected to finish normally.
        quick_job = run_sleep_job(1, 3)
        quick_job.refresh()
        self.assertIn(quick_job, finished)

        # Job longer than its timeout: expected to fail with a
        # JobTimeoutException recorded in its exc_info.
        slow_job = run_sleep_job(5, 3)
        self.assertIn(slow_job, failed)
        slow_job.refresh()
        self.assertIn("rq.timeouts.JobTimeoutException", slow_job.exc_info)

        # Negative timeout: must not be treated as an immediate timeout,
        # so the job is expected to finish normally.
        untimed_job = run_sleep_job(1, -1)
        untimed_job.refresh()
        self.assertIn(untimed_job, finished)