diff options
Diffstat (limited to 'tests/test_dependencies.py')
-rw-r--r-- | tests/test_dependencies.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py new file mode 100644 index 0000000..6d2f776 --- /dev/null +++ b/tests/test_dependencies.py @@ -0,0 +1,99 @@ +from tests import RQTestCase +from tests.fixtures import div_by_zero, say_hello + +from rq import Queue, SimpleWorker +from rq.job import Job, JobStatus, Dependency + + +class TestDependencies(RQTestCase): + + def test_allow_failure_is_persisted(self): + """Ensure that job.allow_dependency_failures is properly set + when providing Dependency object to depends_on.""" + dep_job = Job.create(func=say_hello) + + # default to False, maintaining current behavior + job = Job.create(func=say_hello, depends_on=Dependency([dep_job])) + job.save() + Job.fetch(job.id, connection=self.testconn) + self.assertFalse(job.allow_dependency_failures) + + job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True)) + job.save() + job = Job.fetch(job.id, connection=self.testconn) + self.assertTrue(job.allow_dependency_failures) + + jobs = Job.fetch_many([job.id], connection=self.testconn) + self.assertTrue(jobs[0].allow_dependency_failures) + + def test_job_dependency(self): + """Enqueue dependent jobs only when appropriate""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job = q.enqueue(say_hello) + job = q.enqueue_call(say_hello, depends_on=parent_job) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + q.empty() + + # don't enqueue dependent job when parent fails + parent_job = q.enqueue(div_by_zero) + job = q.enqueue_call(say_hello, depends_on=parent_job) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertNotEqual(job.get_status(), JobStatus.FINISHED) + q.empty() + + # don't enqueue dependent job when Dependency.allow_failure=False (the default) + parent_job = q.enqueue(div_by_zero) + dependency = Dependency(jobs=parent_job) + job = q.enqueue_call(say_hello, depends_on=dependency) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertNotEqual(job.get_status(), JobStatus.FINISHED) + + # enqueue dependent job when Dependency.allow_failure=True + parent_job = q.enqueue(div_by_zero) + dependency = Dependency(jobs=parent_job, allow_failure=True) + job = q.enqueue_call(say_hello, depends_on=dependency) + + job = Job.fetch(job.id, connection=self.testconn) + self.assertTrue(job.allow_dependency_failures) + + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + # When a failing job has multiple dependents, only enqueue those + # with allow_failure=True + parent_job = q.enqueue(div_by_zero) + job_allow_failure = q.enqueue(say_hello, + depends_on=Dependency(jobs=parent_job, allow_failure=True)) + job = q.enqueue(say_hello, + depends_on=Dependency(jobs=parent_job, allow_failure=False)) + w.work(burst=True, max_jobs=1) + self.assertEqual(parent_job.get_status(), JobStatus.FAILED) + self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + q.empty() + + # only enqueue dependent job when all dependencies have finished/failed + first_parent_job = q.enqueue(div_by_zero) + second_parent_job = q.enqueue(say_hello) + dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True) + job = q.enqueue_call(say_hello, depends_on=dependencies) + w.work(burst=True, max_jobs=1) + self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED) + self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + + # When second job finishes, dependent job should be queued + w.work(burst=True, max_jobs=1) + self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.get_status(), JobStatus.FINISHED) |