diff options
Diffstat (limited to 'rq/scheduler.py')
-rw-r--r-- | rq/scheduler.py | 38 |
1 files changed, 17 insertions, 21 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py index de8f26e..da59b0d 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -9,8 +9,7 @@ from multiprocessing import Process from redis import SSLConnection, UnixDomainSocketConnection -from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, - DEFAULT_SCHEDULER_FALLBACK_PERIOD) +from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD from .job import Job from .logutils import setup_loghandlers from .queue import Queue @@ -35,9 +34,16 @@ class RQScheduler: Status = SchedulerStatus - def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, serializer=None): + def __init__( + self, + queues, + connection, + interval=1, + logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + serializer=None, + ): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() self._scheduled_job_registries = [] @@ -59,9 +65,7 @@ class RQScheduler: # the key is necessary. # `path` is not left in the dictionary as that keyword argument is # not expected by `redis.client.Redis` and would raise an exception. - self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop( - 'path' - ) + self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop('path') self.serializer = resolve_serializer(serializer) self._connection = None @@ -158,9 +162,7 @@ class RQScheduler: queue = Queue(registry.name, connection=self.connection, serializer=self.serializer) with self.connection.pipeline() as pipeline: - jobs = Job.fetch_many( - job_ids, connection=self.connection, serializer=self.serializer - ) + jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer) for job in jobs: if job is not None: queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) @@ -181,8 +183,7 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - self.log.debug("Scheduler sending heartbeat to %s", - ", ".join(self.acquired_locks)) + self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: @@ -194,8 +195,7 @@ class RQScheduler: self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info("Scheduler stopping, releasing locks for %s...", - ','.join(self._queue_names)) + self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) self.release_locks() self._status = self.Status.STOPPED @@ -231,15 +231,11 @@ class RQScheduler: def run(scheduler): - scheduler.log.info("Scheduler for %s started with PID %s", - ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa - scheduler.log.error( - 'Scheduler [PID %s] raised an exception.\n%s', - os.getpid(), traceback.format_exc() - ) + scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc()) raise scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) |