summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rq/worker.py4
-rw-r--r--rq/worker_registration.py32
-rw-r--r--tests/test_worker_registration.py67
-rw-r--r--tests/test_worker_registry.py33
4 files changed, 89 insertions, 47 deletions
diff --git a/rq/worker.py b/rq/worker.py
index 6912e8c..ca3f697 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -91,7 +91,7 @@ WorkerStatus = enum(
class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:'
- redis_workers_keys = 'rq:workers'
+ redis_workers_keys = worker_registration.REDIS_WORKER_KEYS
death_penalty_class = UnixSignalDeathPenalty
queue_class = Queue
job_class = Job
@@ -126,7 +126,7 @@ class Worker(object):
"""
prefix = cls.redis_worker_namespace_prefix
if not worker_key.startswith(prefix):
- raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key))
+ raise ValueError('Not a valid RQ worker key: %s' % worker_key)
if connection is None:
connection = get_current_connection()
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index a5c240b..5c2b578 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -1,34 +1,42 @@
-# from .worker import Worker
+from .compat import as_text
-workers_by_queue_key = 'rq:workers:%s'
+WORKERS_BY_QUEUE_KEY = 'rq:workers:%s'
+REDIS_WORKER_KEYS = 'rq:workers'
def register(worker, pipeline=None):
- """
- Store worker key in Redis data structures so we can easily discover
- all active workers.
- """
+ """Store worker key in Redis so we can easily discover active workers."""
connection = pipeline if pipeline is not None else worker.connection
connection.sadd(worker.redis_workers_keys, worker.key)
for name in worker.queue_names():
- redis_key = workers_by_queue_key % name
+ redis_key = WORKERS_BY_QUEUE_KEY % name
connection.sadd(redis_key, worker.key)
def unregister(worker, pipeline=None):
- """
- Remove worker key from Redis.
- """
+ """Remove worker key from Redis."""
if pipeline is None:
connection = worker.connection._pipeline()
else:
connection = pipeline
-
+
connection.srem(worker.redis_workers_keys, worker.key)
for name in worker.queue_names():
- redis_key = workers_by_queue_key % name
+ redis_key = WORKERS_BY_QUEUE_KEY % name
connection.srem(redis_key, worker.key)
if pipeline is None:
connection.execute()
+
+
+def get_keys(queue=None, connection=None):
+ """Returns a list of worker keys for a queue"""
+ if queue:
+ redis = queue.connection
+ redis_key = WORKERS_BY_QUEUE_KEY % queue.name
+ else:
+ redis = connection
+ redis_key = REDIS_WORKER_KEYS
+
+ return {as_text(key) for key in redis.smembers(redis_key)}
diff --git a/tests/test_worker_registration.py b/tests/test_worker_registration.py
new file mode 100644
index 0000000..523d69f
--- /dev/null
+++ b/tests/test_worker_registration.py
@@ -0,0 +1,67 @@
+from tests import RQTestCase
+
+from rq import Queue, Worker
+from rq.worker_registration import (get_keys, register, unregister,
+ WORKERS_BY_QUEUE_KEY)
+
+
+class TestWorkerRegistry(RQTestCase):
+
+ def test_worker_registration(self):
+ """Ensure worker.key is correctly set in Redis."""
+ foo_queue = Queue(name='foo')
+ bar_queue = Queue(name='bar')
+ worker = Worker([foo_queue, bar_queue])
+
+ register(worker)
+ redis = worker.connection
+
+ self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key))
+ self.assertTrue(
+ redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)
+ )
+ self.assertTrue(
+ redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)
+ )
+
+ unregister(worker)
+ self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key))
+ self.assertFalse(
+ redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)
+ )
+ self.assertFalse(
+ redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)
+ )
+
+ def test_get_keys_by_queue(self):
+ """get_keys_by_queue only returns active workers for that queue"""
+ foo_queue = Queue(name='foo')
+ bar_queue = Queue(name='bar')
+ baz_queue = Queue(name='baz')
+
+ worker1 = Worker([foo_queue, bar_queue])
+ worker2 = Worker([foo_queue])
+ worker3 = Worker([baz_queue])
+
+ self.assertEqual(set(), get_keys(foo_queue))
+
+ register(worker1)
+ register(worker2)
+ register(worker3)
+
+ # get_keys(queue) will return worker keys for that queue
+ self.assertEqual(
+ set([worker1.key, worker2.key]),
+ get_keys(foo_queue)
+ )
+ self.assertEqual(set([worker1.key]), get_keys(bar_queue))
+
+ # get_keys(connection=connection) will return all worker keys
+ self.assertEqual(
+ set([worker1.key, worker2.key, worker3.key]),
+ get_keys(connection=worker1.connection)
+ )
+
+ unregister(worker1)
+ unregister(worker2)
+ unregister(worker3)
diff --git a/tests/test_worker_registry.py b/tests/test_worker_registry.py
deleted file mode 100644
index 8393348..0000000
--- a/tests/test_worker_registry.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from tests import RQTestCase
-
-from rq import Queue, Worker
-from rq.worker_registration import register, unregister, workers_by_queue_key
-
-
-class TestWorkerRegistry(RQTestCase):
-
- def test_worker_registration(self):
- """Ensure worker.key is correctly set in Redis."""
- foo_queue = Queue(name='foo')
- bar_queue = Queue(name='bar')
- worker = Worker([foo_queue, bar_queue])
-
- register(worker)
- redis = worker.connection
-
- self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key))
- self.assertTrue(
- redis.sismember(workers_by_queue_key % foo_queue.name, worker.key)
- )
- self.assertTrue(
- redis.sismember(workers_by_queue_key % bar_queue.name, worker.key)
- )
-
- unregister(worker)
- self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key))
- self.assertFalse(
- redis.sismember(workers_by_queue_key % foo_queue.name, worker.key)
- )
- self.assertFalse(
- redis.sismember(workers_by_queue_key % bar_queue.name, worker.key)
- )