summary refs log tree commit diff
diff options
context:
space:
mode:
authorRony Lutsky <3050627+ronlut@users.noreply.github.com>2023-02-22 01:35:43 +0200
committerGitHub <noreply@github.com>2023-02-22 06:35:43 +0700
commitaedc9b9e06abc26417cd71e842266c4ae5c658b6 (patch)
tree2114bf455bd3dc78f9efefbb46dac7e6758b2d52
parent41406db3ebd287ab1aa8885948140ec63f22d88a (diff)
downloadrq-aedc9b9e06abc26417cd71e842266c4ae5c658b6.tar.gz
Worker - max_idle_time feature (#1795)
* Fix accessing None when the dequeued result is None (burst=True, or timeout=None) * Add a test * Implement the feature + tests * Fix the if condition * Adjust test * Merge * Test * Test * Merge master * Take max_idle_time into account for dequeue_timeout * Refactor a bit * Fix a potential bug * Tests * Use math.ceil * Add timing buffer to tests
-rwxr-xr-xrq/cli/cli.py3
-rw-r--r--rq/queue.py8
-rw-r--r--rq/worker.py21
-rw-r--r--tests/test_worker.py26
4 files changed, 50 insertions, 8 deletions
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index ddfa8a3..1bf9151 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -223,6 +223,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
+@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.argument('queues', nargs=-1)
@@ -246,6 +247,7 @@ def worker(
pid,
disable_default_exception_handler,
max_jobs,
+ max_idle_time,
with_scheduler,
queues,
log_format,
@@ -317,6 +319,7 @@ def worker(
date_format=date_format,
log_format=log_format,
max_jobs=max_jobs,
+ max_idle_time=max_idle_time,
with_scheduler=with_scheduler,
)
except ConnectionError as e:
diff --git a/rq/queue.py b/rq/queue.py
index 3c483e8..45d6a40 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -1140,7 +1140,7 @@ class Queue:
return as_text(self.connection.lpop(self.key))
@classmethod
- def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None):
+ def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
@@ -1155,7 +1155,7 @@ class Queue:
Args:
queue_keys (_type_): _description_
- timeout (int): _description_
+ timeout (Optional[int]): _description_
connection (Optional[Redis], optional): _description_. Defaults to None.
Raises:
@@ -1188,7 +1188,7 @@ class Queue:
def dequeue_any(
cls,
queues: List['Queue'],
- timeout: int,
+ timeout: Optional[int],
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
@@ -1205,7 +1205,7 @@ class Queue:
Args:
queues (List[Queue]): List of queue objects
- timeout (int): Timeout for the LPOP
+ timeout (Optional[int]): Timeout for the LPOP
connection (Optional[Redis], optional): Redis Connection. Defaults to None.
job_class (Optional[Job], optional): The job classification. Defaults to None.
serializer (Any, optional): Serializer to use. Defaults to None.
diff --git a/rq/worker.py b/rq/worker.py
index 3e95a3f..2cf4d18 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,6 +1,7 @@
import contextlib
import errno
import logging
+import math
import os
import random
import resource
@@ -746,6 +747,7 @@ class Worker:
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None,
+ max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
) -> bool:
"""Starts the work loop.
@@ -753,6 +755,7 @@ class Worker:
Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` mode is enabled.
+ If `max_idle_time` is provided, worker will die when it's idle for more than the provided value.
The return value indicates whether any jobs were processed.
@@ -762,6 +765,7 @@ class Worker:
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
+ max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
Returns:
@@ -786,10 +790,12 @@ class Worker:
break
timeout = None if burst else self.dequeue_timeout
- result = self.dequeue_job_and_maintain_ttl(timeout)
+ result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time)
if result is None:
if burst:
self.log.info("Worker %s: done, quitting", self.key)
+ elif max_idle_time is not None:
+ self.log.info("Worker %s: idle for %d seconds, quitting", self.key, max_idle_time)
break
job, queue = result
@@ -841,7 +847,7 @@ class Worker:
pass
self.scheduler._process.join()
- def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']:
+ def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
@@ -854,6 +860,8 @@ class Worker:
self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames))
connection_wait_time = 1.0
+ idle_since = utcnow()
+ idle_time_left = max_idle_time
while True:
try:
self.heartbeat()
@@ -861,6 +869,9 @@ class Worker:
if self.should_run_maintenance_tasks:
self.run_maintenance_tasks()
+ if timeout is not None and idle_time_left is not None:
+ timeout = min(timeout, idle_time_left)
+
self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}")
result = self.queue_class.dequeue_any(
self._ordered_queues,
@@ -880,7 +891,11 @@ class Worker:
break
except DequeueTimeout:
- pass
+ if max_idle_time is not None:
+ idle_for = (utcnow() - idle_since).total_seconds()
+ idle_time_left = math.ceil(max_idle_time - idle_for)
+ if idle_time_left <= 0:
+ break
except redis.exceptions.ConnectionError as conn_err:
self.log.error(
'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 99f6595..239252c 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -608,6 +608,31 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
+ @slow
+ def test_max_idle_time(self):
+ q = Queue()
+ w = Worker([q])
+ q.enqueue(say_hello, args=('Frank',))
+ self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1))
+
+ # idle for 1 second
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1))
+
+ # idle for 3 seconds
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3))
+ self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer
+
+ # idle for 2 seconds because idle_time is less than timeout
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
+ self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer
+
+ # idle for 3 seconds because idle_time is less than two rounds of timeout
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3))
+ self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer
+
@slow # noqa
def test_timeouts(self):
"""Worker kills jobs after timeout."""
@@ -640,7 +665,6 @@ class TestWorker(RQTestCase):
q = Queue()
w = Worker([q])
- # Put it on the queue with a timeout value
self.assertIsNone(w.dequeue_job_and_maintain_ttl(None))
def test_worker_ttl_param_resolves_timeout(self):