diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2018-12-03 07:42:57 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2018-12-03 07:42:57 +0700 |
commit | 331327d7d750a6a6d8a761a3a2428b707d71b9c7 (patch) | |
tree | 17622a497c7f0782e239c7a097b92edc4d827ed5 | |
parent | d1537281453958012de635b99e991e4f7770da5e (diff) | |
parent | ada2ad03ca959e993a86ce7259d6d96ef91c8bfa (diff) | |
download | rq-331327d7d750a6a6d8a761a3a2428b707d71b9c7.tar.gz |
Merge branch 'master' into v1.0
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | docs/docs/results.md | 2 | ||||
-rw-r--r-- | docs/index.md | 2 | ||||
-rw-r--r-- | requirements.txt | 2 | ||||
-rw-r--r-- | rq/cli/helpers.py | 4 | ||||
-rw-r--r-- | rq/compat/connections.py | 41 | ||||
-rw-r--r-- | rq/connections.py | 11 | ||||
-rw-r--r-- | rq/contrib/legacy.py | 2 | ||||
-rw-r--r-- | rq/defaults.py | 2 | ||||
-rw-r--r-- | rq/job.py | 5 | ||||
-rw-r--r-- | rq/queue.py | 17 | ||||
-rw-r--r-- | rq/registry.py | 4 | ||||
-rw-r--r-- | rq/worker.py | 10 | ||||
-rw-r--r-- | rq/worker_registration.py | 2 | ||||
-rw-r--r-- | setup.cfg | 2 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/__init__.py | 4 | ||||
-rw-r--r-- | tests/test_decorator.py | 4 | ||||
-rw-r--r-- | tests/test_queue.py | 13 | ||||
-rw-r--r-- | tests/test_registry.py | 26 | ||||
-rw-r--r-- | tests/test_worker.py | 16 |
21 files changed, 74 insertions, 99 deletions
@@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily. -RQ requires Redis >= 2.7.0. +RQ requires Redis >= 3.0.0. [![Build status](https://travis-ci.org/rq/rq.svg?branch=master)](https://secure.travis-ci.org/rq/rq) [![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq) diff --git a/docs/docs/results.md b/docs/docs/results.md index e5cc9ca..a641bf8 100644 --- a/docs/docs/results.md +++ b/docs/docs/results.md @@ -73,7 +73,7 @@ possibly resubmit the job. When workers get killed in the polite way (Ctrl+C or `kill`), RQ tries hard not to lose any work. The current work is finished after which the worker will stop further processing of jobs. This ensures that jobs always get a fair -change to finish themselves. +chance to finish themselves. However, workers can be killed forcefully by `kill -9`, which will not give the workers a chance to finish the job gracefully or to put the job on the `failed` diff --git a/docs/index.md b/docs/index.md index 15e77ed..64fddcf 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,7 +7,7 @@ RQ (_Redis Queue_) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It can be integrated in your web stack easily. -RQ requires Redis >= 2.7.0. +RQ requires Redis >= 3.0.0. ## Getting started diff --git a/requirements.txt b/requirements.txt index 291ee5b..7c061a7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -redis>=2.7 +redis>=3.0 click>=3.0.0 diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 33a267b..0e964f9 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -9,7 +9,7 @@ from functools import partial import click import redis -from redis import StrictRedis +from redis import Redis from redis.sentinel import Sentinel from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS) @@ -30,7 +30,7 @@ def read_config_file(module): if k.upper() == k]) -def get_redis_from_config(settings, connection_class=StrictRedis): +def get_redis_from_config(settings, connection_class=Redis): """Returns a StrictRedis instance from a dictionary of settings. To use redis sentinel, you must specify a dictionary in the configuration file. Example of a dictionary with keys without values: diff --git a/rq/compat/connections.py b/rq/compat/connections.py index 7d8798b..8e2b511 100644 --- a/rq/compat/connections.py +++ b/rq/compat/connections.py @@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function, from functools import partial -from redis import Redis, StrictRedis +from redis import Redis def fix_return_type(func): @@ -16,42 +16,3 @@ def fix_return_type(func): value = -1 return value return _inner - - -PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl'] - - -def _hset(self, key, field_name, value, pipeline=None): - connection = pipeline if pipeline is not None else self - connection.hset(key, field_name, value) - - -def patch_connection(connection): - # Don't patch already patches objects - if all([hasattr(connection, attr) for attr in PATCHED_METHODS]): - return connection - - connection._hset = partial(_hset, connection) - - if isinstance(connection, Redis): - connection._setex = partial(StrictRedis.setex, connection) - connection._lrem = partial(StrictRedis.lrem, connection) - connection._zadd = partial(StrictRedis.zadd, connection) - connection._pipeline = partial(StrictRedis.pipeline, connection) - connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection)) - if hasattr(connection, 'pttl'): - connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection)) - - # add support for mock redis objects - elif hasattr(connection, 'setex'): - connection._setex = connection.setex - connection._lrem = connection.lrem - connection._zadd = connection.zadd - connection._pipeline = connection.pipeline - connection._ttl = connection.ttl - if hasattr(connection, 'pttl'): - connection._pttl = connection.pttl - else: - raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection))) - - return connection diff --git a/rq/connections.py b/rq/connections.py index d7f6a61..7186056 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -4,9 +4,8 @@ from __future__ import (absolute_import, division, print_function, from contextlib import contextmanager -from redis import StrictRedis +from redis import Redis -from .compat.connections import patch_connection from .local import LocalStack, release_local @@ -17,7 +16,7 @@ class NoRedisConnectionException(Exception): @contextmanager def Connection(connection=None): # noqa if connection is None: - connection = StrictRedis() + connection = Redis() push_connection(connection) try: yield @@ -30,7 +29,7 @@ def Connection(connection=None): # noqa def push_connection(redis): """Pushes the given connection on the stack.""" - _connection_stack.push(patch_connection(redis)) + _connection_stack.push(redis) def pop_connection(): @@ -47,7 +46,7 @@ def use_connection(redis=None): release_local(_connection_stack) if redis is None: - redis = StrictRedis() + redis = Redis() push_connection(redis) @@ -63,7 +62,7 @@ def resolve_connection(connection=None): Raises an exception if it cannot resolve a connection now. """ if connection is not None: - return patch_connection(connection) + return connection connection = get_current_connection() if connection is None: diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index 710c2e3..880d865 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -23,7 +23,7 @@ def cleanup_ghosts(conn=None): """ conn = conn if conn else get_current_connection() for worker in Worker.all(connection=conn): - if conn._ttl(worker.key) == -1: + if conn.ttl(worker.key) == -1: ttl = worker.default_worker_ttl conn.expire(worker.key, ttl) logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl)) diff --git a/rq/defaults.py b/rq/defaults.py index 6b31a16..701e356 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -1,7 +1,7 @@ DEFAULT_JOB_CLASS = 'rq.job.Job' DEFAULT_QUEUE_CLASS = 'rq.Queue' DEFAULT_WORKER_CLASS = 'rq.Worker' -DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis' +DEFAULT_CONNECTION_CLASS = 'redis.Redis' DEFAULT_WORKER_TTL = 420 DEFAULT_JOB_MONITORING_INTERVAL = 30 DEFAULT_RESULT_TTL = 500 @@ -146,7 +146,8 @@ class Job(object): def set_status(self, status, pipeline=None): self._status = status - self.connection._hset(self.key, 'status', self._status, pipeline) + connection = pipeline or self.connection + connection.hset(self.key, 'status', self._status) @property def is_finished(self): @@ -514,7 +515,7 @@ class Job(object): cancellation. """ from .queue import Queue - pipeline = pipeline or self.connection._pipeline() + pipeline = pipeline or self.connection.pipeline() if self.origin: q = Queue(name=self.origin, connection=self.connection) q.remove(self, pipeline=pipeline) diff --git a/rq/queue.py b/rq/queue.py index d2fae1a..a1e6ec2 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -116,7 +116,7 @@ class Queue(object): if delete_jobs: self.empty() - with self.connection._pipeline() as pipeline: + with self.connection.pipeline() as pipeline: pipeline.srem(self.redis_queues_keys, self._key) pipeline.delete(self._key) pipeline.execute() @@ -183,7 +183,7 @@ class Queue(object): pipeline.lrem(self.key, 1, job_id) return - return self.connection._lrem(self.key, 1, job_id) + return self.connection.lrem(self.key, 1, job_id) def compact(self): """Removes all "dead" jobs from the queue by cycling through it, while @@ -240,7 +240,7 @@ class Queue(object): if not isinstance(depends_on, self.job_class): depends_on = self.job_class(id=depends_on, connection=self.connection) - with self.connection._pipeline() as pipe: + with self.connection.pipeline() as pipe: while True: try: pipe.watch(depends_on.key) @@ -294,7 +294,12 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) - timeout = kwargs.pop('timeout', None) + timeout = kwargs.pop('job_timeout', None) + if timeout is None: + timeout = kwargs.pop('timeout', None) + if timeout: + warnings.warn('The `timeout` keyword is deprecated. Use `job_timeout` instead', DeprecationWarning) + description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) ttl = kwargs.pop('ttl', None) @@ -321,7 +326,7 @@ class Queue(object): If Queue is instantiated with is_async=False, job is executed immediately. """ - pipe = pipeline if pipeline is not None else self.connection._pipeline() + pipe = pipeline if pipeline is not None else self.connection.pipeline() # Add Queue key set pipe.sadd(self.redis_queues_keys, self.key) @@ -355,7 +360,7 @@ class Queue(object): """ from .registry import DeferredJobRegistry - pipe = pipeline if pipeline is not None else self.connection._pipeline() + pipe = pipeline if pipeline is not None else self.connection.pipeline() dependents_key = job.dependents_key while True: diff --git a/rq/registry.py b/rq/registry.py index 5cfdd43..ee56c58 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -54,9 +54,9 @@ class BaseRegistry(object): if score == -1: score = '+inf' if pipeline is not None: - return pipeline.zadd(self.key, score, job.id) + return pipeline.zadd(self.key, {job.id: score}) - return self.connection._zadd(self.key, score, job.id) + return self.connection.zadd(self.key, {job.id: score}) def remove(self, job, pipeline=None): connection = pipeline if pipeline is not None else self.connection diff --git a/rq/worker.py b/rq/worker.py index b018519..2574b42 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -270,7 +270,7 @@ class Worker(object): raise ValueError(msg.format(self.name)) key = self.key queues = ','.join(self.queue_names()) - with self.connection._pipeline() as p: + with self.connection.pipeline() as p: p.delete(key) now = utcnow() now_in_string = utcformat(utcnow()) @@ -285,7 +285,7 @@ class Worker(object): def register_death(self): """Registers its own death.""" self.log.debug('Registering death') - with self.connection._pipeline() as p: + with self.connection.pipeline() as p: # We cannot use self.state = 'dead' here, because that would # rollback the pipeline worker_registration.unregister(self, p) @@ -694,7 +694,7 @@ class Worker(object): if heartbeat_ttl is None: heartbeat_ttl = self.job_monitoring_interval + 5 - with self.connection._pipeline() as pipeline: + with self.connection.pipeline() as pipeline: self.set_state(WorkerStatus.BUSY, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(heartbeat_ttl, pipeline=pipeline) @@ -716,7 +716,7 @@ class Worker(object): 3. Setting the workers current job to None 4. Add the job to FailedJobRegistry """ - with self.connection._pipeline() as pipeline: + with self.connection.pipeline() as pipeline: if started_job_registry is None: started_job_registry = StartedJobRegistry( job.origin, @@ -749,7 +749,7 @@ class Worker(object): def handle_job_success(self, job, queue, started_job_registry): - with self.connection._pipeline() as pipeline: + with self.connection.pipeline() as pipeline: while True: try: # if dependencies are inserted after enqueue_dependents diff --git a/rq/worker_registration.py b/rq/worker_registration.py index 73cb0ef..d24fac3 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -17,7 +17,7 @@ def register(worker, pipeline=None): def unregister(worker, pipeline=None): """Remove worker key from Redis.""" if pipeline is None: - connection = worker.connection._pipeline() + connection = worker.connection.pipeline() else: connection = pipeline @@ -1,5 +1,5 @@ [bdist_rpm] -requires = redis >= 2.7.0 +requires = redis >= 3.0.0 click >= 3.0 [wheel] @@ -32,7 +32,7 @@ setup( zip_safe=False, platforms='any', install_requires=[ - 'redis >= 2.7.0', + 'redis >= 3.0.0', 'click >= 5.0' ], python_requires='>=2.7', diff --git a/tests/__init__.py b/tests/__init__.py index fea12b5..c916a30 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function, import logging -from redis import StrictRedis +from redis import Redis from rq import pop_connection, push_connection from rq.compat import is_python_version @@ -19,7 +19,7 @@ def find_empty_redis_database(): will use/connect it when no keys are in there. """ for dbnum in range(4, 17): - testconn = StrictRedis(db=dbnum) + testconn = Redis(db=dbnum) empty = testconn.dbsize() == 0 if empty: return testconn diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 1b9e974..5f74935 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import mock -from redis import StrictRedis +from redis import Redis from rq.decorators import job from rq.job import Job from rq.worker import DEFAULT_RESULT_TTL @@ -152,7 +152,7 @@ class TestDecorator(RQTestCase): def test_decorator_connection_laziness(self, resolve_connection): """Ensure that job decorator resolve connection in `lazy` way """ - resolve_connection.return_value = StrictRedis() + resolve_connection.return_value = Redis() @job(queue='queue_name') def foo(): diff --git a/tests/test_queue.py b/tests/test_queue.py index 0434c62..33f24e2 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -325,12 +325,21 @@ class TestQueue(RQTestCase): job.refresh() self.assertEqual(job.failure_ttl, 10) + def test_job_timeout(self): + """Timeout can be passed via job_timeout argument""" + queue = Queue() + job = queue.enqueue(echo, 1, timeout=15) + self.assertEqual(job.timeout, 15) + + job = queue.enqueue(echo, 1, job_timeout=15) + self.assertEqual(job.timeout, 15) + def test_enqueue_explicit_args(self): """enqueue() works for both implicit/explicit args.""" q = Queue() # Implicit args/kwargs mode - job = q.enqueue(echo, 1, timeout=1, result_ttl=1, bar='baz') + job = q.enqueue(echo, 1, job_timeout=1, result_ttl=1, bar='baz') self.assertEqual(job.timeout, 1) self.assertEqual(job.result_ttl, 1) self.assertEqual( @@ -343,7 +352,7 @@ class TestQueue(RQTestCase): 'timeout': 1, 'result_ttl': 1, } - job = q.enqueue(echo, timeout=2, result_ttl=2, args=[1], kwargs=kwargs) + job = q.enqueue(echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs) self.assertEqual(job.timeout, 2) self.assertEqual(job.result_ttl, 2) self.assertEqual( diff --git a/tests/test_registry.py b/tests/test_registry.py index f4e38b8..66d5454 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -78,17 +78,17 @@ class TestRegistry(RQTestCase): def test_get_job_ids(self): """Getting job ids from StartedJobRegistry.""" timestamp = current_timestamp() - self.testconn.zadd(self.registry.key, timestamp + 10, 'foo') - self.testconn.zadd(self.registry.key, timestamp + 20, 'bar') + self.testconn.zadd(self.registry.key, {'foo': timestamp + 10}) + self.testconn.zadd(self.registry.key, {'bar': timestamp + 20}) self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): """Getting expired job ids form StartedJobRegistry.""" timestamp = current_timestamp() - self.testconn.zadd(self.registry.key, 1, 'foo') - self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') - self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') + self.testconn.zadd(self.registry.key, {'foo': 1}) + self.testconn.zadd(self.registry.key, {'bar': timestamp + 10}) + self.testconn.zadd(self.registry.key, {'baz': timestamp + 30}) self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), @@ -100,7 +100,7 @@ class TestRegistry(RQTestCase): failed_job_registry = FailedJobRegistry(connection=self.testconn) job = queue.enqueue(say_hello) - self.testconn.zadd(self.registry.key, 2, job.id) + self.testconn.zadd(self.registry.key, {job.id: 2}) # Job has not been moved to FailedJobRegistry self.registry.cleanup(1) @@ -157,8 +157,8 @@ class TestRegistry(RQTestCase): def test_get_job_count(self): """StartedJobRegistry returns the right number of job count.""" timestamp = current_timestamp() + 10 - self.testconn.zadd(self.registry.key, timestamp, 'foo') - self.testconn.zadd(self.registry.key, timestamp, 'bar') + self.testconn.zadd(self.registry.key, {'foo': timestamp}) + self.testconn.zadd(self.registry.key, {'bar': timestamp}) self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) @@ -168,10 +168,10 @@ class TestRegistry(RQTestCase): queue = Queue(connection=self.testconn) finished_job_registry = FinishedJobRegistry(connection=self.testconn) - self.testconn.zadd(finished_job_registry.key, 1, 'foo') + self.testconn.zadd(finished_job_registry.key, {'foo': 1}) started_job_registry = StartedJobRegistry(connection=self.testconn) - self.testconn.zadd(started_job_registry.key, 1, 'foo') + self.testconn.zadd(started_job_registry.key, {'foo': 1}) failed_job_registry = FailedJobRegistry(connection=self.testconn) self.testconn.zadd(failed_job_registry.key, 1, 'foo') @@ -203,9 +203,9 @@ class TestFinishedJobRegistry(RQTestCase): def test_cleanup(self): """Finished job registry removes expired jobs.""" timestamp = current_timestamp() - self.testconn.zadd(self.registry.key, 1, 'foo') - self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') - self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') + self.testconn.zadd(self.registry.key, {'foo': 1}) + self.testconn.zadd(self.registry.key, {'bar': timestamp + 10}) + self.testconn.zadd(self.registry.key, {'baz': timestamp + 30}) self.registry.cleanup() self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz']) diff --git a/tests/test_worker.py b/tests/test_worker.py index d2d9c26..2f3e4f3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -255,7 +255,7 @@ class TestWorker(RQTestCase): for timeout, expected_heartbeats in [(2, 0), (7, 1), (12, 2)]: job = q.enqueue(long_running_job, args=(timeout,), - timeout=30, + job_timeout=30, result_ttl=-1) with mock.patch.object(w, 'heartbeat', wraps=w.heartbeat) as mocked: w.execute_job(job, q) @@ -429,7 +429,7 @@ class TestWorker(RQTestCase): # Put it on the queue with a timeout value res = q.enqueue(create_file_after_timeout, args=(sentinel_file, 4), - timeout=1) + job_timeout=1) try: os.unlink(sentinel_file) @@ -452,7 +452,7 @@ class TestWorker(RQTestCase): w = Worker([q]) self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) - self.assertNotEqual(self.testconn._ttl(job.key), 0) + self.assertNotEqual(self.testconn.ttl(job.key), 0) self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) # Job with -1 result_ttl don't expire @@ -460,7 +460,7 @@ class TestWorker(RQTestCase): w = Worker([q]) self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) - self.assertEqual(self.testconn._ttl(job.key), -1) + self.assertEqual(self.testconn.ttl(job.key), -1) self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) # Job with result_ttl = 0 gets deleted immediately @@ -580,7 +580,7 @@ class TestWorker(RQTestCase): worker = SimpleWorker([queue]) job_timeout = 300 - job = queue.enqueue(save_key_ttl, worker.key, timeout=job_timeout) + job = queue.enqueue(save_key_ttl, worker.key, job_timeout=job_timeout) worker.work(burst=True) job.refresh() self.assertGreater(job.meta['ttl'], job_timeout) @@ -713,12 +713,12 @@ class TestWorker(RQTestCase): """worker.clean_registries sets last_cleaned_at and cleans registries.""" foo_queue = Queue('foo', connection=self.testconn) foo_registry = StartedJobRegistry('foo', connection=self.testconn) - self.testconn.zadd(foo_registry.key, 1, 'foo') + self.testconn.zadd(foo_registry.key, {'foo': 1}) self.assertEqual(self.testconn.zcard(foo_registry.key), 1) bar_queue = Queue('bar', connection=self.testconn) bar_registry = StartedJobRegistry('bar', connection=self.testconn) - self.testconn.zadd(bar_registry.key, 1, 'bar') + self.testconn.zadd(bar_registry.key, {'bar': 1}) self.assertEqual(self.testconn.zcard(bar_registry.key), 1) worker = Worker([foo_queue, bar_queue]) @@ -743,7 +743,7 @@ class TestWorker(RQTestCase): """Worker calls clean_registries when run.""" queue = Queue(connection=self.testconn) registry = StartedJobRegistry(connection=self.testconn) - self.testconn.zadd(registry.key, 1, 'foo') + self.testconn.zadd(registry.key, {'foo': 1}) worker = Worker(queue, connection=self.testconn) worker.work(burst=True) |