summaryrefslogtreecommitdiff
path: root/rq
diff options
context:
space:
mode:
authorRony Lutsky <ronlut@gmail.com>2023-02-10 02:26:03 +0200
committerGitHub <noreply@github.com>2023-02-10 07:26:03 +0700
commitb69ee10cbba78789e48ce44fa69f14715d94b7a5 (patch)
tree5d72626562c2fd0c4887f6776d53a7a52016473e /rq
parentacdeff385daf5157e6646dfb47a02f92e6b19b3a (diff)
downloadrq-b69ee10cbba78789e48ce44fa69f14715d94b7a5.tar.gz
Fix - Use worker TTL for timeout (#1794)
* Use worker TTL for timeout * add test * renames * test * use dequeue_timeout
Diffstat (limited to 'rq')
-rw-r--r--rq/contrib/legacy.py2
-rw-r--r--rq/worker.py36
2 files changed, 19 insertions, 19 deletions
diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py
index 9c0ac1f..33ecf18 100644
--- a/rq/contrib/legacy.py
+++ b/rq/contrib/legacy.py
@@ -19,6 +19,6 @@ def cleanup_ghosts(conn=None):
conn = conn if conn else get_current_connection()
for worker in Worker.all(connection=conn):
if conn.ttl(worker.key) == -1:
- ttl = worker.default_worker_ttl
+ ttl = worker.worker_ttl
conn.expire(worker.key, ttl)
logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl))
diff --git a/rq/worker.py b/rq/worker.py
index 8720504..3dd7bd6 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -223,7 +223,11 @@ class Worker:
serializer=None,
): # noqa
- connection = self._set_connection(connection, default_worker_ttl)
+ self.default_result_ttl = default_result_ttl
+ self.worker_ttl = default_worker_ttl
+ self.job_monitoring_interval = job_monitoring_interval
+
+ connection = self._set_connection(connection)
self.connection = connection
self.redis_server_version = None
@@ -246,10 +250,6 @@ class Worker:
self._ordered_queues = self.queues[:]
self._exc_handlers: List[Callable] = []
- self.default_result_ttl = default_result_ttl
- self.default_worker_ttl = default_worker_ttl
- self.job_monitoring_interval = job_monitoring_interval
-
self._state: str = 'starting'
self._is_horse: bool = False
self._horse_pid: int = 0
@@ -296,21 +296,19 @@ class Worker:
elif exception_handlers is not None:
self.push_exc_handler(exception_handlers)
- def _set_connection(self, connection: Optional['Redis'], default_worker_ttl: int) -> 'Redis':
+ def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
"""Configures the Redis connection to have a socket timeout.
This should timouet the connection in case any specific command hangs at any given time (eg. BLPOP).
If the connection provided already has a `socket_timeout` defined, skips.
Args:
connection (Optional[Redis]): The Redis Connection.
- default_worker_ttl (int): The Default Worker TTL
"""
if connection is None:
connection = get_current_connection()
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
if current_socket_timeout is None:
- timeout = self._get_timeout(default_worker_ttl) + 10
- timeout_config = {"socket_timeout": timeout}
+ timeout_config = {"socket_timeout": self.connection_timeout}
connection.connection_pool.connection_kwargs.update(timeout_config)
return connection
@@ -361,11 +359,13 @@ class Worker:
"""Returns whether or not this is the worker or the work horse."""
return self._is_horse
- def _get_timeout(self, worker_ttl: Optional[int] = None) -> int:
- timeout = DEFAULT_WORKER_TTL
- if worker_ttl:
- timeout = worker_ttl
- return max(1, timeout - 15)
+ @property
+ def dequeue_timeout(self) -> int:
+ return max(1, self.worker_ttl - 15)
+
+ @property
+ def connection_timeout(self) -> int:
+ return self.dequeue_timeout + 10
def procline(self, message):
"""Changes the current procname for the process.
@@ -405,7 +405,7 @@ class Worker:
p.hmset(key, mapping)
worker_registration.register(self, p)
- p.expire(key, self.default_worker_ttl + 60)
+ p.expire(key, self.worker_ttl + 60)
p.execute()
def register_death(self):
@@ -680,7 +680,7 @@ class Worker:
self.log.info('Worker %s: stopping on request', self.key)
break
- timeout = None if burst else self._get_timeout()
+ timeout = None if burst else self.dequeue_timeout
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
if burst:
@@ -789,10 +789,10 @@ class Worker:
The next heartbeat should come before this time, or the worker will
die (at least from the monitoring dashboards).
- If no timeout is given, the default_worker_ttl will be used to update
+ If no timeout is given, the worker_ttl will be used to update
the expiration time of the worker.
"""
- timeout = timeout or self.default_worker_ttl + 60
+ timeout = timeout or self.worker_ttl + 60
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))