diff options
Diffstat (limited to 'rq/queue.py')
-rw-r--r-- | rq/queue.py | 77 |
1 files changed, 51 insertions, 26 deletions
diff --git a/rq/queue.py b/rq/queue.py index 3c483e8..77a6f3e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -10,6 +10,8 @@ from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Ty from redis import WatchError +from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty + if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline @@ -62,13 +64,15 @@ class EnqueueData( @total_ordering class Queue: job_class: Type['Job'] = Job + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty DEFAULT_TIMEOUT: int = 180 # Default timeout seconds. redis_queue_namespace_prefix: str = 'rq:queue:' redis_queues_keys: str = 'rq:queues' @classmethod def all( - cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None + cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, + serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None ) -> List['Queue']: """Returns an iterable of all Queues. @@ -76,6 +80,7 @@ class Queue: connection (Optional[Redis], optional): The Redis Connection. Defaults to None. job_class (Optional[Job], optional): The Job class to use. Defaults to None. serializer (optional): The serializer to use. Defaults to None. + death_penalty_class (Optional[Job], optional): The Death Penalty class to use. Defaults to None. Returns: queues (List[Queue]): A list of all queues. @@ -84,7 +89,8 @@ class Queue: def to_queue(queue_key): return cls.from_queue_key( - as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer + as_text(queue_key), connection=connection, job_class=job_class, + serializer=serializer, death_penalty_class=death_penalty_class ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) @@ -96,8 +102,9 @@ class Queue: cls, queue_key: str, connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, + job_class: Optional[Type['Job']] = None, serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their @@ -108,6 +115,7 @@ class Queue: connection (Optional[Redis], optional): Redis connection. Defaults to None. job_class (Optional[Job], optional): Job class. Defaults to None. serializer (Any, optional): Serializer. Defaults to None. + death_penalty_class (Optional[BaseDeathPenalty], optional): Death penalty class. Defaults to None. Raises: ValueError: If the queue_key doesn't start with the defined prefix @@ -119,7 +127,8 @@ class Queue: if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) name = queue_key[len(prefix):] - return cls(name, connection=connection, job_class=job_class, serializer=serializer) + return cls(name, connection=connection, job_class=job_class, serializer=serializer, + death_penalty_class=death_penalty_class) def __init__( self, @@ -129,6 +138,7 @@ class Queue: is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, **kwargs, ): """Initializes a Queue object. @@ -141,6 +151,7 @@ class Queue: If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. serializer (Any, optional): Serializer. Defaults to None. + death_penalty_class (Type[BaseDeathPenalty, optional): Job class or a string referencing the Job class path. Defaults to UnixSignalDeathPenalty. """ self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix @@ -159,6 +170,7 @@ class Queue: if isinstance(job_class, str): job_class = import_attribute(job_class) self.job_class = job_class + self.death_penalty_class = death_penalty_class self.serializer = resolve_serializer(serializer) self.redis_server_version: Optional[Tuple[int, int, int]] = None @@ -166,9 +178,6 @@ class Queue: def __len__(self): return self.count - def __nonzero__(self): - return True - def __bool__(self): return True @@ -336,7 +345,7 @@ class Queue: else: end = length job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)] - self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") + self.log.debug('Getting jobs for queue %s: %d found.', green(self.name), len(job_ids)) return job_ids def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']: @@ -455,7 +464,7 @@ class Queue: result = connection.lpush(self.key, job_id) else: result = connection.rpush(self.key, job_id) - self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") + self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result) def create_job( self, @@ -663,12 +672,7 @@ class Queue: on_success=on_success, on_failure=on_failure, ) - - job = self.setup_dependencies(job, pipeline=pipeline) - # If we do not depend on an unfinished job, enqueue the job. - if job.get_status(refresh=False) != JobStatus.DEFERRED: - return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) - return job + return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) @staticmethod def prepare_data( @@ -739,7 +743,7 @@ class Queue: """ pipe = pipeline if pipeline is not None else self.connection.pipeline() jobs = [ - self.enqueue_job( + self._enqueue_job( self.create_job( job_data.func, args=job_data.args, @@ -980,7 +984,26 @@ class Queue: return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: - """Enqueues a job for delayed execution. + """Enqueues a job for delayed execution checking dependencies. + + Args: + job (Job): The job to enqueue + pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None. + at_front (bool, optional): Whether should enqueue at the front of the queue. Defaults to False. + + Returns: + Job: The enqued job + """ + job.origin = self.name + job = self.setup_dependencies(job, pipeline=pipeline) + # If we do not depend on an unfinished job, enqueue the job. + if job.get_status(refresh=False) != JobStatus.DEFERRED: + return self._enqueue_job(job, pipeline=pipeline, at_front=at_front) + return job + + + def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: + """Enqueues a job for delayed execution without checking dependencies. If Queue is instantiated with is_async=False, job is executed immediately. @@ -1067,7 +1090,6 @@ class Queue: dependents_key = job.dependents_key while True: - try: # if a pipeline is passed, the caller is responsible for calling WATCH # to ensure all jobs are enqueued @@ -1107,10 +1129,10 @@ class Queue: registry.remove(dependent, pipeline=pipe) if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) + self._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) else: queue = self.__class__(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) + queue._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) # Only delete dependents_key if all dependents have been enqueued if len(jobs_to_enqueue) == len(dependent_job_ids): @@ -1140,7 +1162,7 @@ class Queue: return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None): + def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -1155,7 +1177,7 @@ class Queue: Args: queue_keys (_type_): _description_ - timeout (int): _description_ + timeout (Optional[int]): _description_ connection (Optional[Redis], optional): _description_. Defaults to None. Raises: @@ -1188,10 +1210,11 @@ class Queue: def dequeue_any( cls, queues: List['Queue'], - timeout: int, + timeout: Optional[int], connection: Optional['Redis'] = None, job_class: Optional['Job'] = None, serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -1205,10 +1228,11 @@ class Queue: Args: queues (List[Queue]): List of queue objects - timeout (int): Timeout for the LPOP + timeout (Optional[int]): Timeout for the LPOP connection (Optional[Redis], optional): Redis Connection. Defaults to None. - job_class (Optional[Job], optional): The job classification. Defaults to None. + job_class (Optional[Type[Job]], optional): The job class. Defaults to None. serializer (Any, optional): Serializer to use. Defaults to None. + death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): The death penalty class. Defaults to None. Raises: e: Any exception @@ -1224,7 +1248,8 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer) + queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, + serializer=serializer, death_penalty_class=death_penalty_class) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: |