summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2021-09-07 13:03:22 +0700
committerGitHub <noreply@github.com>2021-09-07 13:03:22 +0700
commitcc70cacc1b87466920a737bdde28ac46a7f38346 (patch)
tree57696d3fe4d7783d56079380ffe7b489e7a3f7a6
parentc556106a38505ea7485daec9420c408f720d2961 (diff)
downloadrq-cc70cacc1b87466920a737bdde28ac46a7f38346.tar.gz
Add more tolerance to scheduler heartbeat (#1555)
-rw-r--r--rq/scheduler.py6
-rw-r--r--tests/test_scheduler.py6
2 files changed, 6 insertions, 6 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index ce5e754..850c1bc 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -106,7 +106,7 @@ class RQScheduler:
pid = os.getpid()
self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names:
- if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60):
+ if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60):
successful_locks.add(name)
# Always reset _scheduled_job_registries when acquiring locks
@@ -186,11 +186,11 @@ class RQScheduler:
with self.connection.pipeline() as pipeline:
for name in self._queue_names:
key = self.get_locking_key(name)
- pipeline.expire(key, self.interval + 5)
+ pipeline.expire(key, self.interval + 60)
pipeline.execute()
else:
key = self.get_locking_key(next(iter(self._queue_names)))
- self.connection.expire(key, self.interval + 5)
+ self.connection.expire(key, self.interval + 60)
def stop(self):
self.log.info("Scheduler stopping, releasing locks for %s...",
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 0cdcec4..684a076 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -198,8 +198,8 @@ class TestScheduler(RQTestCase):
pipeline.expire(locking_key_2, 1000)
scheduler.heartbeat()
- self.assertEqual(self.testconn.ttl(locking_key_1), 6)
- self.assertEqual(self.testconn.ttl(locking_key_1), 6)
+ self.assertEqual(self.testconn.ttl(locking_key_1), 61)
+ self.assertEqual(self.testconn.ttl(locking_key_1), 61)
# scheduler.stop() releases locks and sets status to STOPPED
scheduler._status = scheduler.Status.WORKING
@@ -213,7 +213,7 @@ class TestScheduler(RQTestCase):
scheduler.acquire_locks()
self.testconn.expire(locking_key_1, 1000)
scheduler.heartbeat()
- self.assertEqual(self.testconn.ttl(locking_key_1), 6)
+ self.assertEqual(self.testconn.ttl(locking_key_1), 61)
def test_enqueue_scheduled_jobs(self):
"""Scheduler can enqueue scheduled jobs"""