summaryrefslogtreecommitdiff
path: root/rq/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'rq/scheduler.py')
-rw-r--r--rq/scheduler.py38
1 files changed, 17 insertions, 21 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index de8f26e..da59b0d 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -9,8 +9,7 @@ from multiprocessing import Process
from redis import SSLConnection, UnixDomainSocketConnection
-from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT,
- DEFAULT_SCHEDULER_FALLBACK_PERIOD)
+from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD
from .job import Job
from .logutils import setup_loghandlers
from .queue import Queue
@@ -35,9 +34,16 @@ class RQScheduler:
Status = SchedulerStatus
- def __init__(self, queues, connection, interval=1, logging_level=logging.INFO,
- date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT, serializer=None):
+ def __init__(
+ self,
+ queues,
+ connection,
+ interval=1,
+ logging_level=logging.INFO,
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT,
+ serializer=None,
+ ):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()
self._scheduled_job_registries = []
@@ -59,9 +65,7 @@ class RQScheduler:
# the key is necessary.
# `path` is not left in the dictionary as that keyword argument is
# not expected by `redis.client.Redis` and would raise an exception.
- self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop(
- 'path'
- )
+ self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop('path')
self.serializer = resolve_serializer(serializer)
self._connection = None
@@ -158,9 +162,7 @@ class RQScheduler:
queue = Queue(registry.name, connection=self.connection, serializer=self.serializer)
with self.connection.pipeline() as pipeline:
- jobs = Job.fetch_many(
- job_ids, connection=self.connection, serializer=self.serializer
- )
+ jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer)
for job in jobs:
if job is not None:
queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
@@ -181,8 +183,7 @@ class RQScheduler:
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
- self.log.debug("Scheduler sending heartbeat to %s",
- ", ".join(self.acquired_locks))
+ self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline:
for name in self._acquired_locks:
@@ -194,8 +195,7 @@ class RQScheduler:
self.connection.expire(key, self.interval + 60)
def stop(self):
- self.log.info("Scheduler stopping, releasing locks for %s...",
- ','.join(self._queue_names))
+ self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names))
self.release_locks()
self._status = self.Status.STOPPED
@@ -231,15 +231,11 @@ class RQScheduler:
def run(scheduler):
- scheduler.log.info("Scheduler for %s started with PID %s",
- ','.join(scheduler._queue_names), os.getpid())
+ scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid())
try:
scheduler.work()
except: # noqa
- scheduler.log.error(
- 'Scheduler [PID %s] raised an exception.\n%s',
- os.getpid(), traceback.format_exc()
- )
+ scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc())
raise
scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())