summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris de Graaf <me@cdg.dev>2020-10-31 08:23:21 -0500
committerGitHub <noreply@github.com>2020-10-31 20:23:21 +0700
commit5988889e57301f1d9e6c786fb62c0c3043c4f7d4 (patch)
tree75b08b49d022d7c49fce6da012a37fd27da2c441
parent676ec9b0ea28e9b923109452e734091c94099b56 (diff)
downloadrq-5988889e57301f1d9e6c786fb62c0c3043c4f7d4.tar.gz
Propagate logging settings from worker to scheduler (#1366)
-rw-r--r--rq/scheduler.py39
-rw-r--r--rq/worker.py8
2 files changed, 26 insertions, 21 deletions
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 1be2e29..561ee5a 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -7,25 +7,18 @@ import traceback
from datetime import datetime
from multiprocessing import Process
+from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT
from .job import Job
+from .logutils import setup_loghandlers
from .queue import Queue
from .registry import ScheduledJobRegistry
from .utils import current_timestamp, enum
-from .logutils import setup_loghandlers
from redis import Redis, SSLConnection
SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
-logger = logging.getLogger(__name__)
-setup_loghandlers(
- level=logging.INFO,
- name="rq.scheduler",
- log_format="%(asctime)s: %(message)s",
- date_format="%H:%M:%S"
-)
-
class RQScheduler(object):
# STARTED: scheduler has been started but sleeping
@@ -39,7 +32,9 @@ class RQScheduler(object):
STOPPED='stopped'
)
- def __init__(self, queues, connection, interval=1):
+ def __init__(self, queues, connection, interval=1, logging_level=logging.INFO,
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()
self._scheduled_job_registries = []
@@ -54,6 +49,13 @@ class RQScheduler(object):
self._stop_requested = False
self._status = self.Status.STOPPED
self._process = None
+ self.log = logging.getLogger(__name__)
+ setup_loghandlers(
+ level=logging_level,
+ name=__name__,
+ log_format=log_format,
+ date_format=date_format,
+ )
@property
def connection(self):
@@ -83,7 +85,7 @@ class RQScheduler(object):
"""Returns names of queue it successfully acquires lock on"""
successful_locks = set()
pid = os.getpid()
- logger.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
+ self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60):
successful_locks.add(name)
@@ -157,7 +159,8 @@ class RQScheduler(object):
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
- logger.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
+ self.log.debug("Scheduler sending heartbeat to %s",
+ ", ".join(self.acquired_locks))
if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline:
for name in self._queue_names:
@@ -169,8 +172,8 @@ class RQScheduler(object):
self.connection.expire(key, self.interval + 5)
def stop(self):
- logger.info("Scheduler stopping, releasing locks for %s...",
- ','.join(self._queue_names))
+ self.log.info("Scheduler stopping, releasing locks for %s...",
+ ','.join(self._queue_names))
keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys)
self._status = self.Status.STOPPED
@@ -201,17 +204,17 @@ class RQScheduler(object):
def run(scheduler):
- logger.info("Scheduler for %s started with PID %s",
- ','.join(scheduler._queue_names), os.getpid())
+ scheduler.log.info("Scheduler for %s started with PID %s",
+ ','.join(scheduler._queue_names), os.getpid())
try:
scheduler.work()
except: # noqa
- logger.error(
+ scheduler.log.error(
'Scheduler [PID %s] raised an exception.\n%s',
os.getpid(), traceback.format_exc()
)
raise
- logger.info("Scheduler with PID %s has stopped", os.getpid())
+ scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())
def parse_names(queues_or_names):
diff --git a/rq/worker.py b/rq/worker.py
index cfbc675..1e2dba5 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -447,7 +447,7 @@ class Worker(object):
self.handle_warm_shutdown_request()
self._shutdown()
-
+
def _shutdown(self):
"""
If shutdown is requested in the middle of a job, wait until
@@ -504,7 +504,7 @@ class Worker(object):
if self.scheduler and not self.scheduler._process:
self.scheduler.acquire_locks(auto_start=True)
self.clean_registries()
-
+
def subscribe(self):
"""Subscribe to this worker's channel"""
self.log.info('Subscribing to channel %s', self.pubsub_channel_name)
@@ -541,7 +541,9 @@ class Worker(object):
self.log.info('*** Listening on %s...', green(', '.join(qnames)))
if with_scheduler:
- self.scheduler = RQScheduler(self.queues, connection=self.connection)
+ self.scheduler = RQScheduler(
+ self.queues, connection=self.connection, logging_level=logging_level,
+ date_format=date_format, log_format=log_format)
self.scheduler.acquire_locks()
# If lock is acquired, start scheduler
if self.scheduler.acquired_locks: