summaryrefslogtreecommitdiff
path: root/tests/test_dependencies.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_dependencies.py')
-rw-r--r--tests/test_dependencies.py99
1 files changed, 99 insertions, 0 deletions
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
new file mode 100644
index 0000000..6d2f776
--- /dev/null
+++ b/tests/test_dependencies.py
@@ -0,0 +1,99 @@
+from tests import RQTestCase
+from tests.fixtures import div_by_zero, say_hello
+
+from rq import Queue, SimpleWorker
+from rq.job import Job, JobStatus, Dependency
+
+
+class TestDependencies(RQTestCase):
+
+ def test_allow_failure_is_persisted(self):
+ """Ensure that job.allow_dependency_failures is properly set
+ when providing Dependency object to depends_on."""
+ dep_job = Job.create(func=say_hello)
+
+ # default to False, maintaining current behavior
+ job = Job.create(func=say_hello, depends_on=Dependency([dep_job]))
+ job.save()
+ Job.fetch(job.id, connection=self.testconn)
+ self.assertFalse(job.allow_dependency_failures)
+
+ job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True))
+ job.save()
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertTrue(job.allow_dependency_failures)
+
+ jobs = Job.fetch_many([job.id], connection=self.testconn)
+ self.assertTrue(jobs[0].allow_dependency_failures)
+
+ def test_job_dependency(self):
+ """Enqueue dependent jobs only when appropriate"""
+ q = Queue(connection=self.testconn)
+ w = SimpleWorker([q], connection=q.connection)
+
+ # enqueue dependent job when parent successfully finishes
+ parent_job = q.enqueue(say_hello)
+ job = q.enqueue_call(say_hello, depends_on=parent_job)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+ q.empty()
+
+ # don't enqueue dependent job when parent fails
+ parent_job = q.enqueue(div_by_zero)
+ job = q.enqueue_call(say_hello, depends_on=parent_job)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
+ q.empty()
+
+ # don't enqueue dependent job when Dependency.allow_failure=False (the default)
+ parent_job = q.enqueue(div_by_zero)
+ dependency = Dependency(jobs=parent_job)
+ job = q.enqueue_call(say_hello, depends_on=dependency)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
+
+ # enqueue dependent job when Dependency.allow_failure=True
+ parent_job = q.enqueue(div_by_zero)
+ dependency = Dependency(jobs=parent_job, allow_failure=True)
+ job = q.enqueue_call(say_hello, depends_on=dependency)
+
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertTrue(job.allow_dependency_failures)
+
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+ # When a failing job has multiple dependents, only enqueue those
+ # with allow_failure=True
+ parent_job = q.enqueue(div_by_zero)
+ job_allow_failure = q.enqueue(say_hello,
+ depends_on=Dependency(jobs=parent_job, allow_failure=True))
+ job = q.enqueue(say_hello,
+ depends_on=Dependency(jobs=parent_job, allow_failure=False))
+ w.work(burst=True, max_jobs=1)
+ self.assertEqual(parent_job.get_status(), JobStatus.FAILED)
+ self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+ q.empty()
+
+ # only enqueue dependent job when all dependencies have finished/failed
+ first_parent_job = q.enqueue(div_by_zero)
+ second_parent_job = q.enqueue(say_hello)
+ dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True)
+ job = q.enqueue_call(say_hello, depends_on=dependencies)
+ w.work(burst=True, max_jobs=1)
+ self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED)
+ self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+
+ # When second job finishes, dependent job should be queued
+ w.work(burst=True, max_jobs=1)
+ self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED)
+ self.assertEqual(job.get_status(), JobStatus.QUEUED)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)