diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2020-11-08 11:34:34 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2020-11-08 11:34:34 +0700 |
commit | 6fab48845f4b4cefcd92bd6d072183786b26b2d5 (patch) | |
tree | 4379595a25d032e9a7c1b0d9dddb69565172e9e0 | |
parent | b3703b53573253efb51170657b97603fcc8f2ccb (diff) | |
parent | d1528d776de9905fabb51bccaa731441e7b41f67 (diff) | |
download | rq-6fab48845f4b4cefcd92bd6d072183786b26b2d5.tar.gz |
Merge branch 'master' of github.com:rq/rq
-rw-r--r-- | rq/scheduler.py | 7 | ||||
-rw-r--r-- | rq/worker.py | 2 | ||||
-rw-r--r-- | tests/test_scheduler.py | 1 |
3 files changed, 8 insertions, 2 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py index a041a71..afa68e0 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -177,9 +177,14 @@ class RQScheduler(object): def stop(self): self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) + self.release_locks() + self._status = self.Status.STOPPED + + def release_locks(self): + """Release acquired locks""" keys = [self.get_locking_key(name) for name in self._queue_names] self.connection.delete(*keys) - self._status = self.Status.STOPPED + self._acquired_locks = set() def start(self): self._status = self.Status.STARTED diff --git a/rq/worker.py b/rq/worker.py index 1e2dba5..3b4aade 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -3,7 +3,6 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import errno -import json import logging import os import random @@ -551,6 +550,7 @@ class Worker(object): # before working. Otherwise, start scheduler in a separate process if burst: self.scheduler.enqueue_scheduled_jobs() + self.scheduler.release_locks() else: self.scheduler.start() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 13c68e2..b1de0e7 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -268,6 +268,7 @@ class TestWorker(RQTestCase): worker = Worker(queues=[queue], connection=self.testconn) worker.work(burst=True, with_scheduler=True) self.assertIsNotNone(worker.scheduler) + self.assertIsNone(self.testconn.get(worker.scheduler.get_locking_key('default'))) @mock.patch.object(RQScheduler, 'acquire_locks') def test_run_maintenance_tasks(self, mocked): |