summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2019-08-03 19:06:33 +0700
committerSelwin Ong <selwin.ong@gmail.com>2019-08-03 19:06:33 +0700
commitce7d0ef4a8754394728b34c9cbd6b179dab4387d (patch)
treeb3d4a33aa60d75fd6c3f729aa9e0bebf758e5430
parent9877c9f1b7df293507e0661789a44476a8194335 (diff)
downloadrq-ce7d0ef4a8754394728b34c9cbd6b179dab4387d.tar.gz
Added scheduler.prepare_registries()
-rw-r--r--rq/registry.py3
-rw-r--r--rq/scheduler.py20
-rw-r--r--rq/worker.py14
-rw-r--r--tests/test_scheduler.py16
4 files changed, 45 insertions, 8 deletions
diff --git a/rq/registry.py b/rq/registry.py
index 75b18e7..a83b89a 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -34,6 +34,9 @@ class BaseRegistry(object):
def __len__(self):
"""Returns the number of jobs in this registry"""
return self.count
+
+ def __eq__(self, other):
+ return (self.name == other.name and self.connection == other.connection)
def __contains__(self, item):
"""
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 7095c85..452a735 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -28,11 +28,7 @@ class RQScheduler(object):
def __init__(self, queues, connection, interval=1):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set([])
- self._scheduled_job_registries = []
- for name in self._queue_names:
- self._scheduled_job_registries.append(
- ScheduledJobRegistry(name, connection=connection)
- )
+ self._scheduled_job_registries = []
self.connection = connection
self.interval = 1
self._stop_requested = False
@@ -45,8 +41,18 @@ class RQScheduler(object):
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5):
successful_locks.add(name)
self._acquired_locks = self._acquired_locks.union(successful_locks)
+ if self._acquired_locks:
+ self.prepare_registries(self._acquired_locks)
return successful_locks
+ def prepare_registries(self, queue_names):
+ """Prepare scheduled job registries for use"""
+ self._scheduled_job_registries = []
+ for name in queue_names:
+ self._scheduled_job_registries.append(
+ ScheduledJobRegistry(name, connection=self.connection)
+ )
+
@classmethod
def get_locking_key(self, name):
"""Returns scheduler key for a given queue name"""
@@ -99,7 +105,7 @@ class RQScheduler(object):
pipeline.expire(key, self.interval + 5)
pipeline.execute()
else:
- key = self.get_key(self._queue_names[0])
+ key = self.get_key(next(iter(self._queue_names)))
self.connection.expire(key, self.interval + 5)
def stop(self):
@@ -108,7 +114,7 @@ class RQScheduler(object):
self.connection.delete(*keys)
def start(self):
- self._install_signal_handlers()
+ # self._install_signal_handlers()
thread = threading.Thread(target=run, args=(self,), daemon=True)
thread.start()
diff --git a/rq/worker.py b/rq/worker.py
index a321205..369afc5 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -35,6 +35,7 @@ from .logutils import setup_loghandlers
from .queue import Queue
from .registry import (FailedJobRegistry, FinishedJobRegistry,
StartedJobRegistry, clean_registries)
+from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, enum,
@@ -202,6 +203,7 @@ class Worker(object):
self.failed_job_count = 0
self.total_working_time = 0
self.birth_date = None
+ self.scheduler = None
self.disable_default_exception_handler = disable_default_exception_handler
@@ -397,6 +399,8 @@ class Worker(object):
signal.signal(signal.SIGTERM, self.request_force_stop)
self.handle_warm_shutdown_request()
+ if self.scheduler:
+ self.scheduler.request_stop(signum, frame)
# If shutdown is requested in the middle of a job, wait until
# finish before shutting down and save the request in redis
@@ -435,7 +439,7 @@ class Worker(object):
self.set_state(before_state)
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None):
+ log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@@ -453,6 +457,12 @@ class Worker(object):
qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames)))
+ if with_scheduler:
+ self.log.info('Starting scheduler for %s...', green(', '.join(qnames)))
+ self.scheduler = RQScheduler(self.queues, connection=self.connection)
+ self.scheduler.acquire_locks()
+ self.scheduler.start()
+
try:
while True:
try:
@@ -460,6 +470,8 @@ class Worker(object):
if self.should_run_maintenance_tasks:
self.clean_registries()
+ if self.scheduler:
+ self.scheduler.acquire_locks()
if self._stop_requested:
self.log.info('Worker %s: stopping on request', self.key)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 803d5df..8f07d77 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -62,6 +62,7 @@ class TestScheduledJobRegistry(RQTestCase):
job.save()
registry.schedule(job, datetime(2019, 1, 1))
scheduler = RQScheduler([queue], connection=self.testconn)
+ scheduler.acquire_locks()
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
@@ -72,6 +73,20 @@ class TestScheduledJobRegistry(RQTestCase):
registry.schedule(job, datetime(2100, 1, 1))
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
+
+ def test_prepare_registries(self):
+ """prepare_registries() creates self._scheduled_job_registries"""
+ foo_queue = Queue('foo', connection=self.testconn)
+ bar_queue = Queue('bar', connection=self.testconn)
+ scheduler = RQScheduler([foo_queue, bar_queue], connection=self.testconn)
+ self.assertEqual(scheduler._scheduled_job_registries, [])
+ scheduler.prepare_registries([foo_queue.name])
+ self.assertEqual(scheduler._scheduled_job_registries, [ScheduledJobRegistry(queue=foo_queue)])
+ scheduler.prepare_registries([foo_queue.name, bar_queue.name])
+ self.assertEqual(
+ scheduler._scheduled_job_registries,
+ [ScheduledJobRegistry(queue=foo_queue), ScheduledJobRegistry(queue=bar_queue)]
+ )
def test_get_scheduled_time(self):
"""get_scheduled_time() returns job's scheduled datetime"""
@@ -117,6 +132,7 @@ class TestQueue(RQTestCase):
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
scheduler = RQScheduler([queue], connection=self.testconn)
+ scheduler.acquire_locks()
# Jobs created using enqueue_at is put in the ScheduledJobRegistry
queue.enqueue_at(datetime(2019, 1, 1), say_hello)