From cc70cacc1b87466920a737bdde28ac46a7f38346 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 7 Sep 2021 13:03:22 +0700 Subject: Add more tolerance to scheduler heartbeat (#1555) --- rq/scheduler.py | 6 +++--- tests/test_scheduler.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rq/scheduler.py b/rq/scheduler.py index ce5e754..850c1bc 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -106,7 +106,7 @@ class RQScheduler: pid = os.getpid() self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) for name in self._queue_names: - if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60): + if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60): successful_locks.add(name) # Always reset _scheduled_job_registries when acquiring locks @@ -186,11 +186,11 @@ class RQScheduler: with self.connection.pipeline() as pipeline: for name in self._queue_names: key = self.get_locking_key(name) - pipeline.expire(key, self.interval + 5) + pipeline.expire(key, self.interval + 60) pipeline.execute() else: key = self.get_locking_key(next(iter(self._queue_names))) - self.connection.expire(key, self.interval + 5) + self.connection.expire(key, self.interval + 60) def stop(self): self.log.info("Scheduler stopping, releasing locks for %s...", diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 0cdcec4..684a076 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -198,8 +198,8 @@ class TestScheduler(RQTestCase): pipeline.expire(locking_key_2, 1000) scheduler.heartbeat() - self.assertEqual(self.testconn.ttl(locking_key_1), 6) - self.assertEqual(self.testconn.ttl(locking_key_1), 6) + self.assertEqual(self.testconn.ttl(locking_key_1), 61) + self.assertEqual(self.testconn.ttl(locking_key_1), 61) # scheduler.stop() releases locks and sets status to STOPPED scheduler._status = scheduler.Status.WORKING @@ -213,7 +213,7 @@ class TestScheduler(RQTestCase): scheduler.acquire_locks() self.testconn.expire(locking_key_1, 1000) scheduler.heartbeat() - self.assertEqual(self.testconn.ttl(locking_key_1), 6) + self.assertEqual(self.testconn.ttl(locking_key_1), 61) def test_enqueue_scheduled_jobs(self): """Scheduler can enqueue scheduled jobs""" -- cgit v1.2.1