diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2019-08-03 19:06:33 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2019-08-03 19:06:33 +0700 |
commit | ce7d0ef4a8754394728b34c9cbd6b179dab4387d (patch) | |
tree | b3d4a33aa60d75fd6c3f729aa9e0bebf758e5430 | |
parent | 9877c9f1b7df293507e0661789a44476a8194335 (diff) | |
download | rq-ce7d0ef4a8754394728b34c9cbd6b179dab4387d.tar.gz |
Added scheduler.prepare_registries()
-rw-r--r-- | rq/registry.py | 3 | ||||
-rw-r--r-- | rq/scheduler.py | 20 | ||||
-rw-r--r-- | rq/worker.py | 14 | ||||
-rw-r--r-- | tests/test_scheduler.py | 16 |
4 files changed, 45 insertions, 8 deletions
diff --git a/rq/registry.py b/rq/registry.py index 75b18e7..a83b89a 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -34,6 +34,9 @@ class BaseRegistry(object): def __len__(self): """Returns the number of jobs in this registry""" return self.count + + def __eq__(self, other): + return (self.name == other.name and self.connection == other.connection) def __contains__(self, item): """ diff --git a/rq/scheduler.py b/rq/scheduler.py index 7095c85..452a735 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -28,11 +28,7 @@ class RQScheduler(object): def __init__(self, queues, connection, interval=1): self._queue_names = set(parse_names(queues)) self._acquired_locks = set([]) - self._scheduled_job_registries = [] - for name in self._queue_names: - self._scheduled_job_registries.append( - ScheduledJobRegistry(name, connection=connection) - ) + self._scheduled_job_registries = [] self.connection = connection self.interval = 1 self._stop_requested = False @@ -45,8 +41,18 @@ class RQScheduler(object): if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5): successful_locks.add(name) self._acquired_locks = self._acquired_locks.union(successful_locks) + if self._acquired_locks: + self.prepare_registries(self._acquired_locks) return successful_locks + def prepare_registries(self, queue_names): + """Prepare scheduled job registries for use""" + self._scheduled_job_registries = [] + for name in queue_names: + self._scheduled_job_registries.append( + ScheduledJobRegistry(name, connection=self.connection) + ) + @classmethod def get_locking_key(self, name): """Returns scheduler key for a given queue name""" @@ -99,7 +105,7 @@ class RQScheduler(object): pipeline.expire(key, self.interval + 5) pipeline.execute() else: - key = self.get_key(self._queue_names[0]) + key = self.get_key(next(iter(self._queue_names))) self.connection.expire(key, self.interval + 5) def stop(self): @@ -108,7 +114,7 @@ class RQScheduler(object): self.connection.delete(*keys) def start(self): - self._install_signal_handlers() + # self._install_signal_handlers() thread = threading.Thread(target=run, args=(self,), daemon=True) thread.start() diff --git a/rq/worker.py b/rq/worker.py index a321205..369afc5 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -35,6 +35,7 @@ from .logutils import setup_loghandlers from .queue import Queue from .registry import (FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, clean_registries) +from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .utils import (backend_class, ensure_list, enum, @@ -202,6 +203,7 @@ class Worker(object): self.failed_job_count = 0 self.total_working_time = 0 self.birth_date = None + self.scheduler = None self.disable_default_exception_handler = disable_default_exception_handler @@ -397,6 +399,8 @@ class Worker(object): signal.signal(signal.SIGTERM, self.request_force_stop) self.handle_warm_shutdown_request() + if self.scheduler: + self.scheduler.request_stop(signum, frame) # If shutdown is requested in the middle of a job, wait until # finish before shutting down and save the request in redis @@ -435,7 +439,7 @@ class Worker(object): self.set_state(before_state) def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None): + log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -453,6 +457,12 @@ class Worker(object): qnames = self.queue_names() self.log.info('*** Listening on %s...', green(', '.join(qnames))) + if with_scheduler: + self.log.info('Starting scheduler for %s...', green(', '.join(qnames))) + self.scheduler = RQScheduler(self.queues, connection=self.connection) + self.scheduler.acquire_locks() + self.scheduler.start() + try: while True: try: @@ -460,6 +470,8 @@ class Worker(object): if self.should_run_maintenance_tasks: self.clean_registries() + if self.scheduler: + self.scheduler.acquire_locks() if self._stop_requested: self.log.info('Worker %s: stopping on request', self.key) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 803d5df..8f07d77 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -62,6 +62,7 @@ class TestScheduledJobRegistry(RQTestCase): job.save() registry.schedule(job, datetime(2019, 1, 1)) scheduler = RQScheduler([queue], connection=self.testconn) + scheduler.acquire_locks() scheduler.enqueue_scheduled_jobs() self.assertEqual(len(queue), 1) @@ -72,6 +73,20 @@ class TestScheduledJobRegistry(RQTestCase): registry.schedule(job, datetime(2100, 1, 1)) scheduler.enqueue_scheduled_jobs() self.assertEqual(len(queue), 1) + + def test_prepare_registries(self): + """prepare_registries() creates self._scheduled_job_registries""" + foo_queue = Queue('foo', connection=self.testconn) + bar_queue = Queue('bar', connection=self.testconn) + scheduler = RQScheduler([foo_queue, bar_queue], connection=self.testconn) + self.assertEqual(scheduler._scheduled_job_registries, []) + scheduler.prepare_registries([foo_queue.name]) + self.assertEqual(scheduler._scheduled_job_registries, [ScheduledJobRegistry(queue=foo_queue)]) + scheduler.prepare_registries([foo_queue.name, bar_queue.name]) + self.assertEqual( + scheduler._scheduled_job_registries, + [ScheduledJobRegistry(queue=foo_queue), ScheduledJobRegistry(queue=bar_queue)] + ) def test_get_scheduled_time(self): """get_scheduled_time() returns job's scheduled datetime""" @@ -117,6 +132,7 @@ class TestQueue(RQTestCase): queue = Queue(connection=self.testconn) registry = ScheduledJobRegistry(queue=queue) scheduler = RQScheduler([queue], connection=self.testconn) + scheduler.acquire_locks() # Jobs created using enqueue_at is put in the ScheduledJobRegistry queue.enqueue_at(datetime(2019, 1, 1), say_hello) |