summaryrefslogtreecommitdiff
path: root/rq/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'rq/queue.py')
-rw-r--r--rq/queue.py77
1 files changed, 51 insertions, 26 deletions
diff --git a/rq/queue.py b/rq/queue.py
index 3c483e8..77a6f3e 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -10,6 +10,8 @@ from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Ty
from redis import WatchError
+from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
+
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
@@ -62,13 +64,15 @@ class EnqueueData(
@total_ordering
class Queue:
job_class: Type['Job'] = Job
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty
DEFAULT_TIMEOUT: int = 180 # Default timeout seconds.
redis_queue_namespace_prefix: str = 'rq:queue:'
redis_queues_keys: str = 'rq:queues'
@classmethod
def all(
- cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None
+ cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
+ serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None
) -> List['Queue']:
"""Returns an iterable of all Queues.
@@ -76,6 +80,7 @@ class Queue:
connection (Optional[Redis], optional): The Redis Connection. Defaults to None.
job_class (Optional[Job], optional): The Job class to use. Defaults to None.
serializer (optional): The serializer to use. Defaults to None.
+ death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): The Death Penalty class to use. Defaults to None.
Returns:
queues (List[Queue]): A list of all queues.
@@ -84,7 +89,8 @@ class Queue:
def to_queue(queue_key):
return cls.from_queue_key(
- as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer
+ as_text(queue_key), connection=connection, job_class=job_class,
+ serializer=serializer, death_penalty_class=death_penalty_class
)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
@@ -96,8 +102,9 @@ class Queue:
cls,
queue_key: str,
connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None,
+ job_class: Optional[Type['Job']] = None,
serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@@ -108,6 +115,7 @@ class Queue:
connection (Optional[Redis], optional): Redis connection. Defaults to None.
job_class (Optional[Job], optional): Job class. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
+ death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): Death penalty class. Defaults to None.
Raises:
ValueError: If the queue_key doesn't start with the defined prefix
@@ -119,7 +127,8 @@ class Queue:
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
name = queue_key[len(prefix):]
- return cls(name, connection=connection, job_class=job_class, serializer=serializer)
+ return cls(name, connection=connection, job_class=job_class, serializer=serializer,
+ death_penalty_class=death_penalty_class)
def __init__(
self,
@@ -129,6 +138,7 @@ class Queue:
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
**kwargs,
):
"""Initializes a Queue object.
@@ -141,6 +151,7 @@ class Queue:
If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True.
job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
+ death_penalty_class (Type[BaseDeathPenalty], optional): The death penalty class used to abort jobs that exceed their timeout. Defaults to UnixSignalDeathPenalty.
"""
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
@@ -159,6 +170,7 @@ class Queue:
if isinstance(job_class, str):
job_class = import_attribute(job_class)
self.job_class = job_class
+ self.death_penalty_class = death_penalty_class
self.serializer = resolve_serializer(serializer)
self.redis_server_version: Optional[Tuple[int, int, int]] = None
@@ -166,9 +178,6 @@ class Queue:
def __len__(self):
return self.count
- def __nonzero__(self):
- return True
-
def __bool__(self):
return True
@@ -336,7 +345,7 @@ class Queue:
else:
end = length
job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)]
- self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.")
+ self.log.debug('Getting jobs for queue %s: %d found.', green(self.name), len(job_ids))
return job_ids
def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']:
@@ -455,7 +464,7 @@ class Queue:
result = connection.lpush(self.key, job_id)
else:
result = connection.rpush(self.key, job_id)
- self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
+ self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result)
def create_job(
self,
@@ -663,12 +672,7 @@ class Queue:
on_success=on_success,
on_failure=on_failure,
)
-
- job = self.setup_dependencies(job, pipeline=pipeline)
- # If we do not depend on an unfinished job, enqueue the job.
- if job.get_status(refresh=False) != JobStatus.DEFERRED:
- return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
- return job
+ return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
@staticmethod
def prepare_data(
@@ -739,7 +743,7 @@ class Queue:
"""
pipe = pipeline if pipeline is not None else self.connection.pipeline()
jobs = [
- self.enqueue_job(
+ self._enqueue_job(
self.create_job(
job_data.func,
args=job_data.args,
@@ -980,7 +984,26 @@ class Queue:
return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs)
def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
- """Enqueues a job for delayed execution.
+ """Enqueues a job for delayed execution checking dependencies.
+
+ Args:
+ job (Job): The job to enqueue
+ pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None.
+ at_front (bool, optional): Whether should enqueue at the front of the queue. Defaults to False.
+
+ Returns:
+ Job: The enqueued job
+ """
+ job.origin = self.name
+ job = self.setup_dependencies(job, pipeline=pipeline)
+ # If we do not depend on an unfinished job, enqueue the job.
+ if job.get_status(refresh=False) != JobStatus.DEFERRED:
+ return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
+ return job
+
+
+ def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
+ """Enqueues a job for delayed execution without checking dependencies.
If Queue is instantiated with is_async=False, job is executed immediately.
@@ -1067,7 +1090,6 @@ class Queue:
dependents_key = job.dependents_key
while True:
-
try:
# if a pipeline is passed, the caller is responsible for calling WATCH
# to ensure all jobs are enqueued
@@ -1107,10 +1129,10 @@ class Queue:
registry.remove(dependent, pipeline=pipe)
if dependent.origin == self.name:
- self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
+ self._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
else:
queue = self.__class__(name=dependent.origin, connection=self.connection)
- queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
+ queue._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
# Only delete dependents_key if all dependents have been enqueued
if len(jobs_to_enqueue) == len(dependent_job_ids):
@@ -1140,7 +1162,7 @@ class Queue:
return as_text(self.connection.lpop(self.key))
@classmethod
- def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None):
+ def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
@@ -1155,7 +1177,7 @@ class Queue:
Args:
queue_keys (_type_): _description_
- timeout (int): _description_
+ timeout (Optional[int]): _description_
connection (Optional[Redis], optional): _description_. Defaults to None.
Raises:
@@ -1188,10 +1210,11 @@ class Queue:
def dequeue_any(
cls,
queues: List['Queue'],
- timeout: int,
+ timeout: Optional[int],
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -1205,10 +1228,11 @@ class Queue:
Args:
queues (List[Queue]): List of queue objects
- timeout (int): Timeout for the LPOP
+ timeout (Optional[int]): Timeout for the LPOP
connection (Optional[Redis], optional): Redis Connection. Defaults to None.
- job_class (Optional[Job], optional): The job classification. Defaults to None.
+ job_class (Optional[Type[Job]], optional): The job class. Defaults to None.
serializer (Any, optional): Serializer to use. Defaults to None.
+ death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): The death penalty class. Defaults to None.
Raises:
e: Any exception
@@ -1224,7 +1248,8 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
- queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer)
+ queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class,
+ serializer=serializer, death_penalty_class=death_penalty_class)
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError: