summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-05-23 19:32:38 +0700
committerSelwin Ong <selwin.ong@gmail.com>2020-05-23 19:32:38 +0700
commit21bf5890c0410f2d51f5f95eea60adfb092c61d8 (patch)
tree633645cd414bc4d864a4796db4d9cb373ae4e3e1
parent33e4beacf4118e6745543504b737d6db780e3e5d (diff)
parent52e426f40ffee046f06269fc2e7593707230e9f6 (diff)
downloadrq-21bf5890c0410f2d51f5f95eea60adfb092c61d8.tar.gz
Merge remote-tracking branch 'origin/master' into multi-dependencies
-rw-r--r--.deepsource.toml12
-rw-r--r--CHANGES.md12
-rw-r--r--MANIFEST.in1
-rw-r--r--docs/docs/scheduling.md2
-rw-r--r--rq/cli/helpers.py2
-rw-r--r--rq/compat/__init__.py11
-rw-r--r--rq/decorators.py3
-rw-r--r--rq/job.py9
-rw-r--r--rq/queue.py10
-rw-r--r--rq/scheduler.py4
-rw-r--r--rq/serializers.py8
-rw-r--r--rq/version.py2
-rw-r--r--rq/worker.py21
-rw-r--r--setup.py4
-rw-r--r--tests/test_serializers.py14
15 files changed, 81 insertions, 34 deletions
diff --git a/.deepsource.toml b/.deepsource.toml
new file mode 100644
index 0000000..def09e2
--- /dev/null
+++ b/.deepsource.toml
@@ -0,0 +1,12 @@
+version = 1
+
+test_patterns = ["tests/**"]
+
+exclude_patterns = ["examples/**"]
+
+[[analyzers]]
+name = "python"
+enabled = true
+
+ [analyzers.meta]
+ runtime_version = "3.x.x" \ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index 5b71d17..3204f99 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,15 @@
+### RQ 1.4.1 (2020-05-16)
+* Default serializer now uses `pickle.HIGHEST_PROTOCOL` for backward compatibility reasons. Thanks @bbayles!
+* Avoid deprecation warnings on redis-py >= 3.5.0. Thanks @bbayles!
+
+### RQ 1.4.0 (2020-05-13)
+* Custom serializer is now supported. Thanks @solababs!
+* `delay()` now accepts `job_id` argument. Thanks @grayshirt!
+* Fixed a bug that may cause early termination of scheduled or requeued jobs. Thanks @rmartin48!
+* When a job is scheduled, always add queue name to a set containing active RQ queue names. Thanks @mdawar!
+* Added `--sentry-ca-certs` and `--sentry-debug` parameters to `rq worker` CLI. Thanks @kichawa!
+* Jobs cleaned up by `StartedJobRegistry` are given an exception info. Thanks @selwin!
+
### RQ 1.3.0 (2020-03-09)
* Support for infinite job timeout. Thanks @theY4Kman!
* Added `__main__` file so you can now do `python -m rq.cli`. Thanks @bbayles!
diff --git a/MANIFEST.in b/MANIFEST.in
index 537813f..bba9d9a 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,2 +1,3 @@
include LICENSE
+include *.toml
recursive-exclude tests *
diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md
index 11fb7f1..7ecff03 100644
--- a/docs/docs/scheduling.md
+++ b/docs/docs/scheduling.md
@@ -53,7 +53,7 @@ from somewhere import say_hello
queue = Queue(name='default', connection=Redis())
# Schedules job to be run in 10 seconds
-job = queue.enqueue_at(timedelta(seconds=10), say_hello)
+job = queue.enqueue_in(timedelta(seconds=10), say_hello)
```
Jobs that are scheduled for execution are not placed in the queue, but they are
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index d27fbb0..8d3357c 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -143,7 +143,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
queue_dict[queue] = worker_class.all(queue=queue)
if queue_dict:
- max_length = max([len(q.name) for q, in queue_dict.keys()])
+ max_length = max(len(q.name) for q, in queue_dict.keys())
else:
max_length = 0
diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py
index 3f8b3aa..d62c89c 100644
--- a/rq/compat/__init__.py
+++ b/rq/compat/__init__.py
@@ -103,4 +103,13 @@ except ImportError:
def dst(self, dt):
return timedelta(0)
- utc = UTC() \ No newline at end of file
+ utc = UTC()
+
+
+def hmset(pipe_or_connection, name, mapping):
+ # redis-py versions 3.5.0 and above accept a mapping parameter for hset
+ try:
+ return pipe_or_connection.hset(name, mapping=mapping)
+ # earlier versions require hmset to be used
+ except TypeError:
+ return pipe_or_connection.hmset(name, mapping)
diff --git a/rq/decorators.py b/rq/decorators.py
index a84289f..e8c1f37 100644
--- a/rq/decorators.py
+++ b/rq/decorators.py
@@ -51,6 +51,7 @@ class job(object): # noqa
queue = self.queue
depends_on = kwargs.pop('depends_on', None)
+ job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False)
if not depends_on:
@@ -61,7 +62,7 @@ class job(object): # noqa
return queue.enqueue_call(f, args=args, kwargs=kwargs,
timeout=self.timeout, result_ttl=self.result_ttl,
- ttl=self.ttl, depends_on=depends_on, at_front=at_front,
+ ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front,
meta=self.meta, description=self.description, failure_ttl=self.failure_ttl)
f.delay = delay
return f
diff --git a/rq/job.py b/rq/job.py
index f0f28ec..c7a67d1 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -4,12 +4,13 @@ from __future__ import (absolute_import, division, print_function,
import inspect
import warnings
+import zlib
+
from functools import partial
from uuid import uuid4
-import zlib
-
-from rq.compat import as_text, decode_redis_hash, string_types, text_type
+from rq.compat import (as_text, decode_redis_hash, hmset, string_types,
+ text_type)
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .local import LocalStack
@@ -557,7 +558,7 @@ class Job(object):
key = self.key
connection = pipeline if pipeline is not None else self.connection
- connection.hmset(key, self.to_dict(include_meta=include_meta))
+ hmset(connection, key, self.to_dict(include_meta=include_meta))
def save_meta(self):
"""Stores job meta from the job instance to the corresponding Redis key."""
diff --git a/rq/queue.py b/rq/queue.py
index bfaa0fa..0b7afd7 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -191,25 +191,25 @@ class Queue(object):
@property
def started_job_registry(self):
- """Returns this queue's FailedJobRegistry."""
+ """Returns this queue's StartedJobRegistry."""
from rq.registry import StartedJobRegistry
return StartedJobRegistry(queue=self, job_class=self.job_class)
@property
def finished_job_registry(self):
- """Returns this queue's FailedJobRegistry."""
+ """Returns this queue's FinishedJobRegistry."""
from rq.registry import FinishedJobRegistry
return FinishedJobRegistry(queue=self)
@property
def deferred_job_registry(self):
- """Returns this queue's FailedJobRegistry."""
+ """Returns this queue's DeferredJobRegistry."""
from rq.registry import DeferredJobRegistry
return DeferredJobRegistry(queue=self, job_class=self.job_class)
@property
def scheduled_job_registry(self):
- """Returns this queue's FailedJobRegistry."""
+ """Returns this queue's ScheduledJobRegistry."""
from rq.registry import ScheduledJobRegistry
return ScheduledJobRegistry(queue=self, job_class=self.job_class)
@@ -400,6 +400,8 @@ nd
registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline:
+ # Add Queue key set
+ pipeline.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipeline)
registry.schedule(job, datetime, pipeline=pipeline)
pipeline.execute()
diff --git a/rq/scheduler.py b/rq/scheduler.py
index f269ea9..cc1b999 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -40,7 +40,7 @@ class RQScheduler(object):
def __init__(self, queues, connection, interval=1):
self._queue_names = set(parse_names(queues))
- self._acquired_locks = set([])
+ self._acquired_locks = set()
self._scheduled_job_registries = []
self.lock_acquisition_time = None
self.connection = connection
@@ -68,7 +68,7 @@ class RQScheduler(object):
def acquire_locks(self, auto_start=False):
"""Returns names of queue it successfully acquires lock on"""
- successful_locks = set([])
+ successful_locks = set()
pid = os.getpid()
logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names:
diff --git a/rq/serializers.py b/rq/serializers.py
index c4b0e54..27f892f 100644
--- a/rq/serializers.py
+++ b/rq/serializers.py
@@ -1,9 +1,15 @@
+from functools import partial
import pickle
from .compat import string_types
from .utils import import_attribute
+class DefaultSerializer:
+ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+ loads = pickle.loads
+
+
def resolve_serializer(serializer):
"""This function checks the user defined serializer for ('dumps', 'loads') methods
It returns a default pickle serializer if not found else it returns a MySerializer
@@ -11,7 +17,7 @@ def resolve_serializer(serializer):
Also accepts a string path to serializer that will be loaded as the serializer
"""
if not serializer:
- return pickle
+ return DefaultSerializer
if isinstance(serializer, string_types):
serializer = import_attribute(serializer)
diff --git a/rq/version.py b/rq/version.py
index f75766e..4b20c03 100644
--- a/rq/version.py
+++ b/rq/version.py
@@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
-VERSION = '1.3.0'
+VERSION = '1.4.1'
diff --git a/rq/worker.py b/rq/worker.py
index 7e090f5..91ffc23 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -23,7 +23,7 @@ except ImportError:
from redis import WatchError
from . import worker_registration
-from .compat import PY2, as_text, string_types, text_type
+from .compat import PY2, as_text, hmset, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL,
@@ -268,7 +268,7 @@ class Worker(object):
now = utcnow()
now_in_string = utcformat(now)
self.birth_date = now
- p.hmset(key, {
+ hmset(p, key, mapping={
'birth': now_in_string,
'last_heartbeat': now_in_string,
'queues': queues,
@@ -680,7 +680,7 @@ class Worker(object):
"""
ret_val = None
- job.started_at = job.started_at or utcnow()
+ job.started_at = utcnow()
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
@@ -689,7 +689,7 @@ class Worker(object):
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
- self.heartbeat(self.job_monitoring_interval + 5)
+ self.heartbeat(self.job_monitoring_interval + 30)
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1):
@@ -747,15 +747,10 @@ class Worker(object):
# that are different from the worker.
random.seed()
- try:
- self.setup_work_horse_signals()
- self._is_horse = True
- self.log = logger
- self.perform_job(job, queue)
- except Exception as e: # noqa
- # Horse does not terminate properly
- raise e
- os._exit(1)
+ self.setup_work_horse_signals()
+ self._is_horse = True
+ self.log = logger
+ self.perform_job(job, queue)
# os._exit() is the way to exit from childs after a fork(), in
# contrast to the regular sys.exit()
diff --git a/setup.py b/setup.py
index 8df1b72..c35e40d 100644
--- a/setup.py
+++ b/setup.py
@@ -35,7 +35,7 @@ setup(
'redis >= 3.0.0',
'click >= 5.0'
],
- python_requires='>=2.7',
+ python_requires='>=3.4',
entry_points={
'console_scripts': [
'rq = rq.cli:main',
@@ -65,8 +65,6 @@ setup(
'Operating System :: MacOS',
'Operating System :: Unix',
'Programming Language :: Python',
- 'Programming Language :: Python :: 2',
- 'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
diff --git a/tests/test_serializers.py b/tests/test_serializers.py
index 58d093f..1e3e671 100644
--- a/tests/test_serializers.py
+++ b/tests/test_serializers.py
@@ -4,10 +4,11 @@ from __future__ import (absolute_import, division, print_function,
import json
import pickle
+import pickletools
import queue
import unittest
-from rq.serializers import resolve_serializer
+from rq.serializers import DefaultSerializer, resolve_serializer
class TestSerializers(unittest.TestCase):
@@ -15,7 +16,16 @@ class TestSerializers(unittest.TestCase):
"""Ensure function resolve_serializer works correctly"""
serializer = resolve_serializer(None)
self.assertIsNotNone(serializer)
- self.assertEqual(serializer, pickle)
+ self.assertEqual(serializer, DefaultSerializer)
+
+ # Test round trip with default serializer
+ test_data = {'test': 'data'}
+ serialized_data = serializer.dumps(test_data)
+ self.assertEqual(serializer.loads(serialized_data), test_data)
+ self.assertEqual(
+ next(pickletools.genops(serialized_data))[1],
+ pickle.HIGHEST_PROTOCOL
+ )
# Test using json serializer
serializer = resolve_serializer(json)