summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2023-03-23 11:20:06 +0700
committerSelwin Ong <selwin.ong@gmail.com>2023-03-23 11:20:06 +0700
commit8abdc62d23926fbac15142d7b7b30cdf40cca55a (patch)
treeefd38bf95a713f9251674c95d3c83ac79d4b66eb
parent04722339d7598ff0c52f11c3680ed2dd922e6768 (diff)
downloadrq-wait-for-processing-unit.tar.gz
Added `worker.wait_for_procesing_unit()`.wait-for-processing-unit
-rw-r--r--rq/queue.py180
-rw-r--r--rq/worker.py64
2 files changed, 139 insertions, 105 deletions
diff --git a/rq/queue.py b/rq/queue.py
index 77a6f3e..43b31eb 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -71,8 +71,11 @@ class Queue:
@classmethod
def all(
- cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
- serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None
+ cls,
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer=None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> List['Queue']:
"""Returns an iterable of all Queues.
@@ -89,8 +92,11 @@ class Queue:
def to_queue(queue_key):
return cls.from_queue_key(
- as_text(queue_key), connection=connection, job_class=job_class,
- serializer=serializer, death_penalty_class=death_penalty_class
+ as_text(queue_key),
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
@@ -99,12 +105,12 @@ class Queue:
@classmethod
def from_queue_key(
- cls,
- queue_key: str,
- connection: Optional['Redis'] = None,
- job_class: Optional[Type['Job']] = None,
- serializer: Any = None,
- death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
+ cls,
+ queue_key: str,
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@@ -126,20 +132,25 @@ class Queue:
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
- name = queue_key[len(prefix):]
- return cls(name, connection=connection, job_class=job_class, serializer=serializer,
- death_penalty_class=death_penalty_class)
+ name = queue_key[len(prefix) :]
+ return cls(
+ name,
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
+ )
def __init__(
- self,
- name: str = 'default',
- default_timeout: Optional[int] = None,
- connection: Optional['Redis'] = None,
- is_async: bool = True,
- job_class: Union[str, Type['Job'], None] = None,
- serializer: Any = None,
- death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
- **kwargs,
+ self,
+ name: str = 'default',
+ default_timeout: Optional[int] = None,
+ connection: Optional['Redis'] = None,
+ is_async: bool = True,
+ job_class: Union[str, Type['Job'], None] = None,
+ serializer: Any = None,
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
+ **kwargs,
):
"""Initializes a Queue object.
@@ -207,6 +218,7 @@ class Queue:
@property
def scheduler_pid(self) -> int:
from rq.scheduler import RQScheduler
+
pid = self.connection.get(RQScheduler.get_locking_key(self.name))
return int(pid.decode()) if pid is not None else None
@@ -467,23 +479,23 @@ class Queue:
self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result)
def create_job(
- self,
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None,
- meta: Optional[Dict] = None,
- status: JobStatus = JobStatus.QUEUED,
- retry: Optional['Retry'] = None,
- *,
- on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None,
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ meta: Optional[Dict] = None,
+ status: JobStatus = JobStatus.QUEUED,
+ retry: Optional['Retry'] = None,
+ *,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
) -> Job:
"""Creates a job based on parameters given
@@ -609,23 +621,23 @@ class Queue:
return job
def enqueue_call(
- self,
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None,
- at_front: bool = False,
- meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None,
- on_success: Optional[Callable[..., Any]] = None,
- on_failure: Optional[Callable[..., Any]] = None,
- pipeline: Optional['Pipeline'] = None,
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable[..., Any]] = None,
+ on_failure: Optional[Callable[..., Any]] = None,
+ pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
@@ -676,20 +688,20 @@ class Queue:
@staticmethod
def prepare_data(
- func: 'FunctionReferenceType',
- args: Union[Tuple, List, None] = None,
- kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None,
- result_ttl: Optional[int] = None,
- ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None,
- description: Optional[str] = None,
- job_id: Optional[str] = None,
- at_front: bool = False,
- meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None,
- on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@@ -1001,7 +1013,6 @@ class Queue:
return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
return job
-
def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution without checking dependencies.
@@ -1071,7 +1082,7 @@ class Queue:
return job
def enqueue_dependents(
- self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
+ self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
):
"""Enqueues all jobs in the given job's dependents set and clears it.
@@ -1108,7 +1119,7 @@ class Queue:
dependent_job_ids, connection=self.connection, serializer=self.serializer
)
if dependent_job
- and dependent_job.dependencies_are_met(
+ and dependent_job.dependencies_are_met(
parent_job=job,
pipeline=pipe,
exclude_job_id=exclude_job_id,
@@ -1208,13 +1219,13 @@ class Queue:
@classmethod
def dequeue_any(
- cls,
- queues: List['Queue'],
- timeout: Optional[int],
- connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None,
- serializer: Any = None,
- death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
+ cls,
+ queues: List['Queue'],
+ timeout: Optional[int],
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -1248,8 +1259,13 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
- queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class,
- serializer=serializer, death_penalty_class=death_penalty_class)
+ queue = cls.from_queue_key(
+ queue_key,
+ connection=connection,
+ job_class=job_class,
+ serializer=serializer,
+ death_penalty_class=death_penalty_class,
+ )
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError:
diff --git a/rq/worker.py b/rq/worker.py
index 80c0384..fd9e912 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -13,8 +13,7 @@ import warnings
from datetime import timedelta
from enum import Enum
from random import shuffle
-from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
- Union)
+from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union
from uuid import uuid4
if TYPE_CHECKING:
@@ -57,7 +56,17 @@ from .scheduler import RQScheduler
from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
+from .utils import (
+ backend_class,
+ ensure_list,
+ get_version,
+ make_colorizer,
+ utcformat,
+ utcnow,
+ utcparse,
+ compact,
+ as_text,
+)
from .version import VERSION
from .serializers import resolve_serializer
@@ -249,7 +258,7 @@ class Worker:
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer=None,
- work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None
+ work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None,
): # noqa
self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl
@@ -267,8 +276,13 @@ class Worker:
self.serializer = resolve_serializer(serializer)
queues = [
- self.queue_class(name=q, connection=connection, job_class=self.job_class,
- serializer=self.serializer, death_penalty_class=self.death_penalty_class,)
+ self.queue_class(
+ name=q,
+ connection=connection,
+ job_class=self.job_class,
+ serializer=self.serializer,
+ death_penalty_class=self.death_penalty_class,
+ )
if isinstance(q, str)
else q
for q in ensure_list(queues)
@@ -706,7 +720,7 @@ class Worker:
return
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
return
if self._dequeue_strategy == DequeueStrategy.RANDOM:
shuffle(self._ordered_queues)
@@ -716,7 +730,7 @@ class Worker:
self,
logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
- log_format: str = DEFAULT_LOGGING_FORMAT
+ log_format: str = DEFAULT_LOGGING_FORMAT,
):
"""Bootstraps the worker.
Runs the basic tasks that should run when the worker actually starts working.
@@ -772,6 +786,12 @@ class Worker:
else:
self.scheduler.start()
+ def wait_for_processing_unit(self):
+ """Wait for a processing unit to be available.
+ This is used to limit the number of jobs that can be processed at the same time.
+ """
+ return
+
def work(
self,
burst: bool = False,
@@ -781,7 +801,7 @@ class Worker:
max_jobs: Optional[int] = None,
max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
- dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
+ dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
) -> bool:
"""Starts the work loop.
@@ -814,6 +834,12 @@ class Worker:
self._install_signal_handlers()
try:
while True:
+ self.wait_for_processing_unit()
+
+ if max_jobs is not None:
+ if completed_jobs >= max_jobs:
+ self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs)
+ break
try:
self.check_for_suspension(burst)
@@ -836,12 +862,7 @@ class Worker:
job, queue = result
self.execute_job(job, queue)
self.heartbeat()
-
completed_jobs += 1
- if max_jobs is not None:
- if completed_jobs >= max_jobs:
- self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs)
- break
except redis.exceptions.TimeoutError:
self.log.error('Worker %s: Redis connection timeout, quitting...', self.key)
@@ -881,7 +902,9 @@ class Worker:
pass
self.scheduler._process.join()
- def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
+ def dequeue_job_and_maintain_ttl(
+ self, timeout: Optional[int], max_idle_time: Optional[int] = None
+ ) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
@@ -1167,10 +1190,7 @@ class Worker:
self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string)
self.handle_work_horse_killed(job, retpid, ret_val, rusage)
- self.handle_job_failure(
- job, queue=queue,
- exc_string=exc_string
- )
+ self.handle_job_failure(job, queue=queue, exc_string=exc_string)
def execute_job(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job.
@@ -1458,9 +1478,7 @@ class Worker:
extra.update({'queue': job.origin, 'job_id': job.id})
# func_name
- self.log.error(
- '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra
- )
+ self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra)
for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
@@ -1598,7 +1616,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):