diff options
Diffstat (limited to 'rq/queue.py')
-rw-r--r-- | rq/queue.py | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/rq/queue.py b/rq/queue.py index 67c91a7..f7e6e3f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -11,7 +11,7 @@ from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError -from .job import Job, JobStatus +from .job import Job, JobStatus, Dependency from .serializers import resolve_serializer from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow @@ -48,6 +48,7 @@ class Queue: return cls.from_queue_key(as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer) + return [to_queue(rq_key) for rq_key in connection.smembers(cls.redis_queues_keys) if rq_key] @@ -324,6 +325,9 @@ class Queue: job.retries_left = retry.max job.retry_intervals = retry.intervals + if isinstance(depends_on, Dependency): + job.allow_dependency_failures = depends_on.allow_failure + return job def setup_dependencies( @@ -386,9 +390,8 @@ class Queue: result_ttl=None, ttl=None, failure_ttl=None, description=None, depends_on=None, job_id=None, at_front=False, meta=None, retry=None, on_success=None, on_failure=None, pipeline=None): - """Creates a job to represent the delayed function call and enqueues - it. -nd + """Creates a job to represent the delayed function call and enqueues it. + It is much like `.enqueue()`, except that it takes the function's args and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. @@ -611,14 +614,19 @@ nd dependents_key = job.dependents_key while True: + try: # if a pipeline is passed, the caller is responsible for calling WATCH # to ensure all jobs are enqueued if pipeline is None: pipe.watch(dependents_key) - dependent_job_ids = [as_text(_id) - for _id in pipe.smembers(dependents_key)] + dependent_job_ids = {as_text(_id) + for _id in pipe.smembers(dependents_key)} + + # There's no dependents + if not dependent_job_ids: + break jobs_to_enqueue = [ dependent_job for dependent_job @@ -627,13 +635,16 @@ nd connection=self.connection, serializer=self.serializer ) if dependent_job and dependent_job.dependencies_are_met( - exclude_job_id=job.id, - pipeline=pipe + parent_job=job, + pipeline=pipe, ) ] pipe.multi() + if not jobs_to_enqueue: + break + for dependent in jobs_to_enqueue: registry = DeferredJobRegistry(dependent.origin, self.connection, @@ -646,11 +657,15 @@ nd queue = self.__class__(name=dependent.origin, connection=self.connection) queue.enqueue_job(dependent, pipeline=pipe) - pipe.delete(dependents_key) + # Only delete dependents_key if all dependents have been enqueued + if len(jobs_to_enqueue) == len(dependent_job_ids): + pipe.delete(dependents_key) + else: + enqueued_job_ids = [job.id for job in jobs_to_enqueue] + pipe.srem(dependents_key, *enqueued_job_ids) if pipeline is None: pipe.execute() - break except WatchError: if pipeline is None: |