summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlowercase00 <21188280+lowercase00@users.noreply.github.com>2023-01-30 01:42:04 -0300
committerGitHub <noreply@github.com>2023-01-30 11:42:04 +0700
commitbd0731025377d5e8ebcbce78026698a57dea01df (patch)
treee3b365abc7a5b50483f94f3d742f141d9b2ee0bb
parent398d5784db27ee7bc97fd2eb98aa5eb7d473d071 (diff)
downloadrq-bd0731025377d5e8ebcbce78026698a57dea01df.tar.gz
Job methods docstrings (#1772)
* Improve docstrings on `connections` * Enhanced Job methods docstrings & Serialization Protocol This adds docstrings to all Job methods in a standard format. It also implements a `serializer` protocol. * Excludes `Protocol` (keeping compatibility with < 3.8) * Add docstrings & type annotation to the `job` decorator * Docstrings for the `defaults` vars * Add deprecation warning to Connection context manager * Fix Types
-rw-r--r--dev-requirements.txt2
-rw-r--r--rq/command.py19
-rw-r--r--rq/connections.py26
-rw-r--r--rq/decorators.py34
-rw-r--r--rq/defaults.py83
-rw-r--r--rq/job.py558
-rw-r--r--rq/serializers.py11
-rw-r--r--rq/types.py17
-rw-r--r--rq/utils.py25
9 files changed, 620 insertions, 155 deletions
diff --git a/dev-requirements.txt b/dev-requirements.txt
index a002971..d16aa41 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -3,3 +3,5 @@ psutil
pytest
pytest-cov
sentry-sdk
+redis
+click
diff --git a/rq/command.py b/rq/command.py
index 7ade232..b98082c 100644
--- a/rq/command.py
+++ b/rq/command.py
@@ -1,9 +1,10 @@
import json
import os
import signal
-import typing as t
-if t.TYPE_CHECKING:
+from typing import TYPE_CHECKING, Dict, Any
+
+if TYPE_CHECKING:
from redis import Redis
from .worker import Worker
@@ -28,7 +29,7 @@ def send_command(connection: 'Redis', worker_name: str, command, **kwargs):
connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
-def parse_payload(payload: t.Dict[t.Any, t.Any]) -> t.Dict[t.Any, t.Any]:
+def parse_payload(payload: Dict[Any, Any]) -> Dict[Any, Any]:
"""
Returns a dict of command data
@@ -75,12 +76,12 @@ def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
-def handle_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
+def handle_command(worker: 'Worker', payload: Dict[Any, Any]):
"""Parses payload and routes commands
Args:
worker (Worker): The worker to use
- payload (t.Dict[t.Any, t.Any]): The Payload
+ payload (Dict[Any, Any]): The Payload
"""
if payload['command'] == 'stop-job':
handle_stop_job_command(worker, payload)
@@ -101,13 +102,13 @@ def handle_shutdown_command(worker: 'Worker'):
os.kill(pid, signal.SIGINT)
-def handle_kill_worker_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
+def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]):
"""
Stops work horse
Args:
worker (Worker): The worker to stop
- payload (t.Dict[t.Any, t.Any]): The payload.
+ payload (Dict[Any, Any]): The payload.
"""
worker.log.info('Received kill horse command.')
@@ -118,12 +119,12 @@ def handle_kill_worker_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
worker.log.info('Worker is not working, kill horse command ignored')
-def handle_stop_job_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
+def handle_stop_job_command(worker: 'Worker', payload: Dict[Any, Any]):
"""Handles stop job command.
Args:
worker (Worker): The worker to use
- payload (t.Dict[t.Any, t.Any]): The payload.
+ payload (Dict[Any, Any]): The payload.
"""
job_id = payload.get('job_id')
worker.log.debug('Received command to stop job %s', job_id)
diff --git a/rq/connections.py b/rq/connections.py
index b37e746..fd88099 100644
--- a/rq/connections.py
+++ b/rq/connections.py
@@ -1,5 +1,6 @@
from contextlib import contextmanager
import typing as t
+import warnings
from redis import Redis
from .local import LocalStack, release_local
@@ -11,6 +12,21 @@ class NoRedisConnectionException(Exception):
@contextmanager
def Connection(connection: t.Optional['Redis'] = None): # noqa
+ """The context manager for handling connections in a clean way.
+ It will push the connection to the LocalStack, and pop the connection
+ when leaving the context
+ Example:
+ ..codeblock:python::
+
+ with Connection():
+ w = Worker()
+ w.work()
+
+ Args:
+ connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None.
+ """
+ warnings.warn("The Conneciton context manager is deprecated. Use the `connection` parameter instead.",
+ DeprecationWarning)
if connection is None:
connection = Redis()
push_connection(connection)
@@ -33,9 +49,12 @@ def push_connection(redis: 'Redis'):
_connection_stack.push(redis)
-def pop_connection():
+def pop_connection() -> 'Redis':
"""
Pops the topmost connection from the stack.
+
+ Returns:
+ redis (Redis): A Redis connection
"""
return _connection_stack.pop()
@@ -57,10 +76,13 @@ def use_connection(redis: t.Optional['Redis'] = None):
push_connection(redis)
-def get_current_connection():
+def get_current_connection() -> 'Redis':
"""
Returns the current Redis connection (i.e. the topmost on the
connection stack).
+
+ Returns:
+ Redis: A Redis Connection
"""
return _connection_stack.top
diff --git a/rq/decorators.py b/rq/decorators.py
index 3c8dc83..70a61aa 100644
--- a/rq/decorators.py
+++ b/rq/decorators.py
@@ -1,7 +1,7 @@
from functools import wraps
-import typing as t
+from typing import TYPE_CHECKING, Callable, Dict, Optional, List, Any, Union
-if t.TYPE_CHECKING:
+if TYPE_CHECKING:
from redis import Redis
from .job import Retry
@@ -13,13 +13,13 @@ from .utils import backend_class
class job: # noqa
queue_class = Queue
- def __init__(self, queue: 'Queue', connection: t.Optional['Redis'] = None, timeout=None,
- result_ttl=DEFAULT_RESULT_TTL, ttl=None,
- queue_class=None, depends_on: t.Optional[t.List[t.Any]] = None, at_front: t.Optional[bool] = None,
- meta=None, description=None, failure_ttl=None, retry: t.Optional['Retry'] = None, on_failure=None,
- on_success=None):
- """
- A decorator that adds a ``delay`` method to the decorated function,
+ def __init__(self, queue: Union['Queue', str], connection: Optional['Redis'] = None, timeout: Optional[int] = None,
+ result_ttl: int = DEFAULT_RESULT_TTL, ttl: Optional[int] = None,
+ queue_class: Optional['Queue'] = None, depends_on: Optional[List[Any]] = None, at_front: Optional[bool] = None,
+ meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, failure_ttl: Optional[int] = None,
+ retry: Optional['Retry'] = None, on_failure: Optional[Callable[..., Any]] = None,
+ on_success: Optional[Callable[..., Any]] = None):
+ """A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
denoting the queue name. For example::
@@ -32,6 +32,22 @@ class job: # noqa
>>> ...
>>> # Puts `simple_add` function into queue
>>> simple_add.delay(1, 2)
+
+ Args:
+ queue (Union['Queue', str]): The queue to use, can be the Queue class itself, or the queue name (str)
+ connection (Optional[Redis], optional): Redis Connection. Defaults to None.
+ timeout (Optional[int], optional): Job timeout. Defaults to None.
+ result_ttl (int, optional): Result time to live. Defaults to DEFAULT_RESULT_TTL.
+ ttl (Optional[int], optional): Time to live. Defaults to None.
+ queue_class (Optional[Queue], optional): A custom class that inherits from `Queue`. Defaults to None.
+ depends_on (Optional[List[Any]], optional): A list of dependents jobs. Defaults to None.
+ at_front (Optional[bool], optional): Whether to enqueue the job at front of the queue. Defaults to None.
+ meta (Optional[Dict[Any, Any]], optional): Arbitraty metadata about the job. Defaults to None.
+ description (Optional[str], optional): Job description. Defaults to None.
+ failure_ttl (Optional[int], optional): Failture time to live. Defaults to None.
+ retry (Optional[Retry], optional): A Retry object. Defaults to None.
+ on_failure (Optional[Callable[..., Any]], optional): Callable to run on failure. Defaults to None.
+ on_success (Optional[Callable[..., Any]], optional): Callable to run on success. Defaults to None.
"""
self.queue = queue
self.queue_class = backend_class(self, 'queue_class', override=queue_class)
diff --git a/rq/defaults.py b/rq/defaults.py
index bb7ec79..ef76678 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -1,14 +1,91 @@
DEFAULT_JOB_CLASS = 'rq.job.Job'
+""" The path for the default Job class to use.
+Defaults to the main `Job` class within the `rq.job` module
+"""
+
+
DEFAULT_QUEUE_CLASS = 'rq.Queue'
+""" The path for the default Queue class to use.
+Defaults to the main `Queue` class within the `rq.queue` module
+"""
+
+
DEFAULT_WORKER_CLASS = 'rq.Worker'
+""" The path for the default Worker class to use.
+Defaults to the main `Worker` class within the `rq.worker` module
+"""
+
+
DEFAULT_SERIALIZER_CLASS = 'rq.serializers.DefaultSerializer'
+""" The path for the default Serializer class to use.
+Defaults to the main `DefaultSerializer` class within the `rq.serializers` module
+"""
+
+
DEFAULT_CONNECTION_CLASS = 'redis.Redis'
+""" The path for the default Redis client class to use.
+Defaults to the main `Redis` class within the `redis` module
+As imported like `from redis import Redis`
+"""
+
+
DEFAULT_WORKER_TTL = 420
+""" The default Time To Live (TTL) for the Worker in seconds
+Defines the effective timeout period for a worker
+"""
+
+
DEFAULT_JOB_MONITORING_INTERVAL = 30
+""" The interval in seconds for Job monitoring
+"""
+
+
DEFAULT_RESULT_TTL = 500
-DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds
-DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
+""" The Time To Live (TTL) in seconds to keep job results
+Means that the results will be expired from Redis
+after `DEFAULT_RESULT_TTL` seconds
+"""
+
+
+DEFAULT_FAILURE_TTL = 31536000
+""" The Time To Live (TTL) in seconds to keep job failure information
+Means that the failure information will be expired from Redis
+after `DEFAULT_FAILURE_TTL` seconds.
+Defaults to 1 YEAR in seconds
+"""
+
+
DEFAULT_SCHEDULER_FALLBACK_PERIOD = 120
-DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
+""" The amount in seconds it will take for a new scheduler
+to pickup tasks after a scheduler has died.
+This is used as a safety net to avoid race conditions and duplicates
+when using multiple schedulers
+"""
+
+
DEFAULT_MAINTENANCE_TASK_INTERVAL = 10 * 60
+""" The interval to run maintenance tasks
+in seconds. Defaults to 10 minutes.
+"""
+
+
CALLBACK_TIMEOUT = 60
+""" The timeout period in seconds for Callback functions
+Means that Functions used in `success_callback` and `failure_callback`
+will timeout after N seconds
+"""
+
+
+DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
+""" The Date Format to use for RQ logging.
+Defaults to Hour:Minute:Seconds on 24hour format
+eg.: `15:45:23`
+"""
+
+
+DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
+""" The default Logging Format to use
+Uses Python's default attributes as defined
+https://docs.python.org/3/library/logging.html#logrecord-attributes
+"""
+
diff --git a/rq/job.py b/rq/job.py
index c41c17d..cfd7418 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -1,40 +1,37 @@
import inspect
import json
-import pickle
import warnings
import zlib
-import typing as t
import asyncio
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from enum import Enum
-from functools import partial
from redis import WatchError
-from typing import Any, List, Optional
+from typing import (TYPE_CHECKING, Any, Callable, Dict, Iterable, List,
+ Optional, Tuple, Union)
from uuid import uuid4
-if t.TYPE_CHECKING:
+
+if TYPE_CHECKING:
from .results import Result
from .queue import Queue
from redis import Redis
from redis.client import Pipeline
from .connections import resolve_connection
-from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
+from .exceptions import (DeserializationError, InvalidJobOperation,
+ NoSuchJobError)
from .local import LocalStack
from .serializers import resolve_serializer
-from .utils import (get_version, import_attribute, parse_timeout, str_to_date,
- utcformat, utcnow, ensure_list, get_call_string, as_text,
- decode_redis_hash)
-
-# Serialize pickle dumps using the highest pickle protocol (binary, default
-# uses ascii)
-dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
-loads = pickle.loads
+from .types import FunctionReferenceType, JobDependencyType
+from .utils import (as_text, decode_redis_hash, ensure_list, get_call_string,
+ get_version, import_attribute, parse_timeout, str_to_date,
+ utcformat, utcnow)
class JobStatus(str, Enum):
+ """The Status of Job within its lifecycle at any given time. """
QUEUED = 'queued'
FINISHED = 'finished'
FAILED = 'failed'
@@ -46,7 +43,21 @@ class JobStatus(str, Enum):
class Dependency:
- def __init__(self, jobs: t.List[t.Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False):
+ def __init__(self, jobs: List[Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False):
+ """The definition of a Dependency.
+
+ Args:
+ jobs (List[Union[Job, str]]): A list of Job instances or Job IDs.
+ Anything different will raise a ValueError
+ allow_failure (bool, optional): Whether to allow for failure when running the depency,
+ meaning, the dependencies should continue running even after one of them failed.
+ Defaults to False.
+ enqueue_at_front (bool, optional): Whether this dependecy should be enqueued at the front of the queue.
+ Defaults to False.
+
+ Raises:
+ ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty
+ """
dependent_jobs = ensure_list(jobs)
if not all(
isinstance(job, Job) or isinstance(job, str)
@@ -62,47 +73,119 @@ class Dependency:
self.enqueue_at_front = enqueue_at_front
-# Sentinel value to mark that some of our lazily evaluated properties have not
-# yet been evaluated.
UNEVALUATED = object()
+"""Sentinel value to mark that some of our lazily evaluated properties have not
+yet been evaluated.
+"""
+
+def cancel_job(job_id: str, connection: Optional['Redis'] = None,
+ serializer=None, enqueue_dependents: bool = False):
+ """Cancels the job with the given job ID, preventing execution.
+ Use with caution. This will discard any job info (i.e. it can't be requeued later).
-def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False):
- """Cancels the job with the given job ID, preventing execution. Discards
- any job info (i.e. it can't be requeued later).
+ Args:
+ job_id (str): The Job ID
+ connection (Optional[Redis], optional): The Redis Connection. Defaults to None.
+ serializer (str, optional): The string of the path to the serializer to use. Defaults to None.
+ enqueue_dependents (bool, optional): Whether dependents should still be enqueued. Defaults to False.
"""
Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents)
-def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['Job'] = None):
- """Returns the Job instance that is currently being executed. If this
- function is invoked from outside a job context, None is returned.
- """
+def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['Job'] = None) -> Optional['Job']:
+ """Returns the Job instance that is currently being executed.
+ If this function is invoked from outside a job context, None is returned.
+
+ Args:
+ connection (Optional[Redis], optional): The connection to use. Defaults to None.
+ job_class (Optional[Job], optional): The job class (DEPRECATED). Defaults to None.
+
+ Returns:
+ job (Optional[Job]): The current Job running
+ """
+ if connection:
+ warnings.warn("connection argument for get_current_job is deprecated.",
+ DeprecationWarning)
if job_class:
warnings.warn("job_class argument for get_current_job is deprecated.",
DeprecationWarning)
return _job_stack.top
-def requeue_job(job_id: str, connection: 'Redis', serializer=None):
+def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job':
+ """Fetches a Job by ID and requeues it using the `requeue()` method.
+
+ Args:
+ job_id (str): The Job ID that should be requeued.
+ connection (Redis): The Redis Connection to use
+ serializer (Optional[str], optional): The serializer. Defaults to None.
+
+ Returns:
+ Job: The requeued Job object.
+ """
job = Job.fetch(job_id, connection=connection, serializer=serializer)
return job.requeue()
class Job:
- """A Job is just a convenient datastructure to pass around job (meta) data.
- """
+ """A Job is just a convenient datastructure to pass around job (meta) data."""
redis_job_namespace_prefix = 'rq:job:'
- # Job construction
@classmethod
- def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: Optional['Redis'] = None,
- result_ttl=None, ttl=None, status: JobStatus = None, description=None,
- depends_on=None, timeout=None, id=None, origin=None, meta=None,
- failure_ttl=None, serializer=None, *, on_success=None, on_failure=None) -> 'Job':
+ def create(cls, func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None,
+ kwargs: Optional[Dict[str, Any]] = None, connection: Optional['Redis'] = None,
+ result_ttl: Optional[int] = None, ttl: Optional[int] = None,
+ status: Optional[JobStatus] = None, description: Optional[str] =None,
+ depends_on: Optional[JobDependencyType] = None,
+ timeout: Optional[int] = None, id: Optional[str] = None,
+ origin=None, meta: Optional[Dict[str, Any]] = None,
+ failure_ttl: Optional[int] = None, serializer=None, *,
+ on_success: Optional[Callable[..., Any]] = None,
+ on_failure: Optional[Callable[..., Any]] = None) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
- """
+
+ Args:
+ func (FunctionReference): The function/method/callable for the Job. This can be
+ a reference to a concrete callable or a string representing the path of function/method to be
+ imported. Effectively this is the only required attribute when creating a new Job.
+ args (Union[List[Any], Optional[Tuple]], optional): A Tuple / List of positional arguments to pass the callable.
+ Defaults to None, meaning no args being passed.
+ kwargs (Optional[Dict], optional): A Dictionary of keyword arguments to pass the callable.
+ Defaults to None, meaning no kwargs being passed.
+ connection (Optional[Redis], optional): The Redis connection to use. Defaults to None.
+ This will be "resolved" using the `resolve_connection` function when initialzing the Job Class.
+ result_ttl (Optional[int], optional): The amount of time in seconds the results should live.
+ Defaults to None.
+ ttl (Optional[int], optional): The Time To Live (TTL) for the job itself. Defaults to None.
+ status (JobStatus, optional): The Job Status. Defaults to None.
+ description (Optional[str], optional): The Job Description. Defaults to None.
+ depends_on (Union['Dependency', List[Union['Dependency', 'Job']]], optional): What the jobs depends on.
+ This accepts a variaty of different arguments including a `Dependency`, a list of `Dependency` or a `Job`
+ list of `Job`. Defaults to None.
+ timeout (Optional[int], optional): The amount of time in seconds that should be a hardlimit for a job execution. Defaults to None.
+ id (Optional[str], optional): An Optional ID (str) for the Job. Defaults to None.
+ origin (Optional[str], optional): The queue of origin. Defaults to None.
+ meta (Optional[Dict[str, Any]], optional): Custom metadata about the job, takes a dictioanry. Defaults to None.
+ failure_ttl (Optional[int], optional): THe time to live in seconds for failed-jobs information. Defaults to None.
+ serializer (Optional[str], optional): The serializer class path to use. Should be a string with the import
+ path for the serializer to use. eg. `mymodule.myfile.MySerializer` Defaults to None.
+ on_success (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run
+ when/if the Job finishes sucessfully. Defaults to None.
+ on_failure (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run
+ when/if the Job fails. Defaults to None.
+
+ Raises:
+ TypeError: If `args` is not a tuple/list
+ TypeError: If `kwargs` is not a dict
+ TypeError: If the `func` is something other than a string or a Callable reference
+ ValueError: If `on_failure` is not a function
+ ValueError: If `on_success` is not a function
+
+ Returns:
+ Job: A job instance.
+ """
if args is None:
args = ()
if kwargs is None:
@@ -171,25 +254,51 @@ class Job:
return job
- def get_position(self):
+ def get_position(self) -> Optional[int]:
+ """Get's the job's position on the queue
+
+ Returns:
+ position (Optional[int]): The position
+ """
from .queue import Queue
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
return q.get_job_position(self._id)
return None
- def get_status(self, refresh: bool = True) -> str:
+ def get_status(self, refresh: bool = True) -> JobStatus:
+ """Gets the Job Status
+
+ Args:
+ refresh (bool, optional): Whether to refresh the Job. Defaults to True.
+
+ Returns:
+ status (JobStatus): The Job Status
+ """
if refresh:
self._status = as_text(self.connection.hget(self.key, 'status'))
-
return self._status
- def set_status(self, status: str, pipeline: Optional['Pipeline'] = None):
+ def set_status(self, status: JobStatus, pipeline: Optional['Pipeline'] = None) -> None:
+ """Set's the Job Status
+
+ Args:
+ status (JobStatus): The Job Status to be set
+ pipeline (Optional[Pipeline], optional): Optional Redis Pipeline to use. Defaults to None.
+ """
self._status = status
connection: 'Redis' = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status)
- def get_meta(self, refresh: bool = True):
+ def get_meta(self, refresh: bool = True) -> Dict:
+ """Get's the metadata for a Job, an arbitrary dictionary.
+
+ Args:
+ refresh (bool, optional): Whether to refresh. Defaults to True.
+
+ Returns:
+ meta (Dict): The dictionary of metadata
+ """
if refresh:
meta = self.connection.hget(self.key, 'meta')
self.meta = self.serializer.loads(meta) if meta else {}
@@ -231,7 +340,7 @@ class Job:
@property
def _dependency_id(self):
"""Returns the first item in self._dependency_ids. Present to
- preserve compatibility with third party packages..
+ preserve compatibility with third party packages.
"""
if self._dependency_ids:
return self._dependency_ids[0]
@@ -250,7 +359,7 @@ class Job:
return job
@property
- def dependent_ids(self) -> t.List[str]:
+ def dependent_ids(self) -> List[str]:
"""Returns a list of ids of jobs whose execution depends on this
job's successful execution."""
return list(map(as_text, self.connection.smembers(self.dependents_key)))
@@ -287,10 +396,15 @@ class Job:
return self._failure_callback
def _deserialize_data(self):
+ """Deserializes the Job `data` into a tuple.
+ This includes the `_func_name`, `_instance`, `_args` and `_kwargs`
+
+ Raises:
+ DeserializationError: Cathes any deserialization error (since serializers are generic)
+ """
try:
self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data)
except Exception as e:
- # catch anything because serializers are generic
raise DeserializationError() from e
@property
@@ -365,45 +479,71 @@ class Job:
self._data = UNEVALUATED
@classmethod
- def exists(cls, job_id: str, connection: Optional['Redis'] = None) -> int:
- """Returns whether a job hash exists for the given job ID."""
+ def exists(cls, job_id: str, connection: Optional['Redis'] = None) -> bool:
+ """Checks whether a Job Hash exists for the given Job ID
+
+ Args:
+ job_id (str): The Job ID
+ connection (Optional[Redis], optional): Optional connection to use. Defaults to None.
+
+ Returns:
+ job_exists (bool): Whether the Job exists
+ """
conn = resolve_connection(connection)
- return conn.exists(cls.key_for(job_id))
+ job_key = cls.key_for(job_id)
+ job_exists = conn.exists(job_key)
+ return bool(job_exists)
@classmethod
def fetch(cls, id: str, connection: Optional['Redis'] = None, serializer=None) -> 'Job':
- """Fetches a persisted job from its corresponding Redis key and
- instantiates it.
+ """Fetches a persisted Job from its corresponding Redis key and instantiates it
+
+ Args:
+ id (str): The Job to fetch
+ connection (Optional[&#39;Redis&#39;], optional): An optional Redis connection. Defaults to None.
+ serializer (_type_, optional): The serializer to use. Defaults to None.
+
+ Returns:
+ Job: The Job instance
"""
job = cls(id, connection=connection, serializer=serializer)
job.refresh()
return job
@classmethod
- def fetch_many(cls, job_ids: t.Iterable[str], connection: 'Redis', serializer=None):
+ def fetch_many(cls, job_ids: Iterable[str], connection: 'Redis', serializer=None) -> List['Job']:
"""
Bulk version of Job.fetch
For any job_ids which a job does not exist, the corresponding item in
the returned list will be None.
+
+ Args:
+ job_ids (Iterable[str]): A list of job ids.
+ connection (Redis): Redis connection
+ serializer (Callable): A serializer
+
+ Returns:
+ jobs (list[Job]): A list of Jobs instances.
"""
with connection.pipeline() as pipeline:
for job_id in job_ids:
pipeline.hgetall(cls.key_for(job_id))
results = pipeline.execute()
- jobs: t.List[Optional['Job']] = []
+ jobs: List[Optional['Job']] = []
for i, job_id in enumerate(job_ids):
- if results[i]:
- job = cls(job_id, connection=connection, serializer=serializer)
- job.restore(results[i])
- jobs.append(job)
- else:
+ if not results[i]:
jobs.append(None)
+ continue
+
+ job = cls(job_id, connection=connection, serializer=serializer)
+ job.restore(results[i])
+ jobs.append(job)
return jobs
- def __init__(self, id: str = None, connection: Optional['Redis'] = None, serializer=None):
+ def __init__(self, id: Optional[str] = None, connection: Optional['Redis'] = None, serializer=None):
self.connection = resolve_connection(connection)
self._id = id
self.created_at = utcnow()
@@ -429,12 +569,12 @@ class Job:
self.ttl: Optional[int] = None
self.worker_name: Optional[str] = None
self._status = None
- self._dependency_ids: t.List[str] = []
+ self._dependency_ids: List[str] = []
self.meta = {}
self.serializer = resolve_serializer(serializer)
self.retries_left = None
- self.retry_intervals: Optional[t.List[int]] = None
- self.redis_server_version = None
+ self.retry_intervals: Optional[List[int]] = None
+ self.redis_server_version: Optional[Tuple[int, int, int]] = None
self.last_heartbeat: Optional[datetime] = None
self.allow_dependency_failures: Optional[bool] = None
self.enqueue_at_front: Optional[bool] = None
@@ -459,21 +599,38 @@ class Job:
return hash(self.id)
# Data access
- def get_id(self): # noqa
+ def get_id(self) -> str: # noqa
"""The job ID for this job instance. Generates an ID lazily the
first time the ID is requested.
+
+ Returns:
+ job_id (str): The Job ID
"""
if self._id is None:
self._id = str(uuid4())
return self._id
- def set_id(self, value: str):
- """Sets a job ID for the given job."""
+ def set_id(self, value: str) -> None:
+ """Sets a job ID for the given job
+
+ Args:
+ value (str): The value to set as Job ID
+ """
if not isinstance(value, str):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
def heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False):
+ """Sets the heartbeat for a job.
+ It will set a hash in Redis with the `last_heartbeat` key and datetime value.
+ If a Redis' pipeline is passed, it will use that, else, it will use the job's own connection.
+
+ Args:
+ timestamp (datetime): The timestamp to use
+ ttl (int): The time to live
+ pipeline (Optional[Pipeline], optional): Can receive a Redis' pipeline to use. Defaults to None.
+ xx (bool, optional): Only sets the key if already exists. Defaults to False.
+ """
self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
@@ -482,13 +639,27 @@ class Job:
id = property(get_id, set_id)
@classmethod
- def key_for(cls, job_id: str):
- """The Redis key that is used to store job hash under."""
+ def key_for(cls, job_id: str) -> bytes:
+ """The Redis key that is used to store job hash under.
+
+ Args:
+ job_id (str): The Job ID
+
+ Returns:
+ redis_job_key (bytes): The Redis fully qualified key for the job
+ """
return (cls.redis_job_namespace_prefix + job_id).encode('utf-8')
@classmethod
- def dependents_key_for(cls, job_id: str):
- """The Redis key that is used to store job dependents hash under."""
+ def dependents_key_for(cls, job_id: str) -> str:
+ """The Redis key that is used to store job dependents hash under.
+
+ Args:
+ job_id (str): The "parent" job id
+
+ Returns:
+ dependents_key (str): The dependents key
+ """
return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id)
@property
@@ -505,25 +676,29 @@ class Job:
def dependencies_key(self):
return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id)
- def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None):
- """
- Fetch all of a job's dependencies. If a pipeline is supplied, and
+ def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None) -> List['Job']:
+ """Fetch all of a job's dependencies. If a pipeline is supplied, and
watch is true, then set WATCH on all the keys of all dependencies.
Returned jobs will use self's connection, not the pipeline supplied.
If a job has been deleted from redis, it is not returned.
- """
+
+ Args:
+ watch (bool, optional): Wether to WATCH the keys. Defaults to False.
+ pipeline (Optional[Pipeline]): The Redis' pipeline to use. Defaults to None.
+
+ Returns:
+ jobs (list[Job]): A list of Jobs
+ """
connection = pipeline if pipeline is not None else self.connection
if watch and self._dependency_ids:
connection.watch(*[self.key_for(dependency_id)
for dependency_id in self._dependency_ids])
- jobs = [job
- for job in self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer)
- if job]
-
+ dependencies_list = self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer)
+ jobs = [job for job in dependencies_list if job]
return jobs
@property
@@ -545,18 +720,29 @@ class Job:
return self._exc_info
- def return_value(self, refresh=False) -> Any:
- """Returns the return value of the latest execution, if it was successful"""
+ def return_value(self, refresh: bool = False) -> Optional[Any]:
+ """Returns the return value of the latest execution, if it was successful
+
+ Args:
+ refresh (bool, optional): Whether to refresh the current status. Defaults to False.
+
+ Returns:
+ result (Optional[Any]): The job return value.
+ """
from .results import Result
if refresh:
self._cached_result = None
- if self.supports_redis_streams:
- if not self._cached_result:
- self._cached_result = self.latest_result()
+ if not self.supports_redis_streams:
+ return None
- if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
- return self._cached_result.return_value
+ if not self._cached_result:
+ self._cached_result = self.latest_result()
+
+ if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
+ return self._cached_result.return_value
+
+ return None
@property
def result(self) -> Any:
@@ -597,17 +783,33 @@ class Job:
return self._result
def results(self) -> List['Result']:
- """Returns all Result objects"""
+ """Returns all Result objects
+
+ Returns:
+ all_results (List[Result]): A list of 'Result' objects
+ """
from .results import Result
return Result.all(self, serializer=self.serializer)
def latest_result(self) -> Optional['Result']:
+ """Get the latest job result.
+
+ Returns:
+ result (Result): The Result object
+ """
"""Returns the latest Result object"""
from .results import Result
return Result.fetch_latest(self, serializer=self.serializer)
- def restore(self, raw_data):
- """Overwrite properties with the provided values stored in Redis"""
+ def restore(self, raw_data) -> Any:
+ """Overwrite properties with the provided values stored in Redis.
+
+ Args:
+ raw_data (_type_): The raw data to load the job data from
+
+ Raises:
+ NoSuchJobError: If there way an error getting the job data
+ """
obj = decode_redis_hash(raw_data)
try:
raw_data = obj['data']
@@ -680,11 +882,17 @@ class Job:
self.restore(data)
def to_dict(self, include_meta: bool = True, include_result: bool = True) -> dict:
- """
- Returns a serialization of the current job instance
+ """Returns a serialization of the current job instance
You can exclude serializing the `meta` dictionary by setting
`include_meta=False`.
+
+ Args:
+ include_meta (bool, optional): Whether to include the Job's metadata. Defaults to True.
+ include_result (bool, optional): Whether to include the Job's result. Defaults to True.
+
+ Returns:
+ dict: The Job serialized as a dictionary
"""
obj = {
'created_at': utcformat(self.created_at or utcnow()),
@@ -741,15 +949,19 @@ class Job:
return obj
def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True,
- include_result=True):
- """
- Dumps the current job instance to its corresponding Redis key.
+ include_result: bool = True):
+ """Dumps the current job instance to its corresponding Redis key.
Exclude saving the `meta` dictionary by setting
`include_meta=False`. This is useful to prevent clobbering
user metadata without an expensive `refresh()` call first.
Redis key persistence may be altered by `cleanup()` method.
+
+ Args:
+ pipeline (Optional[Pipeline], optional): The Redis' pipeline to use. Defaults to None.
+ include_meta (bool, optional): Whether to include the job's metadata. Defaults to True.
+ include_result (bool, optional): Whether to include the job's result. Defaults to True.
"""
key = self.key
connection = pipeline if pipeline is not None else self.connection
@@ -766,9 +978,13 @@ class Job:
"""Only supported by Redis server >= 5.0 is required."""
return self.get_redis_server_version() >= (5, 0, 0)
- def get_redis_server_version(self):
- """Return Redis server version of connection"""
- if not self.redis_server_version:
+ def get_redis_server_version(self) -> Tuple[int, int, int]:
+ """Return Redis server version of connection
+
+ Returns:
+ redis_server_version (Tuple[int, int, int]): The Redis version within a Tuple of integers, eg (5, 0, 9)
+ """
+ if self.redis_server_version is None:
self.redis_server_version = get_version(self.connection)
return self.redis_server_version
@@ -788,6 +1004,13 @@ class Job:
You can enqueue the jobs dependents optionally,
Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in.
+
+ Args:
+ pipeline (Optional[Pipeline], optional): The Redis' pipeline to use. Defaults to None.
+ enqueue_dependents (bool, optional): Whether to enqueue dependents jobs. Defaults to False.
+
+ Raises:
+ InvalidJobOperation: If the job has already been cancelled.
"""
if self.is_canceled:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
@@ -834,8 +1057,15 @@ class Job:
# handle it
raise
- def requeue(self, at_front: bool = False):
- """Requeues job."""
+ def requeue(self, at_front: bool = False) -> 'Job':
+ """Requeues job
+
+ Args:
+ at_front (bool, optional): Whether the job should be requeued at the front of the queue. Defaults to False.
+
+ Returns:
+ job (Job): The requeued Job instance
+ """
return self.failed_job_registry.requeue(self, at_front=at_front)
def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
@@ -888,10 +1118,15 @@ class Job:
registry.remove(self, pipeline=pipeline)
def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True,
- delete_dependents=False):
+ delete_dependents: bool = False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
- on this job can optionally be deleted as well."""
+ on this job can optionally be deleted as well.
+ Args:
+ pipeline (Optional[Pipeline], optional): Redis' piepline. Defaults to None.
+ remove_from_queue (bool, optional): Whether the job should be removed from the queue. Defaults to True.
+ delete_dependents (bool, optional): Whether job dependents should also be deleted. Defaults to False.
+ """
connection = pipeline if pipeline is not None else self.connection
self._remove_from_registries(pipeline=pipeline, remove_from_queue=remove_from_queue)
@@ -902,21 +1137,29 @@ class Job:
connection.delete(self.key, self.dependents_key, self.dependencies_key)
def delete_dependents(self, pipeline: Optional['Pipeline'] = None):
- """Delete jobs depending on this job."""
+ """Delete jobs depending on this job.
+
+ Args:
+ pipeline (Optional[Pipeline], optional): Redis' piepline. Defaults to None.
+ """
connection = pipeline if pipeline is not None else self.connection
for dependent_id in self.dependent_ids:
try:
job = Job.fetch(dependent_id, connection=self.connection, serializer=self.serializer)
- job.delete(pipeline=pipeline,
- remove_from_queue=False)
+ job.delete(pipeline=pipeline, remove_from_queue=False)
except NoSuchJobError:
# It could be that the dependent job was never saved to redis
pass
connection.delete(self.dependents_key)
# Job execution
- def perform(self): # noqa
- """Invokes the job function with the job arguments."""
+ def perform(self) -> Any: # noqa
+ """The main execution method. Invokes the job function with the job arguments.
+ This is the method that actually performs the job - it's what its called by the worker.
+
+ Returns:
+ result (Any): The job result
+ """
self.connection.persist(self.key)
_job_stack.push(self)
try:
@@ -926,13 +1169,19 @@ class Job:
return self._result
def prepare_for_execution(self, worker_name: str, pipeline: 'Pipeline'):
- """Set job metadata before execution begins"""
+ """Prepares the job for execution, setting the worker name,
+ heartbeat information, status and other metadata before execution begins.
+
+ Args:
+ worker_name (str): The worker that will perform the job
+ pipeline (Pipeline): The Redis' piipeline to use
+ """
self.worker_name = worker_name
self.last_heartbeat = utcnow()
self.started_at = self.last_heartbeat
self._status = JobStatus.STARTED
mapping = {
- 'last_heartbeat': utcformat(self.last_heartbeat), # type: ignore
+ 'last_heartbeat': utcformat(self.last_heartbeat),
'status': self._status,
'started_at': utcformat(self.started_at), # type: ignore
'worker_name': worker_name
@@ -940,9 +1189,17 @@ class Job:
if self.get_redis_server_version() >= (4, 0, 0):
pipeline.hset(self.key, mapping=mapping)
else:
- pipeline.hmset(self.key, mapping)
+ pipeline.hmset(self.key, mapping=mapping)
- def _execute(self):
+ def _execute(self) -> Any:
+ """Actually runs the function with it's *args and **kwargs.
+ It will use the `func` property, which was already resolved and ready to run at this point.
+ If the function is a coroutine (it's an async function/method), then the `result`
+ will have to be awaited within an event loop.
+
+ Returns:
+ result (Any): The function result
+ """
result = self.func(*self.args, **self.kwargs)
if asyncio.iscoroutine(result):
loop = asyncio.new_event_loop()
@@ -950,36 +1207,56 @@ class Job:
return coro_result
return result
- def get_ttl(self, default_ttl: Optional[int] = None):
+ def get_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]:
"""Returns ttl for a job that determines how long a job will be
persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
+
+ Args:
+ default_ttl (Optional[int]): The default time to live for the job
+
+ Returns:
+ ttl (int): The time to live
"""
return default_ttl if self.ttl is None else self.ttl
- def get_result_ttl(self, default_ttl: Optional[int] = None):
+ def get_result_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]:
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
+
+ Args:
+ default_ttl (Optional[int]): The default time to live for the job result
+
+ Returns:
+ ttl (int): The time to live for the result
"""
return default_ttl if self.result_ttl is None else self.result_ttl
# Representation
- def get_call_string(self): # noqa
+ def get_call_string(self) -> Optional[str]: # noqa
"""Returns a string representation of the call, formatted as a regular
Python function invocation statement.
+
+ Returns:
+ call_repr (str): The string representation
"""
- return get_call_string(self.func_name, self.args, self.kwargs, max_length=75)
+ call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75)
+ return call_repr
def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None,
remove_from_queue: bool = True):
- """Prepare job for eventual deletion (if needed). This method is usually
- called after successful execution. How long we persist the job and its
- result depends on the value of ttl:
+ """Prepare job for eventual deletion (if needed).
+ This method is usually called after successful execution.
+ How long we persist the job and its result depends on the value of ttl:
- If ttl is 0, cleanup the job immediately.
- If it's a positive number, set the job to expire in X seconds.
- - If ttl is negative, don't set an expiry to it (persist
- forever)
+ - If ttl is negative, don't set an expiry to it (persist forever)
+
+ Args:
+ ttl (Optional[int], optional): Time to live. Defaults to None.
+ pipeline (Optional[Pipeline], optional): Redis' pipeline. Defaults to None.
+ remove_from_queue (bool, optional): Whether the job should be removed from the queue. Defaults to True.
"""
if ttl == 0:
self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue)
@@ -1005,10 +1282,13 @@ class Job:
job_class=self.__class__,
serializer=self.serializer)
- def get_retry_interval(self):
+ def get_retry_interval(self) -> int:
"""Returns the desired retry interval.
If number of retries is bigger than length of intervals, the first
value in the list will be used multiple times.
+
+ Returns:
+ retry_interval (int): The desired retry interval
"""
if self.retry_intervals is None:
return 0
@@ -1017,7 +1297,15 @@ class Job:
return self.retry_intervals[index]
def retry(self, queue: 'Queue', pipeline: 'Pipeline'):
- """Requeue or schedule this job for execution"""
+ """Requeue or schedule this job for execution.
+ If the the `retry_interval` was set on the job itself,
+ it will calculate a scheduled time for the job to run, and instead
+ of just regularly `enqueing` the job, it will `schedule` it.
+
+ Args:
+ queue (Queue): The queue to retry the job on
+ pipeline (Pipeline): The Redis' pipeline to use
+ """
retry_interval = self.get_retry_interval()
self.retries_left = self.retries_left - 1
if retry_interval:
@@ -1032,11 +1320,15 @@ class Job:
depend on are successfully performed. We record this relation as
a reverse dependency (a Redis set), with a key that looks something
like:
+ ..codeblock:python::
rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
This method adds the job in its dependencies' dependents sets,
and adds the job to DeferredJobRegistry.
+
+ Args:
+ pipeline (Optional[Pipeline]): The Redis' pipeline. Defaults to None
"""
from .registry import DeferredJobRegistry
@@ -1054,14 +1346,14 @@ class Job:
connection.sadd(self.dependencies_key, dependency_id)
@property
- def dependency_ids(self):
+ def dependency_ids(self) -> List[bytes]:
dependencies = self.connection.smembers(self.dependencies_key)
return [Job.key_for(_id.decode())
for _id in dependencies]
- def dependencies_are_met(self, parent_job: Optional['Job'] = None,
- pipeline: Optional['Pipeline'] = None, exclude_job_id: str = None):
- """Returns a boolean indicating if all of this job's dependencies are _FINISHED_
+ def dependencies_are_met(self, parent_job: Optional['Job'] = None, pipeline: Optional['Pipeline'] = None,
+ exclude_job_id: Optional[str] = None) -> bool:
+ """Returns a boolean indicating if all of this job's dependencies are `FINISHED`
If a pipeline is passed, all dependencies are WATCHed.
@@ -1069,6 +1361,14 @@ class Job:
This is useful when enqueueing the dependents of a _successful_ job -- that status of
`FINISHED` may not be yet set in redis, but said job is indeed _done_ and this
method is _called_ in the _stack_ of its dependents are being enqueued.
+
+ Args:
+ parent_job (Optional[Job], optional): The parent Job. Defaults to None.
+ pipeline (Optional[Pipeline], optional): The Redis' pipeline. Defaults to None.
+ exclude_job_id (Optional[str], optional): Whether to exclude the job id.. Defaults to None.
+
+ Returns:
+ are_met (bool): Whether the dependencies were met.
"""
connection = pipeline if pipeline is not None else self.connection
@@ -1076,8 +1376,7 @@ class Job:
connection.watch(*[self.key_for(dependency_id)
for dependency_id in self._dependency_ids])
- dependencies_ids = {_id.decode()
- for _id in connection.smembers(self.dependencies_key)}
+ dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)}
if exclude_job_id:
dependencies_ids.discard(exclude_job_id)
@@ -1104,10 +1403,9 @@ class Job:
dependencies_statuses = pipeline.execute()
+ allowed_statuses = [JobStatus.FINISHED]
if self.allow_dependency_failures:
- allowed_statuses = [JobStatus.FINISHED, JobStatus.FAILED]
- else:
- allowed_statuses = [JobStatus.FINISHED]
+ allowed_statuses.append(JobStatus.FAILED)
return all(
status.decode() in allowed_statuses
@@ -1121,8 +1419,18 @@ _job_stack = LocalStack()
class Retry:
- def __init__(self, max, interval: int = 0):
- """`interval` can be a positive number or a list of ints"""
+ def __init__(self, max: int, interval: Union[int, List[int]] = 0):
+ """The main object to defined Retry logics for jobs.
+
+ Args:
+ max (int): The max number of times a job should be retried
+ interval (Union[int, List[int]], optional): The interval between retries.
+ Can be a positive number (int) or a list of ints. Defaults to 0 (meaning no interval between retries).
+
+ Raises:
+ ValueError: If the `max` argument is lower than 1
+ ValueError: If the interval param is negative or the list contains negative numbers
+ """
super().__init__()
if max < 1:
raise ValueError('max: please enter a value greater than 0')
diff --git a/rq/serializers.py b/rq/serializers.py
index 00fd0a7..9e63bc7 100644
--- a/rq/serializers.py
+++ b/rq/serializers.py
@@ -1,6 +1,7 @@
from functools import partial
import pickle
import json
+from typing import Optional, Union
from .utils import import_attribute
@@ -20,11 +21,17 @@ class JSONSerializer():
return json.loads(s.decode('utf-8'), *args, **kwargs)
-def resolve_serializer(serializer: str):
+def resolve_serializer(serializer=None):
"""This function checks the user defined serializer for ('dumps', 'loads') methods
It returns a default pickle serializer if not found else it returns a MySerializer
The returned serializer objects implement ('dumps', 'loads') methods
- Also accepts a string path to serializer that will be loaded as the serializer
+ Also accepts a string path to serializer that will be loaded as the serializer.
+
+ Args:
+ serializer (Callable): The serializer to resolve.
+
+ Returns:
+ serializer (Callable): An object that implements the SerializerProtocol
"""
if not serializer:
return DefaultSerializer
diff --git a/rq/types.py b/rq/types.py
new file mode 100644
index 0000000..fe8e002
--- /dev/null
+++ b/rq/types.py
@@ -0,0 +1,17 @@
+from typing import TYPE_CHECKING, Any, Callable, List, TypeVar, Union
+
+if TYPE_CHECKING:
+ from .job import Dependency, Job
+
+
+FunctionReferenceType = TypeVar('FunctionReferenceType', str, Callable[..., Any])
+"""Custom type definition for what a `func` is in the context of a job.
+A `func` can be a string with the function import path (eg.: `myfile.mymodule.myfunc`)
+or a direct callable (function/method).
+"""
+
+
+JobDependencyType = TypeVar('JobDependencyType', 'Dependency', 'Job', str, List[Union['Dependency', 'Job']])
+"""Custom type definition for a job dependencies.
+A simple helper definition for the `depends_on` parameter when creating a job.
+"""
diff --git a/rq/utils.py b/rq/utils.py
index a9fbb1d..7f9e90b 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -148,11 +148,20 @@ def as_text(v):
raise ValueError('Unknown type %r' % type(v))
-def decode_redis_hash(h):
+def decode_redis_hash(h) -> t.Dict[str, t.Any]:
+ """Decodes the Redis hash, ensuring that keys are strings
+ Most importantly, decodes bytes strings, ensuring the dict has str keys.
+
+ Args:
+ h (Dict[Any, Any]): The Redis hash
+
+ Returns:
+ Dict[str, t.Any]: The decoded Redis data (Dictionary)
+ """
return dict((as_text(k), h[k]) for k in h)
-def import_attribute(name: str):
+def import_attribute(name: str) -> t.Callable[..., t.Any]:
"""Returns an attribute from a dotted path name. Example: `path.to.func`.
When the attribute we look for is a staticmethod, module name in its
@@ -216,11 +225,11 @@ def now():
_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
-def utcformat(dt: dt.datetime):
+def utcformat(dt: dt.datetime) -> str:
return dt.strftime(as_text(_TIMESTAMP_FORMAT))
-def utcparse(string: str):
+def utcparse(string: str) -> dt.datetime:
try:
return datetime.datetime.strptime(string, _TIMESTAMP_FORMAT)
except ValueError:
@@ -320,13 +329,16 @@ def parse_timeout(timeout: t.Any):
return timeout
-def get_version(connection: 'Redis'):
+def get_version(connection: 'Redis') -> t.Tuple[int, int, int]:
"""
Returns tuple of Redis server version.
This function also correctly handles 4 digit redis server versions.
Args:
connection (Redis): The Redis connection.
+
+ Returns:
+ version (Tuple[int, int, int]): A tuple representing the semantic versioning format (eg. (5, 0, 9))
"""
try:
# Getting the connection info for each job tanks performance, we can cache it on the connection object
@@ -366,6 +378,9 @@ def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str:
Args:
data (str): The data to truncate
max_length (t.Optional[int], optional): The max length. Defaults to None.
+
+ Returns:
+ truncated (str): The truncated string
"""
if max_length is None:
return data