diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2019-09-21 10:54:49 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2019-09-21 10:54:49 +0700 |
commit | 040e05a90ce3b2c18d3d238189cd2e4f4a4ca154 (patch) | |
tree | 39837e0019f1f1d161f0c05fccfc5a479e25b43d | |
parent | d6b1746d46a618d5c0531d64f85eef14790d1261 (diff) | |
download | rq-040e05a90ce3b2c18d3d238189cd2e4f4a4ca154.tar.gz |
Added `auto_start` argument to scheduler.acquire_locks()
-rw-r--r-- | rq/scheduler.py | 9 | ||||
-rw-r--r-- | rq/worker.py | 6 | ||||
-rw-r--r-- | tests/test_scheduler.py | 16 |
3 files changed, 26 insertions, 5 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py index dce03c4..2fb8f4f 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -55,7 +55,7 @@ class RQScheduler(object): def status(self): return self._status - def acquire_locks(self): + def acquire_locks(self, auto_start=False): """Returns names of queue it successfully acquires lock on""" successful_locks = set([]) pid = os.getpid() @@ -65,6 +65,13 @@ class RQScheduler(object): self._acquired_locks = self._acquired_locks.union(successful_locks) if self._acquired_locks: self.prepare_registries(self._acquired_locks) + + # If auto_start is requested and scheduler is not started, + # run self.start() + if self._acquired_locks and auto_start: + if not self._process: + self.start() + return successful_locks def prepare_registries(self, queue_names): diff --git a/rq/worker.py b/rq/worker.py index afddc31..1e03d25 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -460,9 +460,7 @@ class Worker(object): if with_scheduler: self.scheduler = RQScheduler(self.queues, connection=self.connection) - self.scheduler.acquire_locks() - if self.scheduler.acquired_locks: - self.scheduler.start() + self.scheduler.acquire_locks(auto_start=True) self._install_signal_handlers() @@ -474,7 +472,7 @@ class Worker(object): if self.should_run_maintenance_tasks: self.clean_registries() if self.scheduler: - self.scheduler.acquire_locks() + self.scheduler.acquire_locks(auto_start=True) if self._stop_requested: self.log.info('Worker %s: stopping on request', self.key) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 7a4f792..be30913 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -13,6 +13,8 @@ from rq.worker import Worker from .fixtures import kill_worker, say_hello from tests import RQTestCase +import mock + class TestScheduledJobRegistry(RQTestCase): @@ -98,6 +100,20 @@ class TestScheduler(RQTestCase): self.assertEqual(scheduler.acquire_locks(), {name_3}) self.assertEqual(scheduler._acquired_locks, {name_2, name_3}) + def test_lock_acquisition_with_auto_start(self): + """Test lock acquisition with auto_start=True""" + scheduler = RQScheduler(['auto-start'], self.testconn) + with mock.patch.object(scheduler, 'start') as mocked: + scheduler.acquire_locks(auto_start=True) + self.assertEqual(mocked.call_count, 1) + + # If process has started, scheduler.start() won't be called + scheduler = RQScheduler(['auto-start2'], self.testconn) + scheduler._process = 1 + with mock.patch.object(scheduler, 'start') as mocked: + scheduler.acquire_locks(auto_start=True) + self.assertEqual(mocked.call_count, 0) + def test_heartbeat(self): """Test that heartbeat updates locking keys TTL""" name_1 = 'lock-test-1' |