summaryrefslogtreecommitdiff
path: root/tests/test_dependencies.py
blob: 6d2f776cd72d75fcba5d7916110f435c778b0351 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello

from rq import Queue, SimpleWorker
from rq.job import Job, JobStatus, Dependency


class TestDependencies(RQTestCase):

    def test_allow_failure_is_persisted(self):
        """Ensure that job.allow_dependency_failures is properly set
        when providing Dependency object to depends_on."""
        dep_job = Job.create(func=say_hello)

        # default to False, maintaining current behavior
        job = Job.create(func=say_hello, depends_on=Dependency([dep_job]))
        job.save()
        Job.fetch(job.id, connection=self.testconn)
        self.assertFalse(job.allow_dependency_failures)

        job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True))
        job.save()
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertTrue(job.allow_dependency_failures)

        jobs = Job.fetch_many([job.id], connection=self.testconn)
        self.assertTrue(jobs[0].allow_dependency_failures)

    def test_job_dependency(self):
        """Enqueue dependent jobs only when appropriate"""
        q = Queue(connection=self.testconn)
        w = SimpleWorker([q], connection=q.connection)

        # enqueue dependent job when parent successfully finishes
        parent_job = q.enqueue(say_hello)
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        w.work(burst=True)
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)
        q.empty()

        # don't enqueue dependent job when parent fails
        parent_job = q.enqueue(div_by_zero)
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        w.work(burst=True)
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
        q.empty()

        # don't enqueue dependent job when Dependency.allow_failure=False (the default)
        parent_job = q.enqueue(div_by_zero)
        dependency = Dependency(jobs=parent_job)
        job = q.enqueue_call(say_hello, depends_on=dependency)
        w.work(burst=True)
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)

        # enqueue dependent job when Dependency.allow_failure=True
        parent_job = q.enqueue(div_by_zero)
        dependency = Dependency(jobs=parent_job, allow_failure=True)
        job = q.enqueue_call(say_hello, depends_on=dependency)

        job = Job.fetch(job.id, connection=self.testconn)
        self.assertTrue(job.allow_dependency_failures)

        w.work(burst=True)
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)

        # When a failing job has multiple dependents, only enqueue those
        # with allow_failure=True
        parent_job = q.enqueue(div_by_zero)
        job_allow_failure = q.enqueue(say_hello,
                                      depends_on=Dependency(jobs=parent_job, allow_failure=True))
        job = q.enqueue(say_hello,
                        depends_on=Dependency(jobs=parent_job, allow_failure=False))
        w.work(burst=True, max_jobs=1)
        self.assertEqual(parent_job.get_status(), JobStatus.FAILED)
        self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED)
        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
        q.empty()

        # only enqueue dependent job when all dependencies have finished/failed
        first_parent_job = q.enqueue(div_by_zero)
        second_parent_job = q.enqueue(say_hello)
        dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True)
        job = q.enqueue_call(say_hello, depends_on=dependencies)
        w.work(burst=True, max_jobs=1)
        self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED)
        self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED)
        self.assertEqual(job.get_status(), JobStatus.DEFERRED)

        # When second job finishes, dependent job should be queued
        w.work(burst=True, max_jobs=1)
        self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)
        w.work(burst=True)
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)