summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-07-12 11:00:40 +0700
committerSelwin Ong <selwin.ong@gmail.com>2020-07-12 11:00:40 +0700
commitb80e15745499860b616ad39529f520ac001882b8 (patch)
treebc255653999aa38b00392710a90fc19df7a89bb3
parent361db4aa83757c32e2b94a6daf5460d6a9767bd9 (diff)
downloadrq-fix-scheduler-onpy-38.tar.gz
Fix scheduler on Python 3.8fix-scheduler-onpy-38
-rw-r--r--rq/registry.py5
-rw-r--r--rq/scheduler.py35
-rw-r--r--rq/worker.py4
3 files changed, 34 insertions, 10 deletions
diff --git a/rq/registry.py b/rq/registry.py
index 9cb410c..579d0c5 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -37,7 +37,10 @@ class BaseRegistry(object):
return self.count
def __eq__(self, other):
- return (self.name == other.name and self.connection == other.connection)
+ return (
+ self.name == other.name and
+ self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
+ )
def __contains__(self, item):
"""
diff --git a/rq/scheduler.py b/rq/scheduler.py
index cc1b999..4d2312f 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -13,6 +13,8 @@ from .registry import ScheduledJobRegistry
from .utils import current_timestamp, enum
from .logutils import setup_loghandlers
+from redis import Redis
+
SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
@@ -43,11 +45,19 @@ class RQScheduler(object):
self._acquired_locks = set()
self._scheduled_job_registries = []
self.lock_acquisition_time = None
- self.connection = connection
+ self._connection_kwargs = connection.connection_pool.connection_kwargs
+ self._connection = None
self.interval = interval
self._stop_requested = False
self._status = self.Status.STOPPED
self._process = None
+
+ @property
+ def connection(self):
+ if self._connection:
+ return self._connection
+ self._connection = Redis(**self._connection_kwargs)
+ return Redis(**self._connection_kwargs)
@property
def acquired_locks(self):
@@ -59,12 +69,12 @@ class RQScheduler(object):
@property
def should_reacquire_locks(self):
- """Returns True if lock_acquisition_time is longer than 15 minutes ago"""
+ """Returns True if lock_acquisition_time is longer than 10 minutes ago"""
if self._queue_names == self.acquired_locks:
return False
if not self.lock_acquisition_time:
return True
- return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900
+ return (datetime.now() - self.lock_acquisition_time).total_seconds() > 600
def acquire_locks(self, auto_start=False):
"""Returns names of queue it successfully acquires lock on"""
@@ -74,9 +84,10 @@ class RQScheduler(object):
for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5):
successful_locks.add(name)
- self._acquired_locks = self._acquired_locks.union(successful_locks)
- if self._acquired_locks:
- self.prepare_registries(self._acquired_locks)
+
+ # Always reset _scheduled_job_registries when acquiring locks
+ self._scheduled_job_registries = []
+ self._acquired_locks = self._acquired_locks.union(successful_locks)
self.lock_acquisition_time = datetime.now()
@@ -88,9 +99,11 @@ class RQScheduler(object):
return successful_locks
- def prepare_registries(self, queue_names):
+ def prepare_registries(self, queue_names=None):
"""Prepare scheduled job registries for use"""
self._scheduled_job_registries = []
+ if not queue_names:
+ queue_names = self._acquired_locks
for name in queue_names:
self._scheduled_job_registries.append(
ScheduledJobRegistry(name, connection=self.connection)
@@ -104,6 +117,10 @@ class RQScheduler(object):
def enqueue_scheduled_jobs(self):
"""Enqueue jobs whose timestamp is in the past"""
self._status = self.Status.WORKING
+
+ if not self._scheduled_job_registries and self._acquired_locks:
+ self.prepare_registries()
+
for registry in self._scheduled_job_registries:
timestamp = current_timestamp()
@@ -158,12 +175,16 @@ class RQScheduler(object):
def start(self):
self._status = self.Status.STARTED
+ # Redis instance can't be pickled across processes so we need to
+ # clean this up before forking
+ self._connection = None
self._process = Process(target=run, args=(self,), name='Scheduler')
self._process.start()
return self._process
def work(self):
self._install_signal_handlers()
+
while True:
if self._stop_requested:
self.stop()
diff --git a/rq/worker.py b/rq/worker.py
index 7cfa130..87d405f 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1006,10 +1006,10 @@ class Worker(object):
@property
def should_run_maintenance_tasks(self):
- """Maintenance tasks should run on first startup or 15 minutes."""
+ """Maintenance tasks should run on first startup or every 10 minutes."""
if self.last_cleaned_at is None:
return True
- if (utcnow() - self.last_cleaned_at) > timedelta(minutes=15):
+ if (utcnow() - self.last_cleaned_at) > timedelta(minutes=10):
return True
return False