diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2019-09-25 09:53:39 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2019-09-25 09:53:39 +0700 |
commit | 6e38ce877a47432c67aa89f5772c3bb3b726c0ea (patch) | |
tree | d047b3ac3725ec557c457f5c2974df4c2af48a5c | |
parent | 0987e6692edcdab538e8438a979408a255ab3893 (diff) | |
download | rq-6e38ce877a47432c67aa89f5772c3bb3b726c0ea.tar.gz |
Scheduler should periodically try to acquire locks for other queues it doesn't have
-rw-r--r-- | rq/scheduler.py | 28 | ||||
-rw-r--r-- | rq/worker.py | 2 |
2 files changed, 24 insertions, 6 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py index 4b200a5..a2510aa 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -4,6 +4,7 @@ import signal import time import traceback +from datetime import datetime from multiprocessing import Process try: @@ -41,25 +42,36 @@ class RQScheduler(object): def __init__(self, queues, connection, interval=1): self._queue_names = set(parse_names(queues)) self._acquired_locks = set([]) - self._scheduled_job_registries = [] + self._scheduled_job_registries = [] + self.lock_acquisition_time = None self.connection = connection self.interval = interval self._stop_requested = False self._status = self.Status.STOPPED self._process = None - + @property def acquired_locks(self): return self._acquired_locks - + @property def status(self): return self._status + @property + def should_reacquire_locks(self): + """Returns True if lock_acquisition_time is longer than 15 minutes ago""" + if self._queue_names == self.acquired_locks: + return False + if not self.lock_acquisition_time: + return True + return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900 + def acquire_locks(self, auto_start=False): """Returns names of queue it successfully acquires lock on""" successful_locks = set([]) pid = os.getpid() + logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) for name in self._queue_names: if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5): successful_locks.add(name) @@ -67,12 +79,14 @@ class RQScheduler(object): if self._acquired_locks: self.prepare_registries(self._acquired_locks) + self.lock_acquisition_time = datetime.now() + # If auto_start is requested and scheduler is not started, # run self.start() - if self._acquired_locks and auto_start: + if self._acquired_locks and auto_start: if not self._process: self.start() - + return successful_locks def prepare_registries(self, queue_names): @@ -159,6 +173,10 @@ class RQScheduler(object): if self._stop_requested: self.stop() break + + if self.should_reacquire_locks: + self.acquire_locks() + self.enqueue_scheduled_jobs() self.heartbeat() time.sleep(self.interval) diff --git a/rq/worker.py b/rq/worker.py index 49f5a46..2f14fd9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -450,7 +450,7 @@ class Worker(object): """ # No need to try to start scheduler on first run if self.last_cleaned_at: - if self.scheduler: + if self.scheduler and not self.scheduler._process: self.scheduler.acquire_locks(auto_start=True) self.clean_registries() |