summaryrefslogtreecommitdiff
path: root/rq/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'rq/worker.py')
-rw-r--r--rq/worker.py21
1 files changed, 18 insertions, 3 deletions
diff --git a/rq/worker.py b/rq/worker.py
index 3e95a3f..2cf4d18 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,6 +1,7 @@
import contextlib
import errno
import logging
+import math
import os
import random
import resource
@@ -746,6 +747,7 @@ class Worker:
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None,
+ max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
) -> bool:
"""Starts the work loop.
@@ -753,6 +755,7 @@ class Worker:
Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` mode is enabled.
+ If `max_idle_time` is provided, worker will die when it's idle for more than the provided value.
The return value indicates whether any jobs were processed.
@@ -762,6 +765,7 @@ class Worker:
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
+ max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
Returns:
@@ -786,10 +790,12 @@ class Worker:
break
timeout = None if burst else self.dequeue_timeout
- result = self.dequeue_job_and_maintain_ttl(timeout)
+ result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time)
if result is None:
if burst:
self.log.info("Worker %s: done, quitting", self.key)
+ elif max_idle_time is not None:
+ self.log.info("Worker %s: idle for %d seconds, quitting", self.key, max_idle_time)
break
job, queue = result
@@ -841,7 +847,7 @@ class Worker:
pass
self.scheduler._process.join()
- def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']:
+ def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
@@ -854,6 +860,8 @@ class Worker:
self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames))
connection_wait_time = 1.0
+ idle_since = utcnow()
+ idle_time_left = max_idle_time
while True:
try:
self.heartbeat()
@@ -861,6 +869,9 @@ class Worker:
if self.should_run_maintenance_tasks:
self.run_maintenance_tasks()
+ if timeout is not None and idle_time_left is not None:
+ timeout = min(timeout, idle_time_left)
+
self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}")
result = self.queue_class.dequeue_any(
self._ordered_queues,
@@ -880,7 +891,11 @@ class Worker:
break
except DequeueTimeout:
- pass
+ if max_idle_time is not None:
+ idle_for = (utcnow() - idle_since).total_seconds()
+ idle_time_left = math.ceil(max_idle_time - idle_for)
+ if idle_time_left <= 0:
+ break
except redis.exceptions.ConnectionError as conn_err:
self.log.error(
'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time