author     thomas <thomas.matecki@gmail.com>       2020-03-12 22:31:42 -0400
committer  Thomas Matecki <thomas@marianatek.com>  2020-04-16 23:39:44 -0400
commit     83fa6b23868e9e8dd7776768489f8521ab5fa021 (patch)
tree       ec420ee5cf4845ea6be1ed7194498c9e326aafa5
parent     9f15df2d5567d6697e4f7bddb68400f6f9d845c5 (diff)
Revert move of status update in `Worker#handle_job_success`
When a job with dependents is _successful_, its dependents are enqueued. The change in status is persisted in Redis only if the FINISHing job's `result_ttl` is non-zero; that is, when each dependent job is enqueued, the _FINISHing_ job (the one triggering the enqueueing) still has an _outdated_ status in Redis. This avoids a redundant call, because if `result_ttl=0` the job is deleted anyway in `Job#cleanup`. In order to enqueue the dependents, we therefore _exclude_ the FINISHing job from the check of whether each dependent's dependencies have been met.
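For context, a minimal sketch of the user-facing scenario this addresses (only the rq public API is real; the `myapp.tasks` module and its functions are hypothetical, used purely for illustration):

    # Hypothetical reproduction of the dependency scenario this commit fixes.
    from redis import Redis
    from rq import Queue

    q = Queue(connection=Redis())

    # With result_ttl=0 the parent's FINISHED status is never persisted to
    # Redis, so at dependent-enqueue time the parent still *looks* unfinished.
    # dependencies_are_met(..., exclude={parent.id}) skips that stale check.
    parent = q.enqueue('myapp.tasks.build', result_ttl=0)
    child = q.enqueue('myapp.tasks.deploy', depends_on=parent)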
-rw-r--r--  rq/job.py     52
-rw-r--r--  rq/queue.py   32
-rw-r--r--  rq/worker.py   3
3 files changed, 62 insertions, 25 deletions
diff --git a/rq/job.py b/rq/job.py
index cf8948e..8eaa7a3 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -5,10 +5,10 @@ from __future__ import (absolute_import, division, print_function,
import inspect
import warnings
import zlib
+from functools import partial
from uuid import uuid4
from rq.compat import as_text, decode_redis_hash, string_types, text_type
-
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .local import LocalStack
@@ -16,6 +16,15 @@ from .utils import (enum, import_attribute, parse_timeout, str_to_date,
utcformat, utcnow)
from .serializers import resolve_serializer
+try:
+ import cPickle as pickle
+except ImportError: # noqa # pragma: no cover
+ import pickle
+
+# Serialize pickle dumps using the highest pickle protocol (binary, default
+# uses ascii)
+dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+loads = pickle.loads
JobStatus = enum(
'JobStatus',
@@ -94,11 +103,13 @@ class Job(object):
job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
elif isinstance(func, string_types):
job._func_name = as_text(func)
- elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance
+ elif not inspect.isclass(func) and hasattr(func,
+ '__call__'): # a callable class instance
job._instance = func
job._func_name = '__call__'
else:
- raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
+ raise TypeError(
+ 'Expected a callable or a string, but got: {0}'.format(func))
job._args = args
job._kwargs = kwargs
@@ -113,7 +124,8 @@ class Job(object):
# dependency could be job instance or id
if depends_on is not None:
- job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
+ job._dependency_ids = [
+ depends_on.id if isinstance(depends_on, Job) else depends_on]
return job
def get_status(self, refresh=True):
@@ -401,7 +413,8 @@ class Job(object):
for i, job in enumerate(jobs):
if not job:
- raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
+ raise NoSuchJobError(
+ 'Dependency {0} does not exist'.format(self._dependency_ids[i]))
return jobs
@@ -459,8 +472,10 @@ class Job(object):
except Exception as e:
self._result = "Unserializable return value"
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
- self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
- self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
+ self.result_ttl = int(obj.get('result_ttl')) if obj.get(
+ 'result_ttl') else None # noqa
+ self.failure_ttl = int(obj.get('failure_ttl')) if obj.get(
+ 'failure_ttl') else None # noqa
self._status = as_text(obj.get('status')) if obj.get('status') else None
dependency_id = obj.get('dependency_id', None)
@@ -725,19 +740,29 @@ class Job(object):
connection.sadd(self.dependencies_key, dependency_id)
def dependencies_are_met(
- self,
- pipeline=None
+ self,
+ pipeline=None,
+ exclude=None
):
"""Returns a boolean indicating if all of this jobs dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed.
+
+ `exclude` allows us to exclude some job id from the status check. This is useful
+ when enqueueing the dependents of a _successful_ job -- that status of
+ `FINISHED` may not be yet set in redis, but said job is indeed _done_ and this
+ method is _called_ in the _stack_ of it's dependents are being enqueued.
"""
+ exclude = exclude or []
pipe = pipeline if pipeline is not None else self.connection
+ dependencies = self.connection.smembers(self.dependencies_key)
+
if pipeline is not None:
pipe.watch(*[Job.key_for(as_text(_id))
- for _id in self.connection.smembers(self.dependencies_key)])
+ for _id in dependencies])
sort_by = self.redis_job_namespace_prefix + '*->ended_at'
get_field = self.redis_job_namespace_prefix + '*->status'
@@ -751,11 +776,14 @@ class Job(object):
dependencies_statuses = [
(as_text(_id), as_text(status))
for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
- get=['#', get_field], alpha=True, groups=True, )
+ get=['#', get_field], alpha=True,
+ groups=True, )
]
return all(status == JobStatus.FINISHED
for job_id, status
- in dependencies_statuses)
+ in dependencies_statuses
+ if job_id not in exclude)
+
_job_stack = LocalStack()
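The status check in `dependencies_are_met` leans on Redis SORT with `BY`/`GET` hash-field patterns to fetch every dependency's status in a single round trip. A standalone sketch of that trick (keys are hypothetical, patterned after rq's `rq:job:` namespace; this is not rq code):

    # SORT can dereference hash fields via '*->field' patterns, so one
    # call returns (member, status) pairs for a whole set of job ids.
    from redis import Redis

    r = Redis()
    r.sadd('rq:job:abc:dependencies', 'dep1', 'dep2')
    r.hset('rq:job:dep1', mapping={'status': 'finished', 'ended_at': '1'})
    r.hset('rq:job:dep2', mapping={'status': 'started', 'ended_at': '2'})

    pairs = r.sort('rq:job:abc:dependencies',
                   by='rq:job:*->ended_at',
                   get=['#', 'rq:job:*->status'],
                   alpha=True, groups=True)
    print(pairs)  # e.g. [(b'dep1', b'finished'), (b'dep2', b'started')]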
diff --git a/rq/queue.py b/rq/queue.py
index 4fe1b44..6dba72f 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
-
from datetime import datetime
from redis import WatchError
@@ -66,7 +65,8 @@ class Queue(object):
if 'async' in kwargs:
self._is_async = kwargs['async']
- warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
+ warnings.warn('The `async` keyword is deprecated. Use `is_async` instead',
+ DeprecationWarning)
# override class attribute job_class if one was passed
if job_class is not None:
@@ -317,7 +317,8 @@ class Queue(object):
pipe.multi()
for dependency in dependencies:
- if dependency.get_status(refresh=False) != JobStatus.FINISHED:
+ if dependency.get_status(
+ refresh=False) != JobStatus.FINISHED:
job.set_status(JobStatus.DEFERRED, pipeline=pipe)
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
@@ -379,8 +380,9 @@ class Queue(object):
"""Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
-
+ depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
+ **kwargs)
+
return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
@@ -393,7 +395,8 @@ class Queue(object):
from .registry import ScheduledJobRegistry
(f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+ depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
+ **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description,
@@ -465,9 +468,14 @@ class Queue(object):
for _id in pipe.smembers(dependents_key)]
dependent_jobs = [
- job for job in self.job_class.fetch_many(dependent_job_ids,
- connection=self.connection)
- if job.dependencies_are_met(pipeline=pipe)
+ dependent_job for dependent_job
+ in self.job_class.fetch_many(
+ dependent_job_ids,
+ connection=self.connection
+ ) if dependent_job.dependencies_are_met(
+ pipeline=pipe,
+ exclude={job.id}
+ )
]
pipe.multi()
@@ -480,7 +488,8 @@ class Queue(object):
if dependent.origin == self.name:
self.enqueue_job(dependent, pipeline=pipe)
else:
- queue = self.__class__(name=dependent.origin, connection=self.connection)
+ queue = self.__class__(name=dependent.origin,
+ connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipe)
pipe.delete(dependents_key)
@@ -519,7 +528,8 @@ class Queue(object):
connection = resolve_connection(connection)
if timeout is not None: # blocking variant
if timeout == 0:
- raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
+ raise ValueError(
+ 'RQ does not support indefinite timeouts. Please pick a timeout value > 0')
result = connection.blpop(queue_keys, timeout)
if result is None:
raise DequeueTimeout(timeout, queue_keys)
diff --git a/rq/worker.py b/rq/worker.py
index 5161a24..7e090f5 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -845,8 +845,6 @@ class Worker(object):
# if dependencies are inserted after enqueue_dependents
# a WatchError is thrown by execute()
pipeline.watch(job.dependents_key)
- # TODO: This was moved
- job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# enqueue_dependents calls multi() on the pipeline!
queue.enqueue_dependents(job, pipeline=pipeline)
@@ -858,6 +856,7 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
+ job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
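Taken together, the success path now looks roughly like the following (a condensed sketch assembled from the hunks above, with registry bookkeeping and retry handling omitted; not a verbatim copy of rq's worker):

    # Inside Worker#handle_job_success, working on a WATCHed pipeline.
    while True:
        try:
            pipeline.watch(job.dependents_key)
            # enqueue_dependents calls multi() on the pipeline; each
            # dependent checks dependencies_are_met(pipeline=pipe,
            # exclude={job.id}) so the finishing job's not-yet-persisted
            # status cannot block its dependents.
            queue.enqueue_dependents(job, pipeline=pipeline)

            result_ttl = job.get_result_ttl(self.default_result_ttl)
            if result_ttl != 0:
                # Persist FINISHED only when the result is kept around;
                # with result_ttl=0 the job is deleted in Job#cleanup anyway.
                job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                job.save(pipeline=pipeline, include_meta=False)
            job.cleanup(result_ttl, pipeline=pipeline)
            pipeline.execute()
            break
        except WatchError:
            continue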