1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import warnings
from contextlib import contextmanager
from typing import Optional, Tuple, Type
from redis import Connection as RedisConnection
from redis import Redis
from .local import LocalStack
class NoRedisConnectionException(Exception):
    """Raised when no Redis connection could be resolved."""
@contextmanager
def Connection(connection: Optional['Redis'] = None):  # noqa
    """Context manager that makes a Redis connection current for its body.

    The connection is pushed onto the connection stack on entry and popped
    again on exit, even when the body raises.

    Example:

    .. code-block:: python

        with Connection():
            w = Worker()
            w.work()

    This method is deprecated on version 1.12.0 and will be removed in the future.
    Pass the connection to the worker explicitly to handle Redis Connections.

    Args:
        connection (Optional[Redis], optional): A Redis Connection instance.
            When omitted, a new ``Redis()`` is created. Defaults to None.

    Raises:
        AssertionError: If the connection popped on exit is not equal to the
            one that was pushed on entry.
    """
    # stacklevel=3 skips this generator frame and contextlib's ``__enter__``
    # frame, so the warning is attributed to the caller's ``with`` statement.
    warnings.warn(
        "The Connection context manager is deprecated. Use the `connection` parameter instead.",
        DeprecationWarning,
        stacklevel=3,
    )
    if connection is None:
        connection = Redis()
    push_connection(connection)
    try:
        yield
    finally:
        popped = pop_connection()
        # Raise explicitly instead of using ``assert`` so the consistency
        # check is not stripped when Python runs with -O.
        if not (popped == connection):
            raise AssertionError(
                'Unexpected Redis connection was popped off the stack. Check your Redis connection setup.'
            )
def push_connection(redis: 'Redis'):
    """
    Pushes the given connection to the stack.

    Deprecated: emits a ``DeprecationWarning``; pass the connection
    explicitly instead.

    Args:
        redis (Redis): A Redis connection
    """
    # stacklevel=2 attributes the warning to the caller rather than to this
    # module, so the deprecated call site is the one reported.
    warnings.warn(
        "The `push_connection` function is deprecated. Pass the `connection` explicitly instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    _connection_stack.push(redis)
def pop_connection() -> 'Redis':
    """
    Pops the topmost connection from the stack.

    Deprecated: emits a ``DeprecationWarning``; pass the connection
    explicitly instead.

    Returns:
        redis (Redis): A Redis connection
    """
    # stacklevel=2 attributes the warning to the caller rather than to this
    # module, so the deprecated call site is the one reported.
    warnings.warn(
        "The `pop_connection` function is deprecated. Pass the `connection` explicitly instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return _connection_stack.pop()
def get_current_connection() -> Optional['Redis']:
    """
    Returns the current Redis connection (i.e. the topmost on the
    connection stack).

    Deprecated: emits a ``DeprecationWarning``; pass the connection
    explicitly instead.

    Returns:
        Optional[Redis]: The value of the stack's ``top`` attribute. The
        result may be None (``resolve_connection`` checks for that) --
        presumably when nothing has been pushed; confirm against LocalStack.
    """
    # stacklevel=2 attributes the warning to the caller rather than to this
    # module, so the deprecated call site is the one reported.
    warnings.warn(
        "The `get_current_connection` function is deprecated. Pass the `connection` explicitly instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return _connection_stack.top
def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
    """
    Convenience function to resolve the given or the current connection.
    Raises an exception if it cannot resolve a connection now.

    Deprecated: emits a ``DeprecationWarning``; pass the connection
    explicitly instead.

    Args:
        connection (Optional[Redis], optional): A Redis connection. When given,
            it is returned unchanged. Defaults to None.

    Raises:
        NoRedisConnectionException: If connection couldn't be resolved.

    Returns:
        Redis: A Redis Connection
    """
    # stacklevel=2 attributes the warning to the caller rather than to this
    # module, so the deprecated call site is the one reported.
    warnings.warn(
        "The `resolve_connection` function is deprecated. Pass the `connection` explicitly instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    if connection is not None:
        return connection

    # Fall back to the topmost connection on the stack.
    current = get_current_connection()
    if current is None:
        raise NoRedisConnectionException('Could not resolve a Redis connection')
    return current
def parse_connection(connection: Redis) -> Tuple[Type[Redis], Type[RedisConnection], dict]:
    """Split a Redis client into its class and its connection pool settings.

    Args:
        connection (Redis): A Redis client exposing a ``connection_pool``.

    Returns:
        Tuple[Type[Redis], Type[RedisConnection], dict]: The client's class,
        the pool's ``connection_class``, and a copy of the pool's
        ``connection_kwargs`` (made with ``.copy()``).
    """
    pool = connection.connection_pool
    kwargs_copy = pool.connection_kwargs.copy()
    return connection.__class__, pool.connection_class, kwargs_copy
# Module-wide stack used by push_connection / pop_connection /
# get_current_connection.
_connection_stack = LocalStack()

# Names exported by a star-import of this module.
__all__ = [
    'Connection',
    'get_current_connection',
    'push_connection',
    'pop_connection',
]
|