Diffstat (limited to 'rq/queue.py')
-rw-r--r--  rq/queue.py | 35 +++++++++++++++++++++++++----------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/rq/queue.py b/rq/queue.py
index 67c91a7..f7e6e3f 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -11,7 +11,7 @@ from .compat import as_text, string_types, total_ordering
 from .connections import resolve_connection
 from .defaults import DEFAULT_RESULT_TTL
 from .exceptions import DequeueTimeout, NoSuchJobError
-from .job import Job, JobStatus
+from .job import Job, JobStatus, Dependency
 from .serializers import resolve_serializer
 from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow
@@ -48,6 +48,7 @@ class Queue:
             return cls.from_queue_key(as_text(queue_key),
                                       connection=connection,
                                       job_class=job_class, serializer=serializer)
+
         return [to_queue(rq_key)
                 for rq_key in connection.smembers(cls.redis_queues_keys)
                 if rq_key]
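
Aside: `Queue.all()` here builds one Queue object per entry in RQ's `rq:queues` Redis set. A minimal usage sketch, assuming a Redis server on localhost:

    from redis import Redis
    from rq import Queue

    # Enumerate every queue registered on this Redis instance;
    # .count is the number of jobs currently enqueued on it.
    for queue in Queue.all(connection=Redis()):
        print(queue.name, queue.count)
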
@@ -324,6 +325,9 @@ class Queue:
             job.retries_left = retry.max
             job.retry_intervals = retry.intervals
 
+        if isinstance(depends_on, Dependency):
+            job.allow_dependency_failures = depends_on.allow_failure
+
         return job
 
     def setup_dependencies(
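
The new `isinstance` check above copies the `allow_failure` flag from a `Dependency` object onto the job at creation time. A sketch of the enqueue-side usage this enables, assuming a local Redis; the `print` tasks are placeholders:

    from redis import Redis
    from rq import Queue
    from rq.job import Dependency

    queue = Queue(connection=Redis())
    parent = queue.enqueue(print, 'parent')

    # allow_failure=True lets `child` run even if `parent` fails;
    # otherwise a failed parent leaves the dependent deferred.
    child = queue.enqueue(
        print, 'child',
        depends_on=Dependency(jobs=[parent], allow_failure=True),
    )
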
@@ -386,9 +390,8 @@
                      result_ttl=None, ttl=None, failure_ttl=None, description=None,
                      depends_on=None, job_id=None, at_front=False, meta=None,
                      retry=None, on_success=None, on_failure=None, pipeline=None):
-        """Creates a job to represent the delayed function call and enqueues
-        it.
-nd
+        """Creates a job to represent the delayed function call and enqueues it.
+
         It is much like `.enqueue()`, except that it takes the function's args
         and kwargs as explicit arguments. Any kwargs passed to this function
         contain options for RQ itself.
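
Because `.enqueue_call()` takes the task's args and kwargs as explicit arguments, task keyword names can never collide with RQ's own options. A short sketch (`my_task` is a hypothetical function):

    from redis import Redis
    from rq import Queue

    def my_task(x, y, verbose=False):  # hypothetical task function
        return x + y

    queue = Queue(connection=Redis())
    job = queue.enqueue_call(
        func=my_task,
        args=(1, 2),
        kwargs={'verbose': True},  # delivered to my_task itself
        result_ttl=500,            # RQ option: keep the result for 500s
    )
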
@@ -611,14 +614,19 @@ nd
         dependents_key = job.dependents_key
 
         while True:
+
             try:
                 # if a pipeline is passed, the caller is responsible for calling WATCH
                 # to ensure all jobs are enqueued
                 if pipeline is None:
                     pipe.watch(dependents_key)
 
-                dependent_job_ids = [as_text(_id)
-                                     for _id in pipe.smembers(dependents_key)]
+                dependent_job_ids = {as_text(_id)
+                                     for _id in pipe.smembers(dependents_key)}
+
+                # There's no dependents
+                if not dependent_job_ids:
+                    break
 
                 jobs_to_enqueue = [
                     dependent_job for dependent_job
@@ -627,13 +635,16 @@ nd
                         connection=self.connection,
                         serializer=self.serializer
                     ) if dependent_job and dependent_job.dependencies_are_met(
-                        exclude_job_id=job.id,
-                        pipeline=pipe
+                        parent_job=job,
+                        pipeline=pipe,
                     )
                 ]
 
                 pipe.multi()
 
+                if not jobs_to_enqueue:
+                    break
+
                 for dependent in jobs_to_enqueue:
                     registry = DeferredJobRegistry(dependent.origin,
                                                    self.connection,
@@ -646,11 +657,15 @@ nd
                         queue = self.__class__(name=dependent.origin, connection=self.connection)
                         queue.enqueue_job(dependent, pipeline=pipe)
 
-                pipe.delete(dependents_key)
+                # Only delete dependents_key if all dependents have been enqueued
+                if len(jobs_to_enqueue) == len(dependent_job_ids):
+                    pipe.delete(dependents_key)
+                else:
+                    enqueued_job_ids = [job.id for job in jobs_to_enqueue]
+                    pipe.srem(dependents_key, *enqueued_job_ids)
 
                 if pipeline is None:
                     pipe.execute()
-
                 break
             except WatchError:
                 if pipeline is None:
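
The loop above is redis-py's standard optimistic-locking idiom: WATCH the dependents key, read it, switch the pipeline to buffered mode with MULTI, and retry the whole read-modify-write if EXEC fails because another client touched the key in between. A standalone sketch of that pattern, with a hypothetical key name:

    import redis

    r = redis.Redis()
    key = 'rq:job:1234:dependents'  # hypothetical dependents set

    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(key)               # immediate mode; key is watched
                members = pipe.smembers(key)  # read current members
                pipe.multi()                  # buffer subsequent commands
                if members:
                    pipe.srem(key, *members)  # remove what we processed
                pipe.execute()                # raises WatchError on conflict
                break
            except redis.WatchError:
                continue                      # lost the race; retry from WATCH
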