summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-11-08 11:16:23 +0700
committerGitHub <noreply@github.com>2020-11-08 11:16:23 +0700
commitd1528d776de9905fabb51bccaa731441e7b41f67 (patch)
tree9c955946e8905d66193728df16f082827fa2745c
parentb50f1e2121171a2079282753ef699c07bcfe8918 (diff)
downloadrq-d1528d776de9905fabb51bccaa731441e7b41f67.tar.gz
Release scheduler lock when running in burst mode (#1374)
* Fixed an issue where scheduler lock is not release when running worker in burst mode * Remove unused import
-rw-r--r--rq/scheduler.py7
-rw-r--r--rq/worker.py2
-rw-r--r--tests/test_scheduler.py1
3 files changed, 8 insertions, 2 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index a041a71..afa68e0 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -177,9 +177,14 @@ class RQScheduler(object):
def stop(self):
self.log.info("Scheduler stopping, releasing locks for %s...",
','.join(self._queue_names))
+ self.release_locks()
+ self._status = self.Status.STOPPED
+
+ def release_locks(self):
+ """Release acquired locks"""
keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys)
- self._status = self.Status.STOPPED
+ self._acquired_locks = set()
def start(self):
self._status = self.Status.STARTED
diff --git a/rq/worker.py b/rq/worker.py
index 1e2dba5..3b4aade 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -3,7 +3,6 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import errno
-import json
import logging
import os
import random
@@ -551,6 +550,7 @@ class Worker(object):
# before working. Otherwise, start scheduler in a separate process
if burst:
self.scheduler.enqueue_scheduled_jobs()
+ self.scheduler.release_locks()
else:
self.scheduler.start()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 13c68e2..b1de0e7 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -268,6 +268,7 @@ class TestWorker(RQTestCase):
worker = Worker(queues=[queue], connection=self.testconn)
worker.work(burst=True, with_scheduler=True)
self.assertIsNotNone(worker.scheduler)
+ self.assertIsNone(self.testconn.get(worker.scheduler.get_locking_key('default')))
@mock.patch.object(RQScheduler, 'acquire_locks')
def test_run_maintenance_tasks(self, mocked):