diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2019-04-13 18:53:57 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2019-04-13 18:53:57 +0700 |
commit | 1164c06312f4e041f5028331469703c77a944536 (patch) | |
tree | c3b149834b1acc337c1b4d341520558d186b947b | |
parent | b51c786e5d2587b4e5fd5d961b28af2b3523ce3e (diff) | |
download | rq-1164c06312f4e041f5028331469703c77a944536.tar.gz |
First RQScheduler prototype
-rw-r--r-- | rq/registry.py | 37 | ||||
-rw-r--r-- | rq/scheduler.py | 79 | ||||
-rw-r--r-- | tests/test_scheduler.py | 61 |
3 files changed, 177 insertions, 0 deletions
diff --git a/rq/registry.py b/rq/registry.py index 85fefc2..151b1db 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -212,6 +212,43 @@ class DeferredJobRegistry(BaseRegistry): pass +class ScheduledJobRegistry(BaseRegistry): + """ + Registry of scheduled jobs. + """ + key_template = 'rq:scheduled:{0}' + + def __init__(self, *args, **kwargs): + super(ScheduledJobRegistry, self).__init__(*args, **kwargs) + # The underlying implementation of get_jobs_to_enqueue() is + # the same as get_expired_job_ids, but get_expired_job_ids() doesn't + # make sense in this context + self.get_jobs_to_enqueue = self.get_expired_job_ids + + def cleanup(self): + """This method is only here to prevent errors because this method is + automatically called by `count()` and `get_job_ids()` methods + implemented in BaseRegistry.""" + pass + + def remove_jobs(self, timestamp=None, pipeline=None): + """Remove jobs whose timestamp is in the past from registry.""" + connection = pipeline if pipeline is not None else self.connection + score = timestamp if timestamp is not None else current_timestamp() + return connection.zremrangebyscore(self.key, 0, score) + + def get_jobs_to_schedule(self, timestamp=None): + """Remove jobs whose timestamp is in the past from registry.""" + score = timestamp if timestamp is not None else current_timestamp() + return [as_text(job_id) for job_id in + self.connection.zrangebyscore(self.key, 0, score)] + + def acquire_lock(self): + """Returns True if lock is successfully acquired""" + key = '%s:lock' % self.key + return self.connection.set(key, 1, ex=10, nx=True) + + def clean_registries(queue): """Cleans StartedJobRegistry and FinishedJobRegistry of a queue.""" registry = FinishedJobRegistry(name=queue.name, diff --git a/rq/scheduler.py b/rq/scheduler.py new file mode 100644 index 0000000..21d4727 --- /dev/null +++ b/rq/scheduler.py @@ -0,0 +1,79 @@ +import os + +from .job import Job +from .queue import Queue +from .registry import ScheduledJobRegistry +from .utils import current_timestamp + + +SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s' +SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s' + + +class RQScheduler(object): + + def __init__(self, queues, connection, interval=1): + self._queue_names = parse_names(queues) + self._scheduled_job_registries = [] + for name in self._queue_names: + self._scheduled_job_registries.append( + ScheduledJobRegistry(name, connection=connection) + ) + self.connection = connection + self.interval = 1 + + @classmethod + def acquire_lock(cls, name, connection): + """Returns True if lock is successfully acquired""" + return connection.set( + cls.get_locking_key(name), os.getpid(), nx=True, ex=5 + ) + + @classmethod + def get_locking_key(self, name): + """Returns scheduler key for a given queue name""" + return SCHEDULER_LOCKING_KEY_TEMPLATE % name + + def get_key(self, name): + """Returns scheduler key for a given queue name""" + return SCHEDULER_KEY_TEMPLATE % name + + def enqueue_scheduled_jobs(self): + """Enqueue jobs whose timestamp is in the past""" + for registry in self._scheduled_job_registries: + timestamp = current_timestamp() + job_ids = registry.get_jobs_to_schedule(timestamp) + + if not job_ids: + continue + + queue = Queue(registry.name, connection=self.connection) + + with self.connection.pipeline() as pipeline: + # This should be done in bulk + for job_id in job_ids: + job = Job.fetch(job_id, connection=self.connection) + queue.enqueue_job(job, pipeline=pipeline) + registry.remove_jobs(timestamp) + pipeline.execute() + + def heartbeart(self): + """Updates the TTL on scheduler keys""" + if len(self._queue_names) > 1: + with self.connection.pipeline() as pipeline: + for name in self._queue_names: + pipeline.expire(self.interval + 5) + pipeline.execute() + else: + self.connection.expire(self.interval + 5) + + +def parse_names(queues_or_names): + """Given a list of strings or queues, returns queue names""" + names = [] + for queue_or_name in queues_or_names: + if isinstance(queue_or_name, Queue): + names.append(queue_or_name.name) + else: + names.append(str(queue_or_name)) + return names diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..6e41875 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,61 @@ +from rq import Queue +from rq.job import Job +from rq.registry import ScheduledJobRegistry +from rq.scheduler import RQScheduler +from rq.utils import current_timestamp + +from .fixtures import say_hello +from tests import RQTestCase + + +class TestScheduler(RQTestCase): + + def test_init(self): + """Scheduler can be instantiated with queues or queue names""" + foo_queue = Queue('foo', connection=self.testconn) + # bar_queue = Queue('bar', connection=self.testconn) + scheduler = RQScheduler([foo_queue, 'bar'], connection=self.testconn) + self.assertEqual(scheduler._queue_names, ['foo', 'bar']) + + def test_get_jobs_to_enqueue(self): + """Getting job ids to enqueue from ScheduledJobRegistry.""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + timestamp = current_timestamp() + + self.testconn.zadd(registry.key, {'foo': 1}) + self.testconn.zadd(registry.key, {'bar': timestamp + 10}) + self.testconn.zadd(registry.key, {'baz': timestamp + 30}) + + self.assertEqual(registry.get_jobs_to_enqueue(), ['foo']) + self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20), + ['foo', 'bar']) + + def test_lock_acquisition(self): + """Test lock acquisition""" + name = 'lock-test' + self.assertTrue(RQScheduler.acquire_lock(name, self.testconn)) + self.assertFalse(RQScheduler.acquire_lock(name, self.testconn)) + + # If key is manually deleted, lock acquisition should be successful again + self.testconn.delete(RQScheduler.get_locking_key(name)) + self.assertTrue(RQScheduler.acquire_lock(name, self.testconn)) + + def test_enqueue_scheduled_jobs(self): + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + job = Job.create( + 'myfunc', + args=[12, "☃"], + kwargs=dict(snowman="☃", null=None), + connection=self.testconn, + ) + job.save() + registry.add(job, 0) + print('JOB_IDS', registry.get_expired_job_ids()) + scheduler = RQScheduler([queue], connection=self.testconn) + scheduler.enqueue_scheduled_jobs() + self.assertEqual(len(queue), 1) + + # After job is scheduled, registry should be empty + self.assertEqual(len(registry), 0) |