summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2019-09-25 09:53:39 +0700
committerSelwin Ong <selwin.ong@gmail.com>2019-09-25 09:53:39 +0700
commit6e38ce877a47432c67aa89f5772c3bb3b726c0ea (patch)
treed047b3ac3725ec557c457f5c2974df4c2af48a5c
parent0987e6692edcdab538e8438a979408a255ab3893 (diff)
downloadrq-6e38ce877a47432c67aa89f5772c3bb3b726c0ea.tar.gz
Scheduler should periodically try to acquire locks for other queues it doesn't have
-rw-r--r--rq/scheduler.py28
-rw-r--r--rq/worker.py2
2 files changed, 24 insertions, 6 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 4b200a5..a2510aa 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -4,6 +4,7 @@ import signal
import time
import traceback
+from datetime import datetime
from multiprocessing import Process
try:
@@ -41,25 +42,36 @@ class RQScheduler(object):
def __init__(self, queues, connection, interval=1):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set([])
- self._scheduled_job_registries = []
+ self._scheduled_job_registries = []
+ self.lock_acquisition_time = None
self.connection = connection
self.interval = interval
self._stop_requested = False
self._status = self.Status.STOPPED
self._process = None
-
+
@property
def acquired_locks(self):
return self._acquired_locks
-
+
@property
def status(self):
return self._status
+ @property
+ def should_reacquire_locks(self):
+ """Returns True if lock_acquisition_time is longer than 15 minutes ago"""
+ if self._queue_names == self.acquired_locks:
+ return False
+ if not self.lock_acquisition_time:
+ return True
+ return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900
+
def acquire_locks(self, auto_start=False):
"""Returns names of queue it successfully acquires lock on"""
successful_locks = set([])
pid = os.getpid()
+ logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5):
successful_locks.add(name)
@@ -67,12 +79,14 @@ class RQScheduler(object):
if self._acquired_locks:
self.prepare_registries(self._acquired_locks)
+ self.lock_acquisition_time = datetime.now()
+
# If auto_start is requested and scheduler is not started,
# run self.start()
- if self._acquired_locks and auto_start:
+ if self._acquired_locks and auto_start:
if not self._process:
self.start()
-
+
return successful_locks
def prepare_registries(self, queue_names):
@@ -159,6 +173,10 @@ class RQScheduler(object):
if self._stop_requested:
self.stop()
break
+
+ if self.should_reacquire_locks:
+ self.acquire_locks()
+
self.enqueue_scheduled_jobs()
self.heartbeat()
time.sleep(self.interval)
diff --git a/rq/worker.py b/rq/worker.py
index 49f5a46..2f14fd9 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -450,7 +450,7 @@ class Worker(object):
"""
# No need to try to start scheduler on first run
if self.last_cleaned_at:
- if self.scheduler:
+ if self.scheduler and not self.scheduler._process:
self.scheduler.acquire_locks(auto_start=True)
self.clean_registries()