summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md2
-rw-r--r--docs/index.md2
-rw-r--r--requirements.txt2
-rw-r--r--rq/cli/helpers.py4
-rw-r--r--rq/compat/connections.py41
-rw-r--r--rq/connections.py11
-rw-r--r--rq/contrib/legacy.py2
-rw-r--r--rq/defaults.py2
-rw-r--r--rq/job.py5
-rw-r--r--rq/queue.py14
-rw-r--r--rq/registry.py4
-rw-r--r--rq/worker.py10
-rw-r--r--rq/worker_registration.py2
-rw-r--r--setup.cfg2
-rw-r--r--setup.py2
-rw-r--r--tests/__init__.py4
-rw-r--r--tests/test_decorator.py4
-rw-r--r--tests/test_registry.py26
-rw-r--r--tests/test_worker.py10
19 files changed, 55 insertions, 94 deletions
diff --git a/README.md b/README.md
index 2ba90e7..7a26ef8 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It should be integrated in your web stack
easily.
-RQ requires Redis >= 2.7.0.
+RQ requires Redis >= 3.0.0.
[![Build status](https://travis-ci.org/rq/rq.svg?branch=master)](https://secure.travis-ci.org/rq/rq)
[![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)
diff --git a/docs/index.md b/docs/index.md
index 15e77ed..64fddcf 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -7,7 +7,7 @@ RQ (_Redis Queue_) is a simple Python library for queueing jobs and processing
them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It can be integrated in your web stack easily.
-RQ requires Redis >= 2.7.0.
+RQ requires Redis >= 3.0.0.
## Getting started
diff --git a/requirements.txt b/requirements.txt
index 291ee5b..7c061a7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis>=2.7
+redis>=3.0
click>=3.0.0
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index 33a267b..0e964f9 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -9,7 +9,7 @@ from functools import partial
import click
import redis
-from redis import StrictRedis
+from redis import Redis
from redis.sentinel import Sentinel
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
@@ -30,7 +30,7 @@ def read_config_file(module):
if k.upper() == k])
-def get_redis_from_config(settings, connection_class=StrictRedis):
+def get_redis_from_config(settings, connection_class=Redis):
"""Returns a StrictRedis instance from a dictionary of settings.
To use redis sentinel, you must specify a dictionary in the configuration file.
Example of a dictionary with keys without values:
diff --git a/rq/compat/connections.py b/rq/compat/connections.py
index 7d8798b..8e2b511 100644
--- a/rq/compat/connections.py
+++ b/rq/compat/connections.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
from functools import partial
-from redis import Redis, StrictRedis
+from redis import Redis
def fix_return_type(func):
@@ -16,42 +16,3 @@ def fix_return_type(func):
value = -1
return value
return _inner
-
-
-PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
-
-
-def _hset(self, key, field_name, value, pipeline=None):
- connection = pipeline if pipeline is not None else self
- connection.hset(key, field_name, value)
-
-
-def patch_connection(connection):
- # Don't patch already patches objects
- if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
- return connection
-
- connection._hset = partial(_hset, connection)
-
- if isinstance(connection, Redis):
- connection._setex = partial(StrictRedis.setex, connection)
- connection._lrem = partial(StrictRedis.lrem, connection)
- connection._zadd = partial(StrictRedis.zadd, connection)
- connection._pipeline = partial(StrictRedis.pipeline, connection)
- connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
- if hasattr(connection, 'pttl'):
- connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
-
- # add support for mock redis objects
- elif hasattr(connection, 'setex'):
- connection._setex = connection.setex
- connection._lrem = connection.lrem
- connection._zadd = connection.zadd
- connection._pipeline = connection.pipeline
- connection._ttl = connection.ttl
- if hasattr(connection, 'pttl'):
- connection._pttl = connection.pttl
- else:
- raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
-
- return connection
diff --git a/rq/connections.py b/rq/connections.py
index d7f6a61..7186056 100644
--- a/rq/connections.py
+++ b/rq/connections.py
@@ -4,9 +4,8 @@ from __future__ import (absolute_import, division, print_function,
from contextlib import contextmanager
-from redis import StrictRedis
+from redis import Redis
-from .compat.connections import patch_connection
from .local import LocalStack, release_local
@@ -17,7 +16,7 @@ class NoRedisConnectionException(Exception):
@contextmanager
def Connection(connection=None): # noqa
if connection is None:
- connection = StrictRedis()
+ connection = Redis()
push_connection(connection)
try:
yield
@@ -30,7 +29,7 @@ def Connection(connection=None): # noqa
def push_connection(redis):
"""Pushes the given connection on the stack."""
- _connection_stack.push(patch_connection(redis))
+ _connection_stack.push(redis)
def pop_connection():
@@ -47,7 +46,7 @@ def use_connection(redis=None):
release_local(_connection_stack)
if redis is None:
- redis = StrictRedis()
+ redis = Redis()
push_connection(redis)
@@ -63,7 +62,7 @@ def resolve_connection(connection=None):
Raises an exception if it cannot resolve a connection now.
"""
if connection is not None:
- return patch_connection(connection)
+ return connection
connection = get_current_connection()
if connection is None:
diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py
index 710c2e3..880d865 100644
--- a/rq/contrib/legacy.py
+++ b/rq/contrib/legacy.py
@@ -23,7 +23,7 @@ def cleanup_ghosts(conn=None):
"""
conn = conn if conn else get_current_connection()
for worker in Worker.all(connection=conn):
- if conn._ttl(worker.key) == -1:
+ if conn.ttl(worker.key) == -1:
ttl = worker.default_worker_ttl
conn.expire(worker.key, ttl)
logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl))
diff --git a/rq/defaults.py b/rq/defaults.py
index 517144c..89792c0 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -1,7 +1,7 @@
DEFAULT_JOB_CLASS = 'rq.job.Job'
DEFAULT_QUEUE_CLASS = 'rq.Queue'
DEFAULT_WORKER_CLASS = 'rq.Worker'
-DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
+DEFAULT_CONNECTION_CLASS = 'redis.Redis'
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
diff --git a/rq/job.py b/rq/job.py
index 9432813..e77b085 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -155,7 +155,8 @@ class Job(object):
def set_status(self, status, pipeline=None):
self._status = status
- self.connection._hset(self.key, 'status', self._status, pipeline)
+ connection = pipeline or self.connection
+ connection.hset(self.key, 'status', self._status)
def _set_status(self, status):
warnings.warn(
@@ -529,7 +530,7 @@ class Job(object):
cancellation.
"""
from .queue import Queue
- pipeline = pipeline or self.connection._pipeline()
+ pipeline = pipeline or self.connection.pipeline()
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
q.remove(self, pipeline=pipeline)
diff --git a/rq/queue.py b/rq/queue.py
index b8b5b47..2116e50 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -121,7 +121,7 @@ class Queue(object):
if delete_jobs:
self.empty()
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
pipeline.srem(self.redis_queues_keys, self._key)
pipeline.delete(self._key)
pipeline.execute()
@@ -183,7 +183,7 @@ class Queue(object):
pipeline.lrem(self.key, 1, job_id)
return
- return self.connection._lrem(self.key, 1, job_id)
+ return self.connection.lrem(self.key, 1, job_id)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
@@ -237,7 +237,7 @@ class Queue(object):
if not isinstance(depends_on, self.job_class):
depends_on = self.job_class(id=depends_on,
connection=self.connection)
- with self.connection._pipeline() as pipe:
+ with self.connection.pipeline() as pipe:
while True:
try:
pipe.watch(depends_on.key)
@@ -320,7 +320,7 @@ class Queue(object):
If Queue is instantiated with is_async=False, job is executed immediately.
"""
- pipe = pipeline if pipeline is not None else self.connection._pipeline()
+ pipe = pipeline if pipeline is not None else self.connection.pipeline()
# Add Queue key set
pipe.sadd(self.redis_queues_keys, self.key)
@@ -354,7 +354,7 @@ class Queue(object):
"""
from .registry import DeferredJobRegistry
- pipe = pipeline if pipeline is not None else self.connection._pipeline()
+ pipe = pipeline if pipeline is not None else self.connection.pipeline()
dependents_key = job.dependents_key
while True:
@@ -522,7 +522,7 @@ class FailedQueue(Queue):
queue).
"""
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
@@ -530,7 +530,7 @@ class FailedQueue(Queue):
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire
- self.push_job_id(job.id, pipeline=pipeline)
+ self.push_job_id(str(job.id), pipeline=pipeline)
pipeline.execute()
return job
diff --git a/rq/registry.py b/rq/registry.py
index 2b8b992..b482c30 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -44,9 +44,9 @@ class BaseRegistry(object):
if score == -1:
score = '+inf'
if pipeline is not None:
- return pipeline.zadd(self.key, score, job.id)
+ return pipeline.zadd(self.key, {job.id: score})
- return self.connection._zadd(self.key, score, job.id)
+ return self.connection.zadd(self.key, {job.id: score})
def remove(self, job, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
diff --git a/rq/worker.py b/rq/worker.py
index cdfa33f..3e21cf4 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -277,7 +277,7 @@ class Worker(object):
raise ValueError(msg.format(self.name))
key = self.key
queues = ','.join(self.queue_names())
- with self.connection._pipeline() as p:
+ with self.connection.pipeline() as p:
p.delete(key)
now = utcnow()
now_in_string = utcformat(utcnow())
@@ -292,7 +292,7 @@ class Worker(object):
def register_death(self):
"""Registers its own death."""
self.log.debug('Registering death')
- with self.connection._pipeline() as p:
+ with self.connection.pipeline() as p:
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
worker_registration.unregister(self, p)
@@ -703,7 +703,7 @@ class Worker(object):
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
@@ -723,7 +723,7 @@ class Worker(object):
2. Removing the job from the started_job_registry
3. Setting the workers current job to None
"""
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(job.origin,
self.connection,
@@ -745,7 +745,7 @@ class Worker(object):
def handle_job_success(self, job, queue, started_job_registry):
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
while True:
try:
# if dependencies are inserted after enqueue_dependents
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index 73cb0ef..d24fac3 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -17,7 +17,7 @@ def register(worker, pipeline=None):
def unregister(worker, pipeline=None):
"""Remove worker key from Redis."""
if pipeline is None:
- connection = worker.connection._pipeline()
+ connection = worker.connection.pipeline()
else:
connection = pipeline
diff --git a/setup.cfg b/setup.cfg
index 5752f88..912a1b4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
[bdist_rpm]
-requires = redis >= 2.7.0
+requires = redis >= 3.0.0
click >= 3.0
[wheel]
diff --git a/setup.py b/setup.py
index 3c425c7..2d9756f 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ setup(
zip_safe=False,
platforms='any',
install_requires=[
- 'redis >= 2.7.0',
+ 'redis >= 3.0.0',
'click >= 5.0'
],
python_requires='>=2.7',
diff --git a/tests/__init__.py b/tests/__init__.py
index fea12b5..c916a30 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import logging
-from redis import StrictRedis
+from redis import Redis
from rq import pop_connection, push_connection
from rq.compat import is_python_version
@@ -19,7 +19,7 @@ def find_empty_redis_database():
will use/connect it when no keys are in there.
"""
for dbnum in range(4, 17):
- testconn = StrictRedis(db=dbnum)
+ testconn = Redis(db=dbnum)
empty = testconn.dbsize() == 0
if empty:
return testconn
diff --git a/tests/test_decorator.py b/tests/test_decorator.py
index 1b9e974..5f74935 100644
--- a/tests/test_decorator.py
+++ b/tests/test_decorator.py
@@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import mock
-from redis import StrictRedis
+from redis import Redis
from rq.decorators import job
from rq.job import Job
from rq.worker import DEFAULT_RESULT_TTL
@@ -152,7 +152,7 @@ class TestDecorator(RQTestCase):
def test_decorator_connection_laziness(self, resolve_connection):
"""Ensure that job decorator resolve connection in `lazy` way """
- resolve_connection.return_value = StrictRedis()
+ resolve_connection.return_value = Redis()
@job(queue='queue_name')
def foo():
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 41d0c1c..ed2dbd2 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -62,17 +62,17 @@ class TestRegistry(RQTestCase):
def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry."""
timestamp = current_timestamp()
- self.testconn.zadd(self.registry.key, timestamp + 10, 'foo')
- self.testconn.zadd(self.registry.key, timestamp + 20, 'bar')
+ self.testconn.zadd(self.registry.key, {'foo': timestamp + 10})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp + 20})
self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
def test_get_expired_job_ids(self):
"""Getting expired job ids form StartedJobRegistry."""
timestamp = current_timestamp()
- self.testconn.zadd(self.registry.key, 1, 'foo')
- self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
- self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
+ self.testconn.zadd(self.registry.key, {'foo': 1})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
+ self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
@@ -86,7 +86,7 @@ class TestRegistry(RQTestCase):
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
- self.testconn.zadd(self.registry.key, 2, job.id)
+ self.testconn.zadd(self.registry.key, {job.id: 2})
self.registry.cleanup(1)
self.assertNotIn(job.id, failed_queue.job_ids)
@@ -142,8 +142,8 @@ class TestRegistry(RQTestCase):
def test_get_job_count(self):
"""StartedJobRegistry returns the right number of job count."""
timestamp = current_timestamp() + 10
- self.testconn.zadd(self.registry.key, timestamp, 'foo')
- self.testconn.zadd(self.registry.key, timestamp, 'bar')
+ self.testconn.zadd(self.registry.key, {'foo': timestamp})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp})
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)
@@ -153,10 +153,10 @@ class TestRegistry(RQTestCase):
queue = Queue(connection=self.testconn)
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
- self.testconn.zadd(finished_job_registry.key, 1, 'foo')
+ self.testconn.zadd(finished_job_registry.key, {'foo': 1})
started_job_registry = StartedJobRegistry(connection=self.testconn)
- self.testconn.zadd(started_job_registry.key, 1, 'foo')
+ self.testconn.zadd(started_job_registry.key, {'foo': 1})
clean_registries(queue)
self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
@@ -175,9 +175,9 @@ class TestFinishedJobRegistry(RQTestCase):
def test_cleanup(self):
"""Finished job registry removes expired jobs."""
timestamp = current_timestamp()
- self.testconn.zadd(self.registry.key, 1, 'foo')
- self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
- self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
+ self.testconn.zadd(self.registry.key, {'foo': 1})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
+ self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 5d2370e..d56b5cb 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -415,7 +415,7 @@ class TestWorker(RQTestCase):
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
- self.assertNotEqual(self.testconn._ttl(job.key), 0)
+ self.assertNotEqual(self.testconn.ttl(job.key), 0)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with -1 result_ttl don't expire
@@ -423,7 +423,7 @@ class TestWorker(RQTestCase):
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
- self.assertEqual(self.testconn._ttl(job.key), -1)
+ self.assertEqual(self.testconn.ttl(job.key), -1)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with result_ttl = 0 gets deleted immediately
@@ -676,12 +676,12 @@ class TestWorker(RQTestCase):
"""worker.clean_registries sets last_cleaned_at and cleans registries."""
foo_queue = Queue('foo', connection=self.testconn)
foo_registry = StartedJobRegistry('foo', connection=self.testconn)
- self.testconn.zadd(foo_registry.key, 1, 'foo')
+ self.testconn.zadd(foo_registry.key, {'foo': 1})
self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
bar_queue = Queue('bar', connection=self.testconn)
bar_registry = StartedJobRegistry('bar', connection=self.testconn)
- self.testconn.zadd(bar_registry.key, 1, 'bar')
+ self.testconn.zadd(bar_registry.key, {'bar': 1})
self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
worker = Worker([foo_queue, bar_queue])
@@ -706,7 +706,7 @@ class TestWorker(RQTestCase):
"""Worker calls clean_registries when run."""
queue = Queue(connection=self.testconn)
registry = StartedJobRegistry(connection=self.testconn)
- self.testconn.zadd(registry.key, 1, 'foo')
+ self.testconn.zadd(registry.key, {'foo': 1})
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)