diff options
-rw-r--r-- | rq/worker.py | 4 | ||||
-rw-r--r-- | rq/worker_registration.py | 32 | ||||
-rw-r--r-- | tests/test_worker_registration.py | 67 | ||||
-rw-r--r-- | tests/test_worker_registry.py | 33 |
4 files changed, 89 insertions, 47 deletions
diff --git a/rq/worker.py b/rq/worker.py index 6912e8c..ca3f697 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -91,7 +91,7 @@ WorkerStatus = enum( class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' - redis_workers_keys = 'rq:workers' + redis_workers_keys = worker_registration.REDIS_WORKER_KEYS death_penalty_class = UnixSignalDeathPenalty queue_class = Queue job_class = Job @@ -126,7 +126,7 @@ class Worker(object): """ prefix = cls.redis_worker_namespace_prefix if not worker_key.startswith(prefix): - raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key)) + raise ValueError('Not a valid RQ worker key: %s' % worker_key) if connection is None: connection = get_current_connection() diff --git a/rq/worker_registration.py b/rq/worker_registration.py index a5c240b..5c2b578 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -1,34 +1,42 @@ -# from .worker import Worker +from .compat import as_text -workers_by_queue_key = 'rq:workers:%s' +WORKERS_BY_QUEUE_KEY = 'rq:workers:%s' +REDIS_WORKER_KEYS = 'rq:workers' def register(worker, pipeline=None): - """ - Store worker key in Redis data structures so we can easily discover - all active workers. - """ + """Store worker key in Redis so we can easily discover active workers.""" connection = pipeline if pipeline is not None else worker.connection connection.sadd(worker.redis_workers_keys, worker.key) for name in worker.queue_names(): - redis_key = workers_by_queue_key % name + redis_key = WORKERS_BY_QUEUE_KEY % name connection.sadd(redis_key, worker.key) def unregister(worker, pipeline=None): - """ - Remove worker key from Redis. - """ + """Remove worker key from Redis.""" if pipeline is None: connection = worker.connection._pipeline() else: connection = pipeline - + connection.srem(worker.redis_workers_keys, worker.key) for name in worker.queue_names(): - redis_key = workers_by_queue_key % name + redis_key = WORKERS_BY_QUEUE_KEY % name connection.srem(redis_key, worker.key) if pipeline is None: connection.execute() + + +def get_keys(queue=None, connection=None): + """Returns a list of worker keys for a queue""" + if queue: + redis = queue.connection + redis_key = WORKERS_BY_QUEUE_KEY % queue.name + else: + redis = connection + redis_key = REDIS_WORKER_KEYS + + return {as_text(key) for key in redis.smembers(redis_key)} diff --git a/tests/test_worker_registration.py b/tests/test_worker_registration.py new file mode 100644 index 0000000..523d69f --- /dev/null +++ b/tests/test_worker_registration.py @@ -0,0 +1,67 @@ +from tests import RQTestCase + +from rq import Queue, Worker +from rq.worker_registration import (get_keys, register, unregister, + WORKERS_BY_QUEUE_KEY) + + +class TestWorkerRegistry(RQTestCase): + + def test_worker_registration(self): + """Ensure worker.key is correctly set in Redis.""" + foo_queue = Queue(name='foo') + bar_queue = Queue(name='bar') + worker = Worker([foo_queue, bar_queue]) + + register(worker) + redis = worker.connection + + self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) + self.assertTrue( + redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) + ) + self.assertTrue( + redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) + ) + + unregister(worker) + self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) + self.assertFalse( + redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) + ) + self.assertFalse( + redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) + ) + + def test_get_keys_by_queue(self): + """get_keys_by_queue only returns active workers for that queue""" + foo_queue = Queue(name='foo') + bar_queue = Queue(name='bar') + baz_queue = Queue(name='baz') + + worker1 = Worker([foo_queue, bar_queue]) + worker2 = Worker([foo_queue]) + worker3 = Worker([baz_queue]) + + self.assertEqual(set(), get_keys(foo_queue)) + + register(worker1) + register(worker2) + register(worker3) + + # get_keys(queue) will return worker keys for that queue + self.assertEqual( + set([worker1.key, worker2.key]), + get_keys(foo_queue) + ) + self.assertEqual(set([worker1.key]), get_keys(bar_queue)) + + # get_keys(connection=connection) will return all worker keys + self.assertEqual( + set([worker1.key, worker2.key, worker3.key]), + get_keys(connection=worker1.connection) + ) + + unregister(worker1) + unregister(worker2) + unregister(worker3) diff --git a/tests/test_worker_registry.py b/tests/test_worker_registry.py deleted file mode 100644 index 8393348..0000000 --- a/tests/test_worker_registry.py +++ /dev/null @@ -1,33 +0,0 @@ -from tests import RQTestCase - -from rq import Queue, Worker -from rq.worker_registration import register, unregister, workers_by_queue_key - - -class TestWorkerRegistry(RQTestCase): - - def test_worker_registration(self): - """Ensure worker.key is correctly set in Redis.""" - foo_queue = Queue(name='foo') - bar_queue = Queue(name='bar') - worker = Worker([foo_queue, bar_queue]) - - register(worker) - redis = worker.connection - - self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) - self.assertTrue( - redis.sismember(workers_by_queue_key % foo_queue.name, worker.key) - ) - self.assertTrue( - redis.sismember(workers_by_queue_key % bar_queue.name, worker.key) - ) - - unregister(worker) - self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) - self.assertFalse( - redis.sismember(workers_by_queue_key % foo_queue.name, worker.key) - ) - self.assertFalse( - redis.sismember(workers_by_queue_key % bar_queue.name, worker.key) - ) |