summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2019-09-21 10:54:49 +0700
committerSelwin Ong <selwin.ong@gmail.com>2019-09-21 10:54:49 +0700
commit040e05a90ce3b2c18d3d238189cd2e4f4a4ca154 (patch)
tree39837e0019f1f1d161f0c05fccfc5a479e25b43d
parentd6b1746d46a618d5c0531d64f85eef14790d1261 (diff)
downloadrq-040e05a90ce3b2c18d3d238189cd2e4f4a4ca154.tar.gz
Added `auto_start` argument to scheduler.acquire_locks()
-rw-r--r--rq/scheduler.py9
-rw-r--r--rq/worker.py6
-rw-r--r--tests/test_scheduler.py16
3 files changed, 26 insertions, 5 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index dce03c4..2fb8f4f 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -55,7 +55,7 @@ class RQScheduler(object):
def status(self):
return self._status
- def acquire_locks(self):
+ def acquire_locks(self, auto_start=False):
"""Returns names of queue it successfully acquires lock on"""
successful_locks = set([])
pid = os.getpid()
@@ -65,6 +65,13 @@ class RQScheduler(object):
self._acquired_locks = self._acquired_locks.union(successful_locks)
if self._acquired_locks:
self.prepare_registries(self._acquired_locks)
+
+ # If auto_start is requested and scheduler is not started,
+ # run self.start()
+ if self._acquired_locks and auto_start:
+ if not self._process:
+ self.start()
+
return successful_locks
def prepare_registries(self, queue_names):
diff --git a/rq/worker.py b/rq/worker.py
index afddc31..1e03d25 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -460,9 +460,7 @@ class Worker(object):
if with_scheduler:
self.scheduler = RQScheduler(self.queues, connection=self.connection)
- self.scheduler.acquire_locks()
- if self.scheduler.acquired_locks:
- self.scheduler.start()
+ self.scheduler.acquire_locks(auto_start=True)
self._install_signal_handlers()
@@ -474,7 +472,7 @@ class Worker(object):
if self.should_run_maintenance_tasks:
self.clean_registries()
if self.scheduler:
- self.scheduler.acquire_locks()
+ self.scheduler.acquire_locks(auto_start=True)
if self._stop_requested:
self.log.info('Worker %s: stopping on request', self.key)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 7a4f792..be30913 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -13,6 +13,8 @@ from rq.worker import Worker
from .fixtures import kill_worker, say_hello
from tests import RQTestCase
+import mock
+
class TestScheduledJobRegistry(RQTestCase):
@@ -98,6 +100,20 @@ class TestScheduler(RQTestCase):
self.assertEqual(scheduler.acquire_locks(), {name_3})
self.assertEqual(scheduler._acquired_locks, {name_2, name_3})
+ def test_lock_acquisition_with_auto_start(self):
+ """Test lock acquisition with auto_start=True"""
+ scheduler = RQScheduler(['auto-start'], self.testconn)
+ with mock.patch.object(scheduler, 'start') as mocked:
+ scheduler.acquire_locks(auto_start=True)
+ self.assertEqual(mocked.call_count, 1)
+
+ # If process has started, scheduler.start() won't be called
+ scheduler = RQScheduler(['auto-start2'], self.testconn)
+ scheduler._process = 1
+ with mock.patch.object(scheduler, 'start') as mocked:
+ scheduler.acquire_locks(auto_start=True)
+ self.assertEqual(mocked.call_count, 0)
+
def test_heartbeat(self):
"""Test that heartbeat updates locking keys TTL"""
name_1 = 'lock-test-1'