1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
from functools import wraps
from typing import TYPE_CHECKING, Callable, Dict, Optional, List, Any, Union
if TYPE_CHECKING:
from redis import Redis
from .job import Retry
from .defaults import DEFAULT_RESULT_TTL
from .queue import Queue
from .utils import backend_class
class job:  # noqa
    """Decorator class that attaches a ``delay`` method to a function.

    Calling ``func.delay(*args, **kwargs)`` enqueues ``func`` on the configured
    queue via ``Queue.enqueue_call`` instead of running it immediately.
    """

    # Default queue implementation; can be replaced per decorator instance
    # through the ``queue_class`` constructor argument.
    queue_class = Queue

    def __init__(
        self,
        queue: Union['Queue', str],
        connection: Optional['Redis'] = None,
        timeout: Optional[int] = None,
        result_ttl: int = DEFAULT_RESULT_TTL,
        ttl: Optional[int] = None,
        queue_class: Optional['Queue'] = None,
        depends_on: Optional[List[Any]] = None,
        at_front: Optional[bool] = None,
        meta: Optional[Dict[Any, Any]] = None,
        description: Optional[str] = None,
        failure_ttl: Optional[int] = None,
        retry: Optional['Retry'] = None,
        on_failure: Optional[Callable[..., Any]] = None,
        on_success: Optional[Callable[..., Any]] = None,
    ):
        """A decorator that adds a ``delay`` method to the decorated function,
        which in turn creates a RQ job when called. Accepts a required
        ``queue`` argument that can be either a ``Queue`` instance or a string
        denoting the queue name. For example:

        .. code-block:: python

            @job(queue='default')
            def simple_add(x, y):
                return x + y

            # Puts `simple_add` function into queue
            simple_add.delay(1, 2)

        Args:
            queue (Union['Queue', str]): The queue to use, either a Queue instance or the queue name (str).
            connection (Optional[Redis], optional): Redis Connection, used when the queue is
                given by name. Defaults to None.
            timeout (Optional[int], optional): Job timeout. Defaults to None.
            result_ttl (int, optional): Result time to live. Defaults to DEFAULT_RESULT_TTL.
            ttl (Optional[int], optional): Time to live. Defaults to None.
            queue_class (Optional[Queue], optional): A custom class that inherits from `Queue`. Defaults to None.
            depends_on (Optional[List[Any]], optional): A list of jobs this job depends on. Defaults to None.
            at_front (Optional[bool], optional): Whether to enqueue the job at front of the queue. Defaults to None.
            meta (Optional[Dict[Any, Any]], optional): Arbitrary metadata about the job. Defaults to None.
            description (Optional[str], optional): Job description. Defaults to None.
            failure_ttl (Optional[int], optional): Failure time to live. Defaults to None.
            retry (Optional[Retry], optional): A Retry object. Defaults to None.
            on_failure (Optional[Callable[..., Any]], optional): Callable to run on failure. Defaults to None.
            on_success (Optional[Callable[..., Any]], optional): Callable to run on success. Defaults to None.
        """
        self.queue = queue
        # NOTE(review): backend_class presumably returns the override when one
        # is given and the class-level ``queue_class`` otherwise -- confirm in utils.
        self.queue_class = backend_class(self, 'queue_class', override=queue_class)
        self.connection = connection
        self.timeout = timeout
        self.result_ttl = result_ttl
        self.ttl = ttl
        self.meta = meta
        self.depends_on = depends_on
        self.at_front = at_front
        self.description = description
        self.failure_ttl = failure_ttl
        self.retry = retry
        self.on_success = on_success
        self.on_failure = on_failure

    def __call__(self, f):
        """Attach a ``delay`` attribute to ``f`` and return ``f`` itself.

        ``f`` stays directly callable; ``f.delay(*args, **kwargs)`` enqueues
        it. The keyword arguments ``depends_on``, ``job_id`` and ``at_front``
        are consumed by ``delay`` and are not forwarded to ``f``.

        Args:
            f: The function to decorate.

        Returns:
            The same function object, now carrying a ``delay`` attribute.
        """

        @wraps(f)
        def delay(*args, **kwargs):
            # A queue given by name is instantiated lazily, at call time.
            if isinstance(self.queue, str):
                queue = self.queue_class(name=self.queue, connection=self.connection)
            else:
                queue = self.queue

            # Per-call enqueue options are stripped from the job's kwargs.
            depends_on = kwargs.pop('depends_on', None)
            job_id = kwargs.pop('job_id', None)
            at_front = kwargs.pop('at_front', None)

            # Any falsy call-time value falls back to the decorator-level one.
            if not depends_on:
                depends_on = self.depends_on

            # Fall back only when the caller did not supply a value, so an
            # explicit ``at_front=False`` can override a decorator-level True.
            if at_front is None:
                at_front = self.at_front

            return queue.enqueue_call(
                f,
                args=args,
                kwargs=kwargs,
                timeout=self.timeout,
                result_ttl=self.result_ttl,
                ttl=self.ttl,
                depends_on=depends_on,
                job_id=job_id,
                at_front=at_front,
                meta=self.meta,
                description=self.description,
                failure_ttl=self.failure_ttl,
                retry=self.retry,
                on_failure=self.on_failure,
                on_success=self.on_success,
            )

        f.delay = delay
        return f
|