summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxzander <xznder@gmail.com>2023-05-18 03:59:51 +0400
committerGitHub <noreply@github.com>2023-05-18 06:59:51 +0700
commitbdbc9a4f9ca79727316f7e41473981593773e34c (patch)
treef04cc0d56913a6f34f1ba71576a4872235bfd8f9
parentea063edf0a790630d0800808fe6236b3e9ddcf22 (diff)
downloadrq-bdbc9a4f9ca79727316f7e41473981593773e34c.tar.gz
Scheduler should release and heartbeat only acquired locks (#1914)
* Scheduler should release and heartbeat only acquired locks. * Added tests for heartbeat and release only acquired locks. * Changed test description to correct one.
-rw-r--r--rq/scheduler.py10
-rw-r--r--tests/test_scheduler.py39
2 files changed, 42 insertions, 7 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 97d627c..175f607 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -171,24 +171,24 @@ class RQScheduler:
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks))
- if len(self._queue_names) > 1:
+ if len(self._acquired_locks) > 1:
with self.connection.pipeline() as pipeline:
for name in self._acquired_locks:
key = self.get_locking_key(name)
pipeline.expire(key, self.interval + 60)
pipeline.execute()
- else:
- key = self.get_locking_key(next(iter(self._queue_names)))
+ elif self._acquired_locks:
+ key = self.get_locking_key(next(iter(self._acquired_locks)))
self.connection.expire(key, self.interval + 60)
def stop(self):
- self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names))
+ self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._acquired_locks))
self.release_locks()
self._status = self.Status.STOPPED
def release_locks(self):
"""Release acquired locks"""
- keys = [self.get_locking_key(name) for name in self._queue_names]
+ keys = [self.get_locking_key(name) for name in self._acquired_locks]
self.connection.delete(*keys)
self._acquired_locks = set()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 8aa722a..e6edf72 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -204,6 +204,33 @@ class TestScheduler(RQTestCase):
self.assertEqual(mocked.call_count, 1)
self.assertEqual(stopped_process.is_alive.call_count, 1)
+ def test_lock_release(self):
+ """Test that scheduler.release_locks() only releases acquired locks"""
+ name_1 = 'lock-test-1'
+ name_2 = 'lock-test-2'
+ scheduler_1 = RQScheduler([name_1], self.testconn)
+
+ self.assertEqual(scheduler_1.acquire_locks(), {name_1})
+ self.assertEqual(scheduler_1._acquired_locks, {name_1})
+
+ # Only name_2 is returned since name_1 is already locked
+ scheduler_1_2 = RQScheduler([name_1, name_2], self.testconn)
+ self.assertEqual(scheduler_1_2.acquire_locks(), {name_2})
+ self.assertEqual(scheduler_1_2._acquired_locks, {name_2})
+
+ self.assertTrue(self.testconn.exists(scheduler_1.get_locking_key(name_1)))
+ self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_1)))
+ self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_2)))
+
+ scheduler_1_2.release_locks()
+
+ self.assertEqual(scheduler_1_2._acquired_locks, set())
+ self.assertEqual(scheduler_1._acquired_locks, {name_1})
+
+ self.assertTrue(self.testconn.exists(scheduler_1.get_locking_key(name_1)))
+ self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_1)))
+ self.assertFalse(self.testconn.exists(scheduler_1_2.get_locking_key(name_2)))
+
def test_queue_scheduler_pid(self):
queue = Queue(connection=self.testconn)
scheduler = RQScheduler(
@@ -219,25 +246,33 @@ class TestScheduler(RQTestCase):
"""Test that heartbeat updates locking keys TTL"""
name_1 = 'lock-test-1'
name_2 = 'lock-test-2'
- scheduler = RQScheduler([name_1, name_2], self.testconn)
+ name_3 = 'lock-test-3'
+ scheduler = RQScheduler([name_3], self.testconn)
+ scheduler.acquire_locks()
+ scheduler = RQScheduler([name_1, name_2, name_3], self.testconn)
scheduler.acquire_locks()
locking_key_1 = RQScheduler.get_locking_key(name_1)
locking_key_2 = RQScheduler.get_locking_key(name_2)
+ locking_key_3 = RQScheduler.get_locking_key(name_3)
with self.testconn.pipeline() as pipeline:
pipeline.expire(locking_key_1, 1000)
pipeline.expire(locking_key_2, 1000)
+ pipeline.expire(locking_key_3, 1000)
+ pipeline.execute()
scheduler.heartbeat()
self.assertEqual(self.testconn.ttl(locking_key_1), 61)
- self.assertEqual(self.testconn.ttl(locking_key_1), 61)
+ self.assertEqual(self.testconn.ttl(locking_key_2), 61)
+ self.assertEqual(self.testconn.ttl(locking_key_3), 1000)
# scheduler.stop() releases locks and sets status to STOPPED
scheduler._status = scheduler.Status.WORKING
scheduler.stop()
self.assertFalse(self.testconn.exists(locking_key_1))
self.assertFalse(self.testconn.exists(locking_key_2))
+ self.assertTrue(self.testconn.exists(locking_key_3))
self.assertEqual(scheduler.status, scheduler.Status.STOPPED)
# Heartbeat also works properly for schedulers with a single queue