summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2019-04-13 18:53:57 +0700
committerSelwin Ong <selwin.ong@gmail.com>2019-04-13 18:53:57 +0700
commit1164c06312f4e041f5028331469703c77a944536 (patch)
treec3b149834b1acc337c1b4d341520558d186b947b
parentb51c786e5d2587b4e5fd5d961b28af2b3523ce3e (diff)
downloadrq-1164c06312f4e041f5028331469703c77a944536.tar.gz
First RQScheduler prototype
-rw-r--r--rq/registry.py37
-rw-r--r--rq/scheduler.py79
-rw-r--r--tests/test_scheduler.py61
3 files changed, 177 insertions, 0 deletions
diff --git a/rq/registry.py b/rq/registry.py
index 85fefc2..151b1db 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -212,6 +212,43 @@ class DeferredJobRegistry(BaseRegistry):
pass
+class ScheduledJobRegistry(BaseRegistry):
+ """
+ Registry of scheduled jobs.
+ """
+ key_template = 'rq:scheduled:{0}'
+
+ def __init__(self, *args, **kwargs):
+ super(ScheduledJobRegistry, self).__init__(*args, **kwargs)
+ # The underlying implementation of get_jobs_to_enqueue() is
+ # the same as get_expired_job_ids, but get_expired_job_ids() doesn't
+ # make sense in this context
+ self.get_jobs_to_enqueue = self.get_expired_job_ids
+
+ def cleanup(self):
+ """This method is only here to prevent errors because this method is
+ automatically called by `count()` and `get_job_ids()` methods
+ implemented in BaseRegistry."""
+ pass
+
+ def remove_jobs(self, timestamp=None, pipeline=None):
+ """Remove jobs whose timestamp is in the past from registry."""
+ connection = pipeline if pipeline is not None else self.connection
+ score = timestamp if timestamp is not None else current_timestamp()
+ return connection.zremrangebyscore(self.key, 0, score)
+
+ def get_jobs_to_schedule(self, timestamp=None):
+ """Remove jobs whose timestamp is in the past from registry."""
+ score = timestamp if timestamp is not None else current_timestamp()
+ return [as_text(job_id) for job_id in
+ self.connection.zrangebyscore(self.key, 0, score)]
+
+ def acquire_lock(self):
+ """Returns True if lock is successfully acquired"""
+ key = '%s:lock' % self.key
+ return self.connection.set(key, 1, ex=10, nx=True)
+
+
def clean_registries(queue):
"""Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
registry = FinishedJobRegistry(name=queue.name,
diff --git a/rq/scheduler.py b/rq/scheduler.py
new file mode 100644
index 0000000..21d4727
--- /dev/null
+++ b/rq/scheduler.py
@@ -0,0 +1,79 @@
+import os
+
+from .job import Job
+from .queue import Queue
+from .registry import ScheduledJobRegistry
+from .utils import current_timestamp
+
+
+SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
+SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
+
+
+class RQScheduler(object):
+
+ def __init__(self, queues, connection, interval=1):
+ self._queue_names = parse_names(queues)
+ self._scheduled_job_registries = []
+ for name in self._queue_names:
+ self._scheduled_job_registries.append(
+ ScheduledJobRegistry(name, connection=connection)
+ )
+ self.connection = connection
+ self.interval = 1
+
+ @classmethod
+ def acquire_lock(cls, name, connection):
+ """Returns True if lock is successfully acquired"""
+ return connection.set(
+ cls.get_locking_key(name), os.getpid(), nx=True, ex=5
+ )
+
+ @classmethod
+ def get_locking_key(self, name):
+ """Returns scheduler key for a given queue name"""
+ return SCHEDULER_LOCKING_KEY_TEMPLATE % name
+
+ def get_key(self, name):
+ """Returns scheduler key for a given queue name"""
+ return SCHEDULER_KEY_TEMPLATE % name
+
+ def enqueue_scheduled_jobs(self):
+ """Enqueue jobs whose timestamp is in the past"""
+ for registry in self._scheduled_job_registries:
+ timestamp = current_timestamp()
+ job_ids = registry.get_jobs_to_schedule(timestamp)
+
+ if not job_ids:
+ continue
+
+ queue = Queue(registry.name, connection=self.connection)
+
+ with self.connection.pipeline() as pipeline:
+ # This should be done in bulk
+ for job_id in job_ids:
+ job = Job.fetch(job_id, connection=self.connection)
+ queue.enqueue_job(job, pipeline=pipeline)
+ registry.remove_jobs(timestamp)
+ pipeline.execute()
+
+ def heartbeart(self):
+ """Updates the TTL on scheduler keys"""
+ if len(self._queue_names) > 1:
+ with self.connection.pipeline() as pipeline:
+ for name in self._queue_names:
+ pipeline.expire(self.interval + 5)
+ pipeline.execute()
+ else:
+ self.connection.expire(self.interval + 5)
+
+
+def parse_names(queues_or_names):
+ """Given a list of strings or queues, returns queue names"""
+ names = []
+ for queue_or_name in queues_or_names:
+ if isinstance(queue_or_name, Queue):
+ names.append(queue_or_name.name)
+ else:
+ names.append(str(queue_or_name))
+ return names
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
new file mode 100644
index 0000000..6e41875
--- /dev/null
+++ b/tests/test_scheduler.py
@@ -0,0 +1,61 @@
+from rq import Queue
+from rq.job import Job
+from rq.registry import ScheduledJobRegistry
+from rq.scheduler import RQScheduler
+from rq.utils import current_timestamp
+
+from .fixtures import say_hello
+from tests import RQTestCase
+
+
+class TestScheduler(RQTestCase):
+
+ def test_init(self):
+ """Scheduler can be instantiated with queues or queue names"""
+ foo_queue = Queue('foo', connection=self.testconn)
+ # bar_queue = Queue('bar', connection=self.testconn)
+ scheduler = RQScheduler([foo_queue, 'bar'], connection=self.testconn)
+ self.assertEqual(scheduler._queue_names, ['foo', 'bar'])
+
+ def test_get_jobs_to_enqueue(self):
+ """Getting job ids to enqueue from ScheduledJobRegistry."""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+ timestamp = current_timestamp()
+
+ self.testconn.zadd(registry.key, {'foo': 1})
+ self.testconn.zadd(registry.key, {'bar': timestamp + 10})
+ self.testconn.zadd(registry.key, {'baz': timestamp + 30})
+
+ self.assertEqual(registry.get_jobs_to_enqueue(), ['foo'])
+ self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20),
+ ['foo', 'bar'])
+
+ def test_lock_acquisition(self):
+ """Test lock acquisition"""
+ name = 'lock-test'
+ self.assertTrue(RQScheduler.acquire_lock(name, self.testconn))
+ self.assertFalse(RQScheduler.acquire_lock(name, self.testconn))
+
+ # If key is manually deleted, lock acquisition should be successful again
+ self.testconn.delete(RQScheduler.get_locking_key(name))
+ self.assertTrue(RQScheduler.acquire_lock(name, self.testconn))
+
+ def test_enqueue_scheduled_jobs(self):
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+ job = Job.create(
+ 'myfunc',
+ args=[12, "☃"],
+ kwargs=dict(snowman="☃", null=None),
+ connection=self.testconn,
+ )
+ job.save()
+ registry.add(job, 0)
+ print('JOB_IDS', registry.get_expired_job_ids())
+ scheduler = RQScheduler([queue], connection=self.testconn)
+ scheduler.enqueue_scheduled_jobs()
+ self.assertEqual(len(queue), 1)
+
+ # After job is scheduled, registry should be empty
+ self.assertEqual(len(registry), 0)