summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2018-12-03 07:42:57 +0700
committerSelwin Ong <selwin.ong@gmail.com>2018-12-03 07:42:57 +0700
commit331327d7d750a6a6d8a761a3a2428b707d71b9c7 (patch)
tree17622a497c7f0782e239c7a097b92edc4d827ed5
parentd1537281453958012de635b99e991e4f7770da5e (diff)
parentada2ad03ca959e993a86ce7259d6d96ef91c8bfa (diff)
downloadrq-331327d7d750a6a6d8a761a3a2428b707d71b9c7.tar.gz
Merge branch 'master' into v1.0
-rw-r--r--README.md2
-rw-r--r--docs/docs/results.md2
-rw-r--r--docs/index.md2
-rw-r--r--requirements.txt2
-rw-r--r--rq/cli/helpers.py4
-rw-r--r--rq/compat/connections.py41
-rw-r--r--rq/connections.py11
-rw-r--r--rq/contrib/legacy.py2
-rw-r--r--rq/defaults.py2
-rw-r--r--rq/job.py5
-rw-r--r--rq/queue.py17
-rw-r--r--rq/registry.py4
-rw-r--r--rq/worker.py10
-rw-r--r--rq/worker_registration.py2
-rw-r--r--setup.cfg2
-rw-r--r--setup.py2
-rw-r--r--tests/__init__.py4
-rw-r--r--tests/test_decorator.py4
-rw-r--r--tests/test_queue.py13
-rw-r--r--tests/test_registry.py26
-rw-r--r--tests/test_worker.py16
21 files changed, 74 insertions, 99 deletions
diff --git a/README.md b/README.md
index 2ba90e7..7a26ef8 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It should be integrated in your web stack
easily.
-RQ requires Redis >= 2.7.0.
+RQ requires Redis >= 3.0.0.
[![Build status](https://travis-ci.org/rq/rq.svg?branch=master)](https://secure.travis-ci.org/rq/rq)
[![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)
diff --git a/docs/docs/results.md b/docs/docs/results.md
index e5cc9ca..a641bf8 100644
--- a/docs/docs/results.md
+++ b/docs/docs/results.md
@@ -73,7 +73,7 @@ possibly resubmit the job.
When workers get killed in the polite way (Ctrl+C or `kill`), RQ tries hard not
to lose any work. The current work is finished after which the worker will
stop further processing of jobs. This ensures that jobs always get a fair
-change to finish themselves.
+chance to finish themselves.
However, workers can be killed forcefully by `kill -9`, which will not give the
workers a chance to finish the job gracefully or to put the job on the `failed`
diff --git a/docs/index.md b/docs/index.md
index 15e77ed..64fddcf 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -7,7 +7,7 @@ RQ (_Redis Queue_) is a simple Python library for queueing jobs and processing
them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It can be integrated in your web stack easily.
-RQ requires Redis >= 2.7.0.
+RQ requires Redis >= 3.0.0.
## Getting started
diff --git a/requirements.txt b/requirements.txt
index 291ee5b..7c061a7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis>=2.7
+redis>=3.0
click>=3.0.0
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index 33a267b..0e964f9 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -9,7 +9,7 @@ from functools import partial
import click
import redis
-from redis import StrictRedis
+from redis import Redis
from redis.sentinel import Sentinel
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
@@ -30,7 +30,7 @@ def read_config_file(module):
if k.upper() == k])
-def get_redis_from_config(settings, connection_class=StrictRedis):
+def get_redis_from_config(settings, connection_class=Redis):
"""Returns a StrictRedis instance from a dictionary of settings.
To use redis sentinel, you must specify a dictionary in the configuration file.
Example of a dictionary with keys without values:
diff --git a/rq/compat/connections.py b/rq/compat/connections.py
index 7d8798b..8e2b511 100644
--- a/rq/compat/connections.py
+++ b/rq/compat/connections.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
from functools import partial
-from redis import Redis, StrictRedis
+from redis import Redis
def fix_return_type(func):
@@ -16,42 +16,3 @@ def fix_return_type(func):
value = -1
return value
return _inner
-
-
-PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
-
-
-def _hset(self, key, field_name, value, pipeline=None):
- connection = pipeline if pipeline is not None else self
- connection.hset(key, field_name, value)
-
-
-def patch_connection(connection):
- # Don't patch already patches objects
- if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
- return connection
-
- connection._hset = partial(_hset, connection)
-
- if isinstance(connection, Redis):
- connection._setex = partial(StrictRedis.setex, connection)
- connection._lrem = partial(StrictRedis.lrem, connection)
- connection._zadd = partial(StrictRedis.zadd, connection)
- connection._pipeline = partial(StrictRedis.pipeline, connection)
- connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
- if hasattr(connection, 'pttl'):
- connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
-
- # add support for mock redis objects
- elif hasattr(connection, 'setex'):
- connection._setex = connection.setex
- connection._lrem = connection.lrem
- connection._zadd = connection.zadd
- connection._pipeline = connection.pipeline
- connection._ttl = connection.ttl
- if hasattr(connection, 'pttl'):
- connection._pttl = connection.pttl
- else:
- raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
-
- return connection
diff --git a/rq/connections.py b/rq/connections.py
index d7f6a61..7186056 100644
--- a/rq/connections.py
+++ b/rq/connections.py
@@ -4,9 +4,8 @@ from __future__ import (absolute_import, division, print_function,
from contextlib import contextmanager
-from redis import StrictRedis
+from redis import Redis
-from .compat.connections import patch_connection
from .local import LocalStack, release_local
@@ -17,7 +16,7 @@ class NoRedisConnectionException(Exception):
@contextmanager
def Connection(connection=None): # noqa
if connection is None:
- connection = StrictRedis()
+ connection = Redis()
push_connection(connection)
try:
yield
@@ -30,7 +29,7 @@ def Connection(connection=None): # noqa
def push_connection(redis):
"""Pushes the given connection on the stack."""
- _connection_stack.push(patch_connection(redis))
+ _connection_stack.push(redis)
def pop_connection():
@@ -47,7 +46,7 @@ def use_connection(redis=None):
release_local(_connection_stack)
if redis is None:
- redis = StrictRedis()
+ redis = Redis()
push_connection(redis)
@@ -63,7 +62,7 @@ def resolve_connection(connection=None):
Raises an exception if it cannot resolve a connection now.
"""
if connection is not None:
- return patch_connection(connection)
+ return connection
connection = get_current_connection()
if connection is None:
diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py
index 710c2e3..880d865 100644
--- a/rq/contrib/legacy.py
+++ b/rq/contrib/legacy.py
@@ -23,7 +23,7 @@ def cleanup_ghosts(conn=None):
"""
conn = conn if conn else get_current_connection()
for worker in Worker.all(connection=conn):
- if conn._ttl(worker.key) == -1:
+ if conn.ttl(worker.key) == -1:
ttl = worker.default_worker_ttl
conn.expire(worker.key, ttl)
logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl))
diff --git a/rq/defaults.py b/rq/defaults.py
index 6b31a16..701e356 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -1,7 +1,7 @@
DEFAULT_JOB_CLASS = 'rq.job.Job'
DEFAULT_QUEUE_CLASS = 'rq.Queue'
DEFAULT_WORKER_CLASS = 'rq.Worker'
-DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
+DEFAULT_CONNECTION_CLASS = 'redis.Redis'
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
diff --git a/rq/job.py b/rq/job.py
index 4ae0e44..b676228 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -146,7 +146,8 @@ class Job(object):
def set_status(self, status, pipeline=None):
self._status = status
- self.connection._hset(self.key, 'status', self._status, pipeline)
+ connection = pipeline or self.connection
+ connection.hset(self.key, 'status', self._status)
@property
def is_finished(self):
@@ -514,7 +515,7 @@ class Job(object):
cancellation.
"""
from .queue import Queue
- pipeline = pipeline or self.connection._pipeline()
+ pipeline = pipeline or self.connection.pipeline()
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
q.remove(self, pipeline=pipeline)
diff --git a/rq/queue.py b/rq/queue.py
index d2fae1a..a1e6ec2 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -116,7 +116,7 @@ class Queue(object):
if delete_jobs:
self.empty()
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
pipeline.srem(self.redis_queues_keys, self._key)
pipeline.delete(self._key)
pipeline.execute()
@@ -183,7 +183,7 @@ class Queue(object):
pipeline.lrem(self.key, 1, job_id)
return
- return self.connection._lrem(self.key, 1, job_id)
+ return self.connection.lrem(self.key, 1, job_id)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
@@ -240,7 +240,7 @@ class Queue(object):
if not isinstance(depends_on, self.job_class):
depends_on = self.job_class(id=depends_on,
connection=self.connection)
- with self.connection._pipeline() as pipe:
+ with self.connection.pipeline() as pipe:
while True:
try:
pipe.watch(depends_on.key)
@@ -294,7 +294,12 @@ class Queue(object):
# Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
- timeout = kwargs.pop('timeout', None)
+ timeout = kwargs.pop('job_timeout', None)
+ if timeout is None:
+ timeout = kwargs.pop('timeout', None)
+ if timeout:
+ warnings.warn('The `timeout` keyword is deprecated. Use `job_timeout` instead', DeprecationWarning)
+
description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None)
ttl = kwargs.pop('ttl', None)
@@ -321,7 +326,7 @@ class Queue(object):
If Queue is instantiated with is_async=False, job is executed immediately.
"""
- pipe = pipeline if pipeline is not None else self.connection._pipeline()
+ pipe = pipeline if pipeline is not None else self.connection.pipeline()
# Add Queue key set
pipe.sadd(self.redis_queues_keys, self.key)
@@ -355,7 +360,7 @@ class Queue(object):
"""
from .registry import DeferredJobRegistry
- pipe = pipeline if pipeline is not None else self.connection._pipeline()
+ pipe = pipeline if pipeline is not None else self.connection.pipeline()
dependents_key = job.dependents_key
while True:
diff --git a/rq/registry.py b/rq/registry.py
index 5cfdd43..ee56c58 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -54,9 +54,9 @@ class BaseRegistry(object):
if score == -1:
score = '+inf'
if pipeline is not None:
- return pipeline.zadd(self.key, score, job.id)
+ return pipeline.zadd(self.key, {job.id: score})
- return self.connection._zadd(self.key, score, job.id)
+ return self.connection.zadd(self.key, {job.id: score})
def remove(self, job, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
diff --git a/rq/worker.py b/rq/worker.py
index b018519..2574b42 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -270,7 +270,7 @@ class Worker(object):
raise ValueError(msg.format(self.name))
key = self.key
queues = ','.join(self.queue_names())
- with self.connection._pipeline() as p:
+ with self.connection.pipeline() as p:
p.delete(key)
now = utcnow()
now_in_string = utcformat(utcnow())
@@ -285,7 +285,7 @@ class Worker(object):
def register_death(self):
"""Registers its own death."""
self.log.debug('Registering death')
- with self.connection._pipeline() as p:
+ with self.connection.pipeline() as p:
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
worker_registration.unregister(self, p)
@@ -694,7 +694,7 @@ class Worker(object):
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
@@ -716,7 +716,7 @@ class Worker(object):
3. Setting the workers current job to None
4. Add the job to FailedJobRegistry
"""
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(
job.origin,
@@ -749,7 +749,7 @@ class Worker(object):
def handle_job_success(self, job, queue, started_job_registry):
- with self.connection._pipeline() as pipeline:
+ with self.connection.pipeline() as pipeline:
while True:
try:
# if dependencies are inserted after enqueue_dependents
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index 73cb0ef..d24fac3 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -17,7 +17,7 @@ def register(worker, pipeline=None):
def unregister(worker, pipeline=None):
"""Remove worker key from Redis."""
if pipeline is None:
- connection = worker.connection._pipeline()
+ connection = worker.connection.pipeline()
else:
connection = pipeline
diff --git a/setup.cfg b/setup.cfg
index 5752f88..912a1b4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
[bdist_rpm]
-requires = redis >= 2.7.0
+requires = redis >= 3.0.0
click >= 3.0
[wheel]
diff --git a/setup.py b/setup.py
index 3c425c7..2d9756f 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ setup(
zip_safe=False,
platforms='any',
install_requires=[
- 'redis >= 2.7.0',
+ 'redis >= 3.0.0',
'click >= 5.0'
],
python_requires='>=2.7',
diff --git a/tests/__init__.py b/tests/__init__.py
index fea12b5..c916a30 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import logging
-from redis import StrictRedis
+from redis import Redis
from rq import pop_connection, push_connection
from rq.compat import is_python_version
@@ -19,7 +19,7 @@ def find_empty_redis_database():
will use/connect it when no keys are in there.
"""
for dbnum in range(4, 17):
- testconn = StrictRedis(db=dbnum)
+ testconn = Redis(db=dbnum)
empty = testconn.dbsize() == 0
if empty:
return testconn
diff --git a/tests/test_decorator.py b/tests/test_decorator.py
index 1b9e974..5f74935 100644
--- a/tests/test_decorator.py
+++ b/tests/test_decorator.py
@@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import mock
-from redis import StrictRedis
+from redis import Redis
from rq.decorators import job
from rq.job import Job
from rq.worker import DEFAULT_RESULT_TTL
@@ -152,7 +152,7 @@ class TestDecorator(RQTestCase):
def test_decorator_connection_laziness(self, resolve_connection):
"""Ensure that job decorator resolve connection in `lazy` way """
- resolve_connection.return_value = StrictRedis()
+ resolve_connection.return_value = Redis()
@job(queue='queue_name')
def foo():
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 0434c62..33f24e2 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -325,12 +325,21 @@ class TestQueue(RQTestCase):
job.refresh()
self.assertEqual(job.failure_ttl, 10)
+ def test_job_timeout(self):
+ """Timeout can be passed via job_timeout argument"""
+ queue = Queue()
+ job = queue.enqueue(echo, 1, timeout=15)
+ self.assertEqual(job.timeout, 15)
+
+ job = queue.enqueue(echo, 1, job_timeout=15)
+ self.assertEqual(job.timeout, 15)
+
def test_enqueue_explicit_args(self):
"""enqueue() works for both implicit/explicit args."""
q = Queue()
# Implicit args/kwargs mode
- job = q.enqueue(echo, 1, timeout=1, result_ttl=1, bar='baz')
+ job = q.enqueue(echo, 1, job_timeout=1, result_ttl=1, bar='baz')
self.assertEqual(job.timeout, 1)
self.assertEqual(job.result_ttl, 1)
self.assertEqual(
@@ -343,7 +352,7 @@ class TestQueue(RQTestCase):
'timeout': 1,
'result_ttl': 1,
}
- job = q.enqueue(echo, timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
+ job = q.enqueue(echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
self.assertEqual(job.timeout, 2)
self.assertEqual(job.result_ttl, 2)
self.assertEqual(
diff --git a/tests/test_registry.py b/tests/test_registry.py
index f4e38b8..66d5454 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -78,17 +78,17 @@ class TestRegistry(RQTestCase):
def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry."""
timestamp = current_timestamp()
- self.testconn.zadd(self.registry.key, timestamp + 10, 'foo')
- self.testconn.zadd(self.registry.key, timestamp + 20, 'bar')
+ self.testconn.zadd(self.registry.key, {'foo': timestamp + 10})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp + 20})
self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
def test_get_expired_job_ids(self):
"""Getting expired job ids form StartedJobRegistry."""
timestamp = current_timestamp()
- self.testconn.zadd(self.registry.key, 1, 'foo')
- self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
- self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
+ self.testconn.zadd(self.registry.key, {'foo': 1})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
+ self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
@@ -100,7 +100,7 @@ class TestRegistry(RQTestCase):
failed_job_registry = FailedJobRegistry(connection=self.testconn)
job = queue.enqueue(say_hello)
- self.testconn.zadd(self.registry.key, 2, job.id)
+ self.testconn.zadd(self.registry.key, {job.id: 2})
# Job has not been moved to FailedJobRegistry
self.registry.cleanup(1)
@@ -157,8 +157,8 @@ class TestRegistry(RQTestCase):
def test_get_job_count(self):
"""StartedJobRegistry returns the right number of job count."""
timestamp = current_timestamp() + 10
- self.testconn.zadd(self.registry.key, timestamp, 'foo')
- self.testconn.zadd(self.registry.key, timestamp, 'bar')
+ self.testconn.zadd(self.registry.key, {'foo': timestamp})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp})
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)
@@ -168,10 +168,10 @@ class TestRegistry(RQTestCase):
queue = Queue(connection=self.testconn)
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
- self.testconn.zadd(finished_job_registry.key, 1, 'foo')
+ self.testconn.zadd(finished_job_registry.key, {'foo': 1})
started_job_registry = StartedJobRegistry(connection=self.testconn)
- self.testconn.zadd(started_job_registry.key, 1, 'foo')
+ self.testconn.zadd(started_job_registry.key, {'foo': 1})
failed_job_registry = FailedJobRegistry(connection=self.testconn)
self.testconn.zadd(failed_job_registry.key, 1, 'foo')
@@ -203,9 +203,9 @@ class TestFinishedJobRegistry(RQTestCase):
def test_cleanup(self):
"""Finished job registry removes expired jobs."""
timestamp = current_timestamp()
- self.testconn.zadd(self.registry.key, 1, 'foo')
- self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
- self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
+ self.testconn.zadd(self.registry.key, {'foo': 1})
+ self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
+ self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
diff --git a/tests/test_worker.py b/tests/test_worker.py
index d2d9c26..2f3e4f3 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -255,7 +255,7 @@ class TestWorker(RQTestCase):
for timeout, expected_heartbeats in [(2, 0), (7, 1), (12, 2)]:
job = q.enqueue(long_running_job,
args=(timeout,),
- timeout=30,
+ job_timeout=30,
result_ttl=-1)
with mock.patch.object(w, 'heartbeat', wraps=w.heartbeat) as mocked:
w.execute_job(job, q)
@@ -429,7 +429,7 @@ class TestWorker(RQTestCase):
# Put it on the queue with a timeout value
res = q.enqueue(create_file_after_timeout,
args=(sentinel_file, 4),
- timeout=1)
+ job_timeout=1)
try:
os.unlink(sentinel_file)
@@ -452,7 +452,7 @@ class TestWorker(RQTestCase):
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
- self.assertNotEqual(self.testconn._ttl(job.key), 0)
+ self.assertNotEqual(self.testconn.ttl(job.key), 0)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with -1 result_ttl don't expire
@@ -460,7 +460,7 @@ class TestWorker(RQTestCase):
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
- self.assertEqual(self.testconn._ttl(job.key), -1)
+ self.assertEqual(self.testconn.ttl(job.key), -1)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with result_ttl = 0 gets deleted immediately
@@ -580,7 +580,7 @@ class TestWorker(RQTestCase):
worker = SimpleWorker([queue])
job_timeout = 300
- job = queue.enqueue(save_key_ttl, worker.key, timeout=job_timeout)
+ job = queue.enqueue(save_key_ttl, worker.key, job_timeout=job_timeout)
worker.work(burst=True)
job.refresh()
self.assertGreater(job.meta['ttl'], job_timeout)
@@ -713,12 +713,12 @@ class TestWorker(RQTestCase):
"""worker.clean_registries sets last_cleaned_at and cleans registries."""
foo_queue = Queue('foo', connection=self.testconn)
foo_registry = StartedJobRegistry('foo', connection=self.testconn)
- self.testconn.zadd(foo_registry.key, 1, 'foo')
+ self.testconn.zadd(foo_registry.key, {'foo': 1})
self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
bar_queue = Queue('bar', connection=self.testconn)
bar_registry = StartedJobRegistry('bar', connection=self.testconn)
- self.testconn.zadd(bar_registry.key, 1, 'bar')
+ self.testconn.zadd(bar_registry.key, {'bar': 1})
self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
worker = Worker([foo_queue, bar_queue])
@@ -743,7 +743,7 @@ class TestWorker(RQTestCase):
"""Worker calls clean_registries when run."""
queue = Queue(connection=self.testconn)
registry = StartedJobRegistry(connection=self.testconn)
- self.testconn.zadd(registry.key, 1, 'foo')
+ self.testconn.zadd(registry.key, {'foo': 1})
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)