diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2021-09-07 13:03:22 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-07 13:03:22 +0700 |
commit | cc70cacc1b87466920a737bdde28ac46a7f38346 (patch) | |
tree | 57696d3fe4d7783d56079380ffe7b489e7a3f7a6 | |
parent | c556106a38505ea7485daec9420c408f720d2961 (diff) | |
download | rq-cc70cacc1b87466920a737bdde28ac46a7f38346.tar.gz |
Add more tolerance to scheduler heartbeat (#1555)
-rw-r--r-- | rq/scheduler.py | 6 | ||||
-rw-r--r-- | tests/test_scheduler.py | 6 |
2 files changed, 6 insertions, 6 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py index ce5e754..850c1bc 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -106,7 +106,7 @@ class RQScheduler: pid = os.getpid() self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) for name in self._queue_names: - if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60): + if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60): successful_locks.add(name) # Always reset _scheduled_job_registries when acquiring locks @@ -186,11 +186,11 @@ class RQScheduler: with self.connection.pipeline() as pipeline: for name in self._queue_names: key = self.get_locking_key(name) - pipeline.expire(key, self.interval + 5) + pipeline.expire(key, self.interval + 60) pipeline.execute() else: key = self.get_locking_key(next(iter(self._queue_names))) - self.connection.expire(key, self.interval + 5) + self.connection.expire(key, self.interval + 60) def stop(self): self.log.info("Scheduler stopping, releasing locks for %s...", diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 0cdcec4..684a076 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -198,8 +198,8 @@ class TestScheduler(RQTestCase): pipeline.expire(locking_key_2, 1000) scheduler.heartbeat() - self.assertEqual(self.testconn.ttl(locking_key_1), 6) - self.assertEqual(self.testconn.ttl(locking_key_1), 6) + self.assertEqual(self.testconn.ttl(locking_key_1), 61) + self.assertEqual(self.testconn.ttl(locking_key_1), 61) # scheduler.stop() releases locks and sets status to STOPPED scheduler._status = scheduler.Status.WORKING @@ -213,7 +213,7 @@ class TestScheduler(RQTestCase): scheduler.acquire_locks() self.testconn.expire(locking_key_1, 1000) scheduler.heartbeat() - self.assertEqual(self.testconn.ttl(locking_key_1), 6) + self.assertEqual(self.testconn.ttl(locking_key_1), 61) def test_enqueue_scheduled_jobs(self): """Scheduler can enqueue scheduled jobs""" |