summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2022-07-31 15:35:10 +0700
committerGitHub <noreply@github.com>2022-07-31 15:35:10 +0700
commit5b95725dc4410634a658bda9aa59188eaeaf87ea (patch)
tree33cdb7f60b5c5835c11b948fd1f6cb6a27af08e2
parentd82af1469f66ecfecd557aa7be1c00e419df0123 (diff)
downloadrq-5b95725dc4410634a658bda9aa59188eaeaf87ea.tar.gz
Dependency with failures (#1681)
* added Dependency class with allow_failures * Requested changes * Check type before setting `job.dependency_allow_fail` within `Job.create` * Set `job.dependency_allow_fail` within `Job.create` * Added test to ensure persistence of `dependency_allow_fail` * Removed typing and allow mixed list of ints and Job objects * Convert dependency_allow_fail boolean to integer during serialization to avoid redis DataError * Updated `test_multiple_dependencies_are_accepted_and_persisted` test to include `Dependency` cases * Adding placeholder test to test actual behavior of new `Dependency` usage in `depends_on` * Updated `test_job_dependency` to include cases using `Dependency` * Added dependency_allow_fail logic to `Job.restore` * Renamed `dependency_allow_fail` to a simpler `allow_failure` * Update docs to add section about the new `Dependency` class and use-case * Updated `Job.dependencies_are_met` logic to take `FAILED` and `STOPPED` jobs into account when `allow_failure=True` * Updated `test_job_dependency` test. Still failing with `Dependency` case. * Fix `allow_failure` type coercion in `Job.restore` * Re-arrange tests, so default `Dependency.allow_failure` is before explicit `allow_failure=True` * Fixed Dependency, so it works correctly when allow_failure=True * Attempt to execute pipeline prior to queueing a failed job's dependents. test_create_and_cancel_job_enqueue_dependents_in_registry test now passes. * Added `Depedency` test utilizing multiple dependencies * Removed irrelevant on_success and on_failure keyword arguments in example * Replaced use of long_running_job * Add test to verify `Dependency.jobs` contraints * Suppress connection error in handle_job_failure * test_dependencies have passed * All tests pass if enqueue_dependents called without pipeline.watch() * All tests now pass * Removed print statements * Cleanup Dependency implementation * Renamed job.allow_failure to job.allow_dependency_failures Co-authored-by: mattchan <mattchan@tencent.com> Co-authored-by: Mike Hill <mhilluniversal@gmail.com>
-rw-r--r--docs/docs/index.md26
-rw-r--r--rq/job.py78
-rw-r--r--rq/queue.py35
-rw-r--r--rq/worker.py18
-rw-r--r--tests/test_dependencies.py99
-rw-r--r--tests/test_job.py30
-rw-r--r--tests/test_worker.py20
7 files changed, 254 insertions, 52 deletions
diff --git a/docs/docs/index.md b/docs/docs/index.md
index e39f9b8..0748846 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -155,9 +155,33 @@ baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job])
```
The ability to handle job dependencies allows you to split a big job into
-several smaller ones. A job that is dependent on another is enqueued only when
+several smaller ones. By default, a job that is dependent on another is enqueued only when
its dependency finishes *successfully*.
+_New in 1.11.0._
+
+If you want a job's dependencies to execute regardless if the job completes or fails, RQ provides
+the `Dependency` class that will allow you to dictate how to handle job failures.
+
+The `Dependency(jobs=...)` parameter accepts:
+- a string representing a single job id
+- a Job object
+- an iteratable of job id strings and/or Job objects
+
+Example:
+
+```python
+from redis import Redis
+from rq.job import Dependency
+from rq import Queue
+
+queue = Queue(connection=Redis())
+job_1 = queue.enqueue(div_by_zero)
+dependency = Dependency(jobs=[job_1], allow_failure=True) # allow_failure defaults to False
+job_2 = queue.enqueue(say_hello, depends_on=dependency)
+# job_2 will execute even though its dependency (job_1) fails
+```
+
## Job Callbacks
_New in version 1.9.0._
diff --git a/rq/job.py b/rq/job.py
index 5ac40dd..fe8df0d 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -38,6 +38,22 @@ class JobStatus(str, Enum):
CANCELED = 'canceled'
+class Dependency:
+ def __init__(self, jobs, allow_failure: bool = False):
+ jobs = ensure_list(jobs)
+ if not all(
+ isinstance(job, Job) or isinstance(job, str)
+ for job in jobs
+ if job
+ ):
+ raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
+ elif len(jobs) < 1:
+ raise ValueError("jobs: cannot be empty.")
+
+ self.dependencies = jobs
+ self.allow_failure = allow_failure
+
+
# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object()
@@ -134,8 +150,16 @@ class Job:
# dependency could be job instance or id, or iterable thereof
if depends_on is not None:
- job._dependency_ids = [dep.id if isinstance(dep, Job) else dep
- for dep in ensure_list(depends_on)]
+ if isinstance(depends_on, Dependency):
+ job.allow_dependency_failures = depends_on.allow_failure
+ depends_on_list = depends_on.dependencies
+ else:
+ depends_on_list = ensure_list(depends_on)
+ job._dependency_ids = [
+ dep.id if isinstance(dep, Job) else dep
+ for dep in depends_on_list
+ ]
+
return job
def get_position(self):
@@ -404,6 +428,7 @@ class Job:
self.retry_intervals = None
self.redis_server_version = None
self.last_heartbeat = None
+ self.allow_dependency_failures = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@@ -560,7 +585,7 @@ class Job:
dep_id = obj.get('dependency_id') # for backwards compatibility
self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
else [dep_id.decode()] if dep_id else [])
-
+ self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@@ -640,6 +665,10 @@ class Job:
if self.ttl:
obj['ttl'] = self.ttl
+ if self.allow_dependency_failures is not None:
+ # convert boolean to integer to avoid redis.exception.DataError
+ obj["allow_dependency_failures"] = int(self.allow_dependency_failures)
+
return obj
def save(self, pipeline=None, include_meta=True):
@@ -685,12 +714,12 @@ class Job:
You can enqueue the jobs dependents optionally,
Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in.
"""
-
if self.is_canceled:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
from .registry import CanceledJobRegistry
from .queue import Queue
pipe = pipeline or self.connection.pipeline()
+
while True:
try:
q = Queue(
@@ -699,6 +728,8 @@ class Job:
job_class=self.__class__,
serializer=self.serializer
)
+
+ self.set_status(JobStatus.CANCELED, pipeline=pipe)
if enqueue_dependents:
# Only WATCH if no pipeline passed, otherwise caller is responsible
if pipeline is None:
@@ -709,13 +740,11 @@ class Job:
remove_from_queue=True
)
- self.set_status(JobStatus.CANCELED, pipeline=pipe)
-
registry = CanceledJobRegistry(
self.origin,
self.connection,
job_class=self.__class__,
- serializer=self.serializer
+ serializer=self.serializer
)
registry.add(self, pipeline=pipe)
if pipeline is None:
@@ -726,7 +755,7 @@ class Job:
continue
else:
# if the pipeline comes from the caller, we re-raise the
- # exception as it it the responsibility of the caller to
+ # exception as it is the responsibility of the caller to
# handle it
raise
@@ -953,17 +982,16 @@ class Job:
return [Job.key_for(_id.decode())
for _id in dependencies]
- def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
- """Returns a boolean indicating if all of this jobs dependencies are _FINISHED_
+ def dependencies_are_met(self, parent_job=None, pipeline=None):
+ """Returns a boolean indicating if all of this job's dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed.
- `exclude` allows us to exclude some job id from the status check. This is useful
- when enqueueing the dependents of a _successful_ job -- that status of
+ `parent_job` allows us to directly pass parent_job for the status check.
+ This is useful when enqueueing the dependents of a _successful_ job -- that status of
`FINISHED` may not be yet set in redis, but said job is indeed _done_ and this
- method is _called_ in the _stack_ of it's dependents are being enqueued.
+ method is _called_ in the _stack_ of its dependents are being enqueued.
"""
-
connection = pipeline if pipeline is not None else self.connection
if pipeline is not None:
@@ -973,8 +1001,19 @@ class Job:
dependencies_ids = {_id.decode()
for _id in connection.smembers(self.dependencies_key)}
- if exclude_job_id:
- dependencies_ids.discard(exclude_job_id)
+ if parent_job:
+ # If parent job is canceled, no need to check for status
+ # If parent job is not finished, we should only continue
+ # if this job allows parent job to fail
+ dependencies_ids.discard(parent_job.id)
+ if parent_job._status == JobStatus.CANCELED:
+ pass
+ elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures:
+ return False
+
+ # If the only dependency is parent job, dependency has been met
+ if not dependencies_ids:
+ return True
with connection.pipeline() as pipeline:
for key in dependencies_ids:
@@ -982,8 +1021,13 @@ class Job:
dependencies_statuses = pipeline.execute()
+ if self.allow_dependency_failures:
+ allowed_statuses = [JobStatus.FINISHED, JobStatus.FAILED]
+ else:
+ allowed_statuses = [JobStatus.FINISHED]
+
return all(
- status.decode() == JobStatus.FINISHED
+ status.decode() in allowed_statuses
for status
in dependencies_statuses
if status
diff --git a/rq/queue.py b/rq/queue.py
index 67c91a7..f7e6e3f 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -11,7 +11,7 @@ from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
-from .job import Job, JobStatus
+from .job import Job, JobStatus, Dependency
from .serializers import resolve_serializer
from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow
@@ -48,6 +48,7 @@ class Queue:
return cls.from_queue_key(as_text(queue_key),
connection=connection,
job_class=job_class, serializer=serializer)
+
return [to_queue(rq_key)
for rq_key in connection.smembers(cls.redis_queues_keys)
if rq_key]
@@ -324,6 +325,9 @@ class Queue:
job.retries_left = retry.max
job.retry_intervals = retry.intervals
+ if isinstance(depends_on, Dependency):
+ job.allow_dependency_failures = depends_on.allow_failure
+
return job
def setup_dependencies(
@@ -386,9 +390,8 @@ class Queue:
result_ttl=None, ttl=None, failure_ttl=None, description=None,
depends_on=None, job_id=None, at_front=False, meta=None,
retry=None, on_success=None, on_failure=None, pipeline=None):
- """Creates a job to represent the delayed function call and enqueues
- it.
-nd
+ """Creates a job to represent the delayed function call and enqueues it.
+
It is much like `.enqueue()`, except that it takes the function's args
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.
@@ -611,14 +614,19 @@ nd
dependents_key = job.dependents_key
while True:
+
try:
# if a pipeline is passed, the caller is responsible for calling WATCH
# to ensure all jobs are enqueued
if pipeline is None:
pipe.watch(dependents_key)
- dependent_job_ids = [as_text(_id)
- for _id in pipe.smembers(dependents_key)]
+ dependent_job_ids = {as_text(_id)
+ for _id in pipe.smembers(dependents_key)}
+
+ # There's no dependents
+ if not dependent_job_ids:
+ break
jobs_to_enqueue = [
dependent_job for dependent_job
@@ -627,13 +635,16 @@ nd
connection=self.connection,
serializer=self.serializer
) if dependent_job and dependent_job.dependencies_are_met(
- exclude_job_id=job.id,
- pipeline=pipe
+ parent_job=job,
+ pipeline=pipe,
)
]
pipe.multi()
+ if not jobs_to_enqueue:
+ break
+
for dependent in jobs_to_enqueue:
registry = DeferredJobRegistry(dependent.origin,
self.connection,
@@ -646,11 +657,15 @@ nd
queue = self.__class__(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipe)
- pipe.delete(dependents_key)
+ # Only delete dependents_key if all dependents have been enqueued
+ if len(jobs_to_enqueue) == len(dependent_job_ids):
+ pipe.delete(dependents_key)
+ else:
+ enqueued_job_ids = [job.id for job in jobs_to_enqueue]
+ pipe.srem(dependents_key, *enqueued_job_ids)
if pipeline is None:
pipe.execute()
-
break
except WatchError:
if pipeline is None:
diff --git a/rq/worker.py b/rq/worker.py
index ff8f071..11a0acd 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -18,7 +18,7 @@ try:
from signal import SIGKILL
except ImportError:
from signal import SIGTERM as SIGKILL
-
+from contextlib import suppress
import redis.exceptions
from . import worker_registration
@@ -54,7 +54,6 @@ green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
-
logger = logging.getLogger(__name__)
@@ -226,7 +225,11 @@ class Worker:
)
self.ip_address = 'unknown'
else:
- self.ip_address = [client['addr'] for client in connection.client_list() if client['name'] == self.name][0]
+ self.ip_address = [
+ client['addr']
+ for client in connection.client_list()
+ if client['name'] == self.name
+ ][0]
else:
self.hostname = None
self.pid = None
@@ -971,6 +974,8 @@ class Worker:
job_class=self.job_class, serializer=job.serializer)
failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline)
+ with suppress(redis.exceptions.ConnectionError):
+ pipeline.execute()
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
@@ -981,9 +986,14 @@ class Worker:
if retry:
job.retry(queue, pipeline)
+ enqueue_dependents = False
+ else:
+ enqueue_dependents = True
try:
pipeline.execute()
+ if enqueue_dependents:
+ queue.enqueue_dependents(job)
except Exception:
# Ensure that custom exception handlers are called
# even if Redis is down
@@ -991,6 +1001,7 @@ class Worker:
def handle_job_success(self, job, queue, started_job_registry):
self.log.debug('Handling successful execution of job %s', job.id)
+
with self.connection.pipeline() as pipeline:
while True:
try:
@@ -1252,6 +1263,7 @@ class RoundRobinWorker(Worker):
"""
Modified version of Worker that dequeues jobs from the queues using a round-robin strategy.
"""
+
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
new file mode 100644
index 0000000..6d2f776
--- /dev/null
+++ b/tests/test_dependencies.py
@@ -0,0 +1,99 @@
+from tests import RQTestCase
+from tests.fixtures import div_by_zero, say_hello
+
+from rq import Queue, SimpleWorker
+from rq.job import Job, JobStatus, Dependency
+
+
+class TestDependencies(RQTestCase):
+
+ def test_allow_failure_is_persisted(self):
+ """Ensure that job.allow_dependency_failures is properly set
+ when providing Dependency object to depends_on."""
+ dep_job = Job.create(func=say_hello)
+
+ # default to False, maintaining current behavior
+ job = Job.create(func=say_hello, depends_on=Dependency([dep_job]))
+ job.save()
+ Job.fetch(job.id, connection=self.testconn)
+ self.assertFalse(job.allow_dependency_failures)
+
+ job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True))
+ job.save()
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertTrue(job.allow_dependency_failures)
+
+ jobs = Job.fetch_many([job.id], connection=self.testconn)
+ self.assertTrue(jobs[0].allow_dependency_failures)
+
+ def test_job_dependency(self):
+ """Enqueue dependent jobs only when appropriate"""
+ q = Queue(connection=self.testconn)
+ w = SimpleWorker([q], connection=q.connection)
+
+ # enqueue dependent job when parent successfully finishes
+ parent_job = q.enqueue(say_hello)
+ job = q.enqueue_call(say_hello, depends_on=parent_job)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+ q.empty()
+
+ # don't enqueue dependent job when parent fails
+ parent_job = q.enqueue(div_by_zero)
+ job = q.enqueue_call(say_hello, depends_on=parent_job)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
+ q.empty()
+
+ # don't enqueue dependent job when Dependency.allow_failure=False (the default)
+ parent_job = q.enqueue(div_by_zero)
+ dependency = Dependency(jobs=parent_job)
+ job = q.enqueue_call(say_hello, depends_on=dependency)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
+
+ # enqueue dependent job when Dependency.allow_failure=True
+ parent_job = q.enqueue(div_by_zero)
+ dependency = Dependency(jobs=parent_job, allow_failure=True)
+ job = q.enqueue_call(say_hello, depends_on=dependency)
+
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertTrue(job.allow_dependency_failures)
+
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+ # When a failing job has multiple dependents, only enqueue those
+ # with allow_failure=True
+ parent_job = q.enqueue(div_by_zero)
+ job_allow_failure = q.enqueue(say_hello,
+ depends_on=Dependency(jobs=parent_job, allow_failure=True))
+ job = q.enqueue(say_hello,
+ depends_on=Dependency(jobs=parent_job, allow_failure=False))
+ w.work(burst=True, max_jobs=1)
+ self.assertEqual(parent_job.get_status(), JobStatus.FAILED)
+ self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+ q.empty()
+
+ # only enqueue dependent job when all dependencies have finished/failed
+ first_parent_job = q.enqueue(div_by_zero)
+ second_parent_job = q.enqueue(say_hello)
+ dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True)
+ job = q.enqueue_call(say_hello, depends_on=dependencies)
+ w.work(burst=True, max_jobs=1)
+ self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED)
+ self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+
+ # When second job finishes, dependent job should be queued
+ w.work(burst=True, max_jobs=1)
+ self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED)
+ self.assertEqual(job.get_status(), JobStatus.QUEUED)
+ w.work(burst=True)
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
diff --git a/tests/test_job.py b/tests/test_job.py
index 7dccda3..6106afb 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -9,7 +9,7 @@ from redis import WatchError
from rq.compat import as_text
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
-from rq.job import Job, JobStatus, cancel_job, get_current_job
+from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job
from rq.queue import Queue
from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry,
@@ -434,6 +434,14 @@ class TestJob(RQTestCase):
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
+ def test_dependency_parameter_constraints(self):
+ """Ensures the proper constraints are in place for values passed in as job references."""
+ dep_job = Job.create(func=fixtures.say_hello)
+ # raise error on empty jobs
+ self.assertRaises(ValueError, Dependency, jobs=[])
+ # raise error on non-str/Job value in jobs iterable
+ self.assertRaises(ValueError, Dependency, jobs=[dep_job, 1])
+
def test_multiple_dependencies_are_accepted_and_persisted(self):
"""Ensure job._dependency_ids accepts different input formats, and
is set and restored properly"""
@@ -456,6 +464,14 @@ class TestJob(RQTestCase):
[("A", "B"), ["A", "B"]],
[(job_A, job_B), ["A", "B"]],
[(job_A, "B"), ["A", "B"]],
+ [Dependency("A"), ["A"]],
+ [Dependency(job_A), ["A"]],
+ [Dependency(["A", "B"]), ["A", "B"]],
+ [Dependency([job_A, job_B]), ["A", "B"]],
+ [Dependency(["A", job_B]), ["A", "B"]],
+ [Dependency(("A", "B")), ["A", "B"]],
+ [Dependency((job_A, job_B)), ["A", "B"]],
+ [Dependency((job_A, "B")), ["A", "B"]],
]
for given, expected in cases:
job = Job.create(func=fixtures.say_hello, depends_on=given)
@@ -865,19 +881,27 @@ class TestJob(RQTestCase):
queue = Queue(connection=self.testconn)
dependency = queue.enqueue(fixtures.raise_exc)
dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
+ print('# Post enqueue', self.testconn.smembers(dependency.dependents_key))
+ self.assertTrue(dependency.dependent_ids)
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
w = Worker([queue])
w.work(burst=True, max_jobs=1)
+ self.assertTrue(dependency.dependent_ids)
+ print('# Post work', self.testconn.smembers(dependency.dependents_key))
dependency.refresh()
dependent.refresh()
self.assertEqual(0, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
self.assertEqual(1, len(queue.failed_job_registry))
+
+ print('# Pre cancel', self.testconn.smembers(dependency.dependents_key))
cancel_job(dependency.id, enqueue_dependents=True)
dependency.refresh()
dependent.refresh()
+ print('#Post cancel', self.testconn.smembers(dependency.dependents_key))
+
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(0, len(queue.deferred_job_registry))
self.assertEqual(0, len(queue.failed_job_registry))
@@ -1119,14 +1143,14 @@ class TestJob(RQTestCase):
def test_dependencies_are_met_at_execution_time(self):
queue = Queue(connection=self.testconn)
-
+ queue.empty()
queue.enqueue(fixtures.say_hello, job_id="A")
queue.enqueue(fixtures.say_hello, job_id="B")
job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
+ job_C.dependencies_are_met()
w = Worker([queue])
w.work(burst=True)
-
assert job_C.result
def test_execution_order_with_sole_dependency(self):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 7bc8fc0..936658e 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -29,7 +29,7 @@ from tests.fixtures import (
from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text, PY2
-from rq.job import Job, JobStatus, Retry
+from rq.job import Job, JobStatus, Dependency, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
@@ -468,7 +468,7 @@ class TestWorker(RQTestCase):
worker = Worker([queue])
- # If job if configured to retry, it will be put back in the queue
+ # If job is configured to retry, it will be put back in the queue
# and not put in the FailedJobRegistry.
# This is the original execution
queue.empty()
@@ -695,22 +695,6 @@ class TestWorker(RQTestCase):
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
- def test_job_dependency(self):
- """Enqueue dependent jobs only if their parents don't fail"""
- q = Queue()
- w = Worker([q])
- parent_job = q.enqueue(say_hello, result_ttl=0)
- job = q.enqueue_call(say_hello, depends_on=parent_job)
- w.work(burst=True)
- job = Job.fetch(job.id)
- self.assertEqual(job.get_status(), JobStatus.FINISHED)
-
- parent_job = q.enqueue(div_by_zero)
- job = q.enqueue_call(say_hello, depends_on=parent_job)
- w.work(burst=True)
- job = Job.fetch(job.id)
- self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
-
def test_get_current_job(self):
"""Ensure worker.get_current_job() works properly"""
q = Queue()