diff options
Diffstat (limited to 'rq/worker.py')
-rw-r--r-- | rq/worker.py | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/rq/worker.py b/rq/worker.py index 3e95a3f..2cf4d18 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,6 +1,7 @@ import contextlib import errno import logging +import math import os import random import resource @@ -746,6 +747,7 @@ class Worker: date_format: str = DEFAULT_LOGGING_DATE_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT, max_jobs: Optional[int] = None, + max_idle_time: Optional[int] = None, with_scheduler: bool = False, ) -> bool: """Starts the work loop. @@ -753,6 +755,7 @@ class Worker: Pops and performs all jobs on the current list of queues. When all queues are empty, block and wait for new jobs to arrive on any of the queues, unless `burst` mode is enabled. + If `max_idle_time` is provided, worker will die when it's idle for more than the provided value. The return value indicates whether any jobs were processed. @@ -762,6 +765,7 @@ class Worker: date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. + max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. Returns: @@ -786,10 +790,12 @@ class Worker: break timeout = None if burst else self.dequeue_timeout - result = self.dequeue_job_and_maintain_ttl(timeout) + result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time) if result is None: if burst: self.log.info("Worker %s: done, quitting", self.key) + elif max_idle_time is not None: + self.log.info("Worker %s: idle for %d seconds, quitting", self.key, max_idle_time) break job, queue = result @@ -841,7 +847,7 @@ class Worker: pass self.scheduler._process.join() - def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']: + def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']: """Dequeues a job while maintaining the TTL. Returns: @@ -854,6 +860,8 @@ class Worker: self.procline('Listening on ' + qnames) self.log.debug('*** Listening on %s...', green(qnames)) connection_wait_time = 1.0 + idle_since = utcnow() + idle_time_left = max_idle_time while True: try: self.heartbeat() @@ -861,6 +869,9 @@ class Worker: if self.should_run_maintenance_tasks: self.run_maintenance_tasks() + if timeout is not None and idle_time_left is not None: + timeout = min(timeout, idle_time_left) + self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}") result = self.queue_class.dequeue_any( self._ordered_queues, @@ -880,7 +891,11 @@ class Worker: break except DequeueTimeout: - pass + if max_idle_time is not None: + idle_for = (utcnow() - idle_since).total_seconds() + idle_time_left = math.ceil(max_idle_time - idle_for) + if idle_time_left <= 0: + break except redis.exceptions.ConnectionError as conn_err: self.log.error( 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time |