summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthomas <thomas.matecki@gmail.com>2020-03-14 12:22:46 -0400
committerThomas Matecki <thomas@marianatek.com>2020-04-16 23:40:27 -0400
commitc0119a8a19c84add14a38bdfec9ed1f86aff1f9e (patch)
tree99b25dd56c6c855258130b6ed39c097745ec69ea
parent01ebe25f5626f2b433932683fefbc00afd81cf5e (diff)
downloadrq-c0119a8a19c84add14a38bdfec9ed1f86aff1f9e.tar.gz
Undo formatting for coverage stats
-rw-r--r--rq/job.py21
-rw-r--r--rq/queue.py20
2 files changed, 15 insertions, 26 deletions
diff --git a/rq/job.py b/rq/job.py
index 3cad5ab..6ae0633 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -21,6 +21,7 @@ try:
except ImportError: # noqa # pragma: no cover
import pickle
+
# Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii)
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
@@ -103,13 +104,11 @@ class Job(object):
job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
elif isinstance(func, string_types):
job._func_name = as_text(func)
- elif not inspect.isclass(func) and hasattr(func,
- '__call__'): # a callable class instance
+ elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance
job._instance = func
job._func_name = '__call__'
else:
- raise TypeError(
- 'Expected a callable or a string, but got: {0}'.format(func))
+ raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
job._args = args
job._kwargs = kwargs
@@ -124,8 +123,7 @@ class Job(object):
# dependency could be job instance or id
if depends_on is not None:
- job._dependency_ids = [
- depends_on.id if isinstance(depends_on, Job) else depends_on]
+ job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
return job
def get_status(self, refresh=True):
@@ -411,8 +409,8 @@ class Job(object):
if watch and self._dependency_ids:
connection.watch(*self._dependency_ids)
- jobs = [job for
- job in self.fetch_many(self._dependency_ids, connection=self.connection)
+ jobs = [job
+ for job in self.fetch_many(self._dependency_ids, connection=self.connection)
if job]
return jobs
@@ -471,10 +469,8 @@ class Job(object):
except Exception as e:
self._result = "Unserializable return value"
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
- self.result_ttl = int(obj.get('result_ttl')) if obj.get(
- 'result_ttl') else None # noqa
- self.failure_ttl = int(obj.get('failure_ttl')) if obj.get(
- 'failure_ttl') else None # noqa
+ self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
+ self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = as_text(obj.get('status')) if obj.get('status') else None
dependency_id = obj.get('dependency_id', None)
@@ -792,5 +788,4 @@ class Job(object):
in dependencies_statuses
if dependency_id not in exclude)
-
_job_stack = LocalStack()
diff --git a/rq/queue.py b/rq/queue.py
index 6dba72f..6a0ed05 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -13,8 +13,8 @@ from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
-from .utils import backend_class, import_attribute, parse_timeout, utcnow
from .serializers import resolve_serializer
+from .utils import backend_class, import_attribute, parse_timeout, utcnow
def compact(lst):
@@ -65,8 +65,7 @@ class Queue(object):
if 'async' in kwargs:
self._is_async = kwargs['async']
- warnings.warn('The `async` keyword is deprecated. Use `is_async` instead',
- DeprecationWarning)
+ warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
# override class attribute job_class if one was passed
if job_class is not None:
@@ -317,8 +316,7 @@ class Queue(object):
pipe.multi()
for dependency in dependencies:
- if dependency.get_status(
- refresh=False) != JobStatus.FINISHED:
+ if dependency.get_status(refresh=False) != JobStatus.FINISHED:
job.set_status(JobStatus.DEFERRED, pipeline=pipe)
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
@@ -380,8 +378,7 @@ class Queue(object):
"""Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
- **kwargs)
+ depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
@@ -395,8 +392,7 @@ class Queue(object):
from .registry import ScheduledJobRegistry
(f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
- **kwargs)
+ depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description,
@@ -488,8 +484,7 @@ class Queue(object):
if dependent.origin == self.name:
self.enqueue_job(dependent, pipeline=pipe)
else:
- queue = self.__class__(name=dependent.origin,
- connection=self.connection)
+ queue = self.__class__(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipe)
pipe.delete(dependents_key)
@@ -528,8 +523,7 @@ class Queue(object):
connection = resolve_connection(connection)
if timeout is not None: # blocking variant
if timeout == 0:
- raise ValueError(
- 'RQ does not support indefinite timeouts. Please pick a timeout value > 0')
+ raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
result = connection.blpop(queue_keys, timeout)
if result is None:
raise DequeueTimeout(timeout, queue_keys)