summaryrefslogtreecommitdiff
path: root/tests/test_pubsub.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_pubsub.py')
-rw-r--r--tests/test_pubsub.py340
1 files changed, 162 insertions, 178 deletions
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index b019bae..6df0faf 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -1,17 +1,14 @@
+import platform
import threading
import time
from unittest import mock
-import platform
import pytest
+
import redis
from redis.exceptions import ConnectionError
-from .conftest import (
- _get_client,
- skip_if_redis_enterprise,
- skip_if_server_version_lt
-)
+from .conftest import _get_client, skip_if_redis_enterprise, skip_if_server_version_lt
def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
@@ -19,7 +16,8 @@ def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
timeout = now + timeout
while now < timeout:
message = pubsub.get_message(
- ignore_subscribe_messages=ignore_subscribe_messages)
+ ignore_subscribe_messages=ignore_subscribe_messages
+ )
if message is not None:
return message
time.sleep(0.01)
@@ -29,39 +27,39 @@ def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
def make_message(type, channel, data, pattern=None):
return {
- 'type': type,
- 'pattern': pattern and pattern.encode('utf-8') or None,
- 'channel': channel and channel.encode('utf-8') or None,
- 'data': data.encode('utf-8') if isinstance(data, str) else data
+ "type": type,
+ "pattern": pattern and pattern.encode("utf-8") or None,
+ "channel": channel and channel.encode("utf-8") or None,
+ "data": data.encode("utf-8") if isinstance(data, str) else data,
}
def make_subscribe_test_data(pubsub, type):
- if type == 'channel':
+ if type == "channel":
return {
- 'p': pubsub,
- 'sub_type': 'subscribe',
- 'unsub_type': 'unsubscribe',
- 'sub_func': pubsub.subscribe,
- 'unsub_func': pubsub.unsubscribe,
- 'keys': ['foo', 'bar', 'uni' + chr(4456) + 'code']
+ "p": pubsub,
+ "sub_type": "subscribe",
+ "unsub_type": "unsubscribe",
+ "sub_func": pubsub.subscribe,
+ "unsub_func": pubsub.unsubscribe,
+ "keys": ["foo", "bar", "uni" + chr(4456) + "code"],
}
- elif type == 'pattern':
+ elif type == "pattern":
return {
- 'p': pubsub,
- 'sub_type': 'psubscribe',
- 'unsub_type': 'punsubscribe',
- 'sub_func': pubsub.psubscribe,
- 'unsub_func': pubsub.punsubscribe,
- 'keys': ['f*', 'b*', 'uni' + chr(4456) + '*']
+ "p": pubsub,
+ "sub_type": "psubscribe",
+ "unsub_type": "punsubscribe",
+ "sub_func": pubsub.psubscribe,
+ "unsub_func": pubsub.punsubscribe,
+ "keys": ["f*", "b*", "uni" + chr(4456) + "*"],
}
- assert False, f'invalid subscribe type: {type}'
+ assert False, f"invalid subscribe type: {type}"
class TestPubSubSubscribeUnsubscribe:
-
- def _test_subscribe_unsubscribe(self, p, sub_type, unsub_type, sub_func,
- unsub_func, keys):
+ def _test_subscribe_unsubscribe(
+ self, p, sub_type, unsub_type, sub_func, unsub_func, keys
+ ):
for key in keys:
assert sub_func(key) is None
@@ -79,15 +77,16 @@ class TestPubSubSubscribeUnsubscribe:
assert wait_for_message(p) == make_message(unsub_type, key, i)
def test_channel_subscribe_unsubscribe(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
+ kwargs = make_subscribe_test_data(r.pubsub(), "channel")
self._test_subscribe_unsubscribe(**kwargs)
def test_pattern_subscribe_unsubscribe(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'pattern')
+ kwargs = make_subscribe_test_data(r.pubsub(), "pattern")
self._test_subscribe_unsubscribe(**kwargs)
- def _test_resubscribe_on_reconnection(self, p, sub_type, unsub_type,
- sub_func, unsub_func, keys):
+ def _test_resubscribe_on_reconnection(
+ self, p, sub_type, unsub_type, sub_func, unsub_func, keys
+ ):
for key in keys:
assert sub_func(key) is None
@@ -109,10 +108,10 @@ class TestPubSubSubscribeUnsubscribe:
unique_channels = set()
assert len(messages) == len(keys)
for i, message in enumerate(messages):
- assert message['type'] == sub_type
- assert message['data'] == i + 1
- assert isinstance(message['channel'], bytes)
- channel = message['channel'].decode('utf-8')
+ assert message["type"] == sub_type
+ assert message["data"] == i + 1
+ assert isinstance(message["channel"], bytes)
+ channel = message["channel"].decode("utf-8")
unique_channels.add(channel)
assert len(unique_channels) == len(keys)
@@ -120,16 +119,17 @@ class TestPubSubSubscribeUnsubscribe:
assert channel in keys
def test_resubscribe_to_channels_on_reconnection(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
+ kwargs = make_subscribe_test_data(r.pubsub(), "channel")
self._test_resubscribe_on_reconnection(**kwargs)
@pytest.mark.onlynoncluster
def test_resubscribe_to_patterns_on_reconnection(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'pattern')
+ kwargs = make_subscribe_test_data(r.pubsub(), "pattern")
self._test_resubscribe_on_reconnection(**kwargs)
- def _test_subscribed_property(self, p, sub_type, unsub_type, sub_func,
- unsub_func, keys):
+ def _test_subscribed_property(
+ self, p, sub_type, unsub_type, sub_func, unsub_func, keys
+ ):
assert p.subscribed is False
sub_func(keys[0])
@@ -175,22 +175,22 @@ class TestPubSubSubscribeUnsubscribe:
assert p.subscribed is False
def test_subscribe_property_with_channels(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
+ kwargs = make_subscribe_test_data(r.pubsub(), "channel")
self._test_subscribed_property(**kwargs)
@pytest.mark.onlynoncluster
def test_subscribe_property_with_patterns(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'pattern')
+ kwargs = make_subscribe_test_data(r.pubsub(), "pattern")
self._test_subscribed_property(**kwargs)
def test_ignore_all_subscribe_messages(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
checks = (
- (p.subscribe, 'foo'),
- (p.unsubscribe, 'foo'),
- (p.psubscribe, 'f*'),
- (p.punsubscribe, 'f*'),
+ (p.subscribe, "foo"),
+ (p.unsubscribe, "foo"),
+ (p.psubscribe, "f*"),
+ (p.punsubscribe, "f*"),
)
assert p.subscribed is False
@@ -204,10 +204,10 @@ class TestPubSubSubscribeUnsubscribe:
p = r.pubsub()
checks = (
- (p.subscribe, 'foo'),
- (p.unsubscribe, 'foo'),
- (p.psubscribe, 'f*'),
- (p.punsubscribe, 'f*'),
+ (p.subscribe, "foo"),
+ (p.unsubscribe, "foo"),
+ (p.psubscribe, "f*"),
+ (p.punsubscribe, "f*"),
)
assert p.subscribed is False
@@ -219,16 +219,17 @@ class TestPubSubSubscribeUnsubscribe:
assert p.subscribed is False
def test_sub_unsub_resub_channels(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
+ kwargs = make_subscribe_test_data(r.pubsub(), "channel")
self._test_sub_unsub_resub(**kwargs)
@pytest.mark.onlynoncluster
def test_sub_unsub_resub_patterns(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'pattern')
+ kwargs = make_subscribe_test_data(r.pubsub(), "pattern")
self._test_sub_unsub_resub(**kwargs)
- def _test_sub_unsub_resub(self, p, sub_type, unsub_type, sub_func,
- unsub_func, keys):
+ def _test_sub_unsub_resub(
+ self, p, sub_type, unsub_type, sub_func, unsub_func, keys
+ ):
# https://github.com/andymccurdy/redis-py/issues/764
key = keys[0]
sub_func(key)
@@ -241,15 +242,16 @@ class TestPubSubSubscribeUnsubscribe:
assert p.subscribed is True
def test_sub_unsub_all_resub_channels(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
+ kwargs = make_subscribe_test_data(r.pubsub(), "channel")
self._test_sub_unsub_all_resub(**kwargs)
def test_sub_unsub_all_resub_patterns(self, r):
- kwargs = make_subscribe_test_data(r.pubsub(), 'pattern')
+ kwargs = make_subscribe_test_data(r.pubsub(), "pattern")
self._test_sub_unsub_all_resub(**kwargs)
- def _test_sub_unsub_all_resub(self, p, sub_type, unsub_type, sub_func,
- unsub_func, keys):
+ def _test_sub_unsub_all_resub(
+ self, p, sub_type, unsub_type, sub_func, unsub_func, keys
+ ):
# https://github.com/andymccurdy/redis-py/issues/764
key = keys[0]
sub_func(key)
@@ -271,22 +273,22 @@ class TestPubSubMessages:
def test_published_message_to_channel(self, r):
p = r.pubsub()
- p.subscribe('foo')
- assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
- assert r.publish('foo', 'test message') == 1
+ p.subscribe("foo")
+ assert wait_for_message(p) == make_message("subscribe", "foo", 1)
+ assert r.publish("foo", "test message") == 1
message = wait_for_message(p)
assert isinstance(message, dict)
- assert message == make_message('message', 'foo', 'test message')
+ assert message == make_message("message", "foo", "test message")
def test_published_message_to_pattern(self, r):
p = r.pubsub()
- p.subscribe('foo')
- p.psubscribe('f*')
- assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
- assert wait_for_message(p) == make_message('psubscribe', 'f*', 2)
+ p.subscribe("foo")
+ p.psubscribe("f*")
+ assert wait_for_message(p) == make_message("subscribe", "foo", 1)
+ assert wait_for_message(p) == make_message("psubscribe", "f*", 2)
# 1 to pattern, 1 to channel
- assert r.publish('foo', 'test message') == 2
+ assert r.publish("foo", "test message") == 2
message1 = wait_for_message(p)
message2 = wait_for_message(p)
@@ -294,8 +296,8 @@ class TestPubSubMessages:
assert isinstance(message2, dict)
expected = [
- make_message('message', 'foo', 'test message'),
- make_message('pmessage', 'foo', 'test message', pattern='f*')
+ make_message("message", "foo", "test message"),
+ make_message("pmessage", "foo", "test message", pattern="f*"),
]
assert message1 in expected
@@ -306,67 +308,65 @@ class TestPubSubMessages:
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe(foo=self.message_handler)
assert wait_for_message(p) is None
- assert r.publish('foo', 'test message') == 1
+ assert r.publish("foo", "test message") == 1
assert wait_for_message(p) is None
- assert self.message == make_message('message', 'foo', 'test message')
+ assert self.message == make_message("message", "foo", "test message")
@pytest.mark.onlynoncluster
def test_pattern_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
- p.psubscribe(**{'f*': self.message_handler})
+ p.psubscribe(**{"f*": self.message_handler})
assert wait_for_message(p) is None
- assert r.publish('foo', 'test message') == 1
+ assert r.publish("foo", "test message") == 1
assert wait_for_message(p) is None
- assert self.message == make_message('pmessage', 'foo', 'test message',
- pattern='f*')
+ assert self.message == make_message(
+ "pmessage", "foo", "test message", pattern="f*"
+ )
def test_unicode_channel_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
- channel = 'uni' + chr(4456) + 'code'
+ channel = "uni" + chr(4456) + "code"
channels = {channel: self.message_handler}
p.subscribe(**channels)
assert wait_for_message(p) is None
- assert r.publish(channel, 'test message') == 1
+ assert r.publish(channel, "test message") == 1
assert wait_for_message(p) is None
- assert self.message == make_message('message', channel, 'test message')
+ assert self.message == make_message("message", channel, "test message")
@pytest.mark.onlynoncluster
# see: https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
# #known-limitations-with-pubsub
def test_unicode_pattern_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
- pattern = 'uni' + chr(4456) + '*'
- channel = 'uni' + chr(4456) + 'code'
+ pattern = "uni" + chr(4456) + "*"
+ channel = "uni" + chr(4456) + "code"
p.psubscribe(**{pattern: self.message_handler})
assert wait_for_message(p) is None
- assert r.publish(channel, 'test message') == 1
+ assert r.publish(channel, "test message") == 1
assert wait_for_message(p) is None
- assert self.message == make_message('pmessage', channel,
- 'test message', pattern=pattern)
+ assert self.message == make_message(
+ "pmessage", channel, "test message", pattern=pattern
+ )
def test_get_message_without_subscribe(self, r):
p = r.pubsub()
with pytest.raises(RuntimeError) as info:
p.get_message()
- expect = ('connection not set: '
- 'did you forget to call subscribe() or psubscribe()?')
+ expect = (
+ "connection not set: " "did you forget to call subscribe() or psubscribe()?"
+ )
assert expect in info.exconly()
class TestPubSubAutoDecoding:
"These tests only validate that we get unicode values back"
- channel = 'uni' + chr(4456) + 'code'
- pattern = 'uni' + chr(4456) + '*'
- data = 'abc' + chr(4458) + '123'
+ channel = "uni" + chr(4456) + "code"
+ pattern = "uni" + chr(4456) + "*"
+ data = "abc" + chr(4458) + "123"
def make_message(self, type, channel, data, pattern=None):
- return {
- 'type': type,
- 'channel': channel,
- 'pattern': pattern,
- 'data': data
- }
+ return {"type": type, "channel": channel, "pattern": pattern, "data": data}
def setup_method(self, method):
self.message = None
@@ -381,44 +381,37 @@ class TestPubSubAutoDecoding:
def test_channel_subscribe_unsubscribe(self, r):
p = r.pubsub()
p.subscribe(self.channel)
- assert wait_for_message(p) == self.make_message('subscribe',
- self.channel, 1)
+ assert wait_for_message(p) == self.make_message("subscribe", self.channel, 1)
p.unsubscribe(self.channel)
- assert wait_for_message(p) == self.make_message('unsubscribe',
- self.channel, 0)
+ assert wait_for_message(p) == self.make_message("unsubscribe", self.channel, 0)
def test_pattern_subscribe_unsubscribe(self, r):
p = r.pubsub()
p.psubscribe(self.pattern)
- assert wait_for_message(p) == self.make_message('psubscribe',
- self.pattern, 1)
+ assert wait_for_message(p) == self.make_message("psubscribe", self.pattern, 1)
p.punsubscribe(self.pattern)
- assert wait_for_message(p) == self.make_message('punsubscribe',
- self.pattern, 0)
+ assert wait_for_message(p) == self.make_message("punsubscribe", self.pattern, 0)
def test_channel_publish(self, r):
p = r.pubsub()
p.subscribe(self.channel)
- assert wait_for_message(p) == self.make_message('subscribe',
- self.channel, 1)
+ assert wait_for_message(p) == self.make_message("subscribe", self.channel, 1)
r.publish(self.channel, self.data)
- assert wait_for_message(p) == self.make_message('message',
- self.channel,
- self.data)
+ assert wait_for_message(p) == self.make_message(
+ "message", self.channel, self.data
+ )
@pytest.mark.onlynoncluster
def test_pattern_publish(self, r):
p = r.pubsub()
p.psubscribe(self.pattern)
- assert wait_for_message(p) == self.make_message('psubscribe',
- self.pattern, 1)
+ assert wait_for_message(p) == self.make_message("psubscribe", self.pattern, 1)
r.publish(self.channel, self.data)
- assert wait_for_message(p) == self.make_message('pmessage',
- self.channel,
- self.data,
- pattern=self.pattern)
+ assert wait_for_message(p) == self.make_message(
+ "pmessage", self.channel, self.data, pattern=self.pattern
+ )
def test_channel_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
@@ -426,18 +419,16 @@ class TestPubSubAutoDecoding:
assert wait_for_message(p) is None
r.publish(self.channel, self.data)
assert wait_for_message(p) is None
- assert self.message == self.make_message('message', self.channel,
- self.data)
+ assert self.message == self.make_message("message", self.channel, self.data)
# test that we reconnected to the correct channel
self.message = None
p.connection.disconnect()
assert wait_for_message(p) is None # should reconnect
- new_data = self.data + 'new data'
+ new_data = self.data + "new data"
r.publish(self.channel, new_data)
assert wait_for_message(p) is None
- assert self.message == self.make_message('message', self.channel,
- new_data)
+ assert self.message == self.make_message("message", self.channel, new_data)
def test_pattern_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
@@ -445,24 +436,24 @@ class TestPubSubAutoDecoding:
assert wait_for_message(p) is None
r.publish(self.channel, self.data)
assert wait_for_message(p) is None
- assert self.message == self.make_message('pmessage', self.channel,
- self.data,
- pattern=self.pattern)
+ assert self.message == self.make_message(
+ "pmessage", self.channel, self.data, pattern=self.pattern
+ )
# test that we reconnected to the correct pattern
self.message = None
p.connection.disconnect()
assert wait_for_message(p) is None # should reconnect
- new_data = self.data + 'new data'
+ new_data = self.data + "new data"
r.publish(self.channel, new_data)
assert wait_for_message(p) is None
- assert self.message == self.make_message('pmessage', self.channel,
- new_data,
- pattern=self.pattern)
+ assert self.message == self.make_message(
+ "pmessage", self.channel, new_data, pattern=self.pattern
+ )
def test_context_manager(self, r):
with r.pubsub() as pubsub:
- pubsub.subscribe('foo')
+ pubsub.subscribe("foo")
assert pubsub.connection is not None
assert pubsub.connection is None
@@ -471,86 +462,82 @@ class TestPubSubAutoDecoding:
class TestPubSubRedisDown:
-
def test_channel_subscribe(self, r):
- r = redis.Redis(host='localhost', port=6390)
+ r = redis.Redis(host="localhost", port=6390)
p = r.pubsub()
with pytest.raises(ConnectionError):
- p.subscribe('foo')
+ p.subscribe("foo")
class TestPubSubSubcommands:
-
@pytest.mark.onlynoncluster
- @skip_if_server_version_lt('2.8.0')
+ @skip_if_server_version_lt("2.8.0")
def test_pubsub_channels(self, r):
p = r.pubsub()
- p.subscribe('foo', 'bar', 'baz', 'quux')
+ p.subscribe("foo", "bar", "baz", "quux")
for i in range(4):
- assert wait_for_message(p)['type'] == 'subscribe'
- expected = [b'bar', b'baz', b'foo', b'quux']
+ assert wait_for_message(p)["type"] == "subscribe"
+ expected = [b"bar", b"baz", b"foo", b"quux"]
assert all([channel in r.pubsub_channels() for channel in expected])
@pytest.mark.onlynoncluster
- @skip_if_server_version_lt('2.8.0')
+ @skip_if_server_version_lt("2.8.0")
def test_pubsub_numsub(self, r):
p1 = r.pubsub()
- p1.subscribe('foo', 'bar', 'baz')
+ p1.subscribe("foo", "bar", "baz")
for i in range(3):
- assert wait_for_message(p1)['type'] == 'subscribe'
+ assert wait_for_message(p1)["type"] == "subscribe"
p2 = r.pubsub()
- p2.subscribe('bar', 'baz')
+ p2.subscribe("bar", "baz")
for i in range(2):
- assert wait_for_message(p2)['type'] == 'subscribe'
+ assert wait_for_message(p2)["type"] == "subscribe"
p3 = r.pubsub()
- p3.subscribe('baz')
- assert wait_for_message(p3)['type'] == 'subscribe'
+ p3.subscribe("baz")
+ assert wait_for_message(p3)["type"] == "subscribe"
- channels = [(b'foo', 1), (b'bar', 2), (b'baz', 3)]
- assert r.pubsub_numsub('foo', 'bar', 'baz') == channels
+ channels = [(b"foo", 1), (b"bar", 2), (b"baz", 3)]
+ assert r.pubsub_numsub("foo", "bar", "baz") == channels
- @skip_if_server_version_lt('2.8.0')
+ @skip_if_server_version_lt("2.8.0")
def test_pubsub_numpat(self, r):
p = r.pubsub()
- p.psubscribe('*oo', '*ar', 'b*z')
+ p.psubscribe("*oo", "*ar", "b*z")
for i in range(3):
- assert wait_for_message(p)['type'] == 'psubscribe'
+ assert wait_for_message(p)["type"] == "psubscribe"
assert r.pubsub_numpat() == 3
class TestPubSubPings:
-
- @skip_if_server_version_lt('3.0.0')
+ @skip_if_server_version_lt("3.0.0")
def test_send_pubsub_ping(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
- p.subscribe('foo')
+ p.subscribe("foo")
p.ping()
- assert wait_for_message(p) == make_message(type='pong', channel=None,
- data='',
- pattern=None)
+ assert wait_for_message(p) == make_message(
+ type="pong", channel=None, data="", pattern=None
+ )
- @skip_if_server_version_lt('3.0.0')
+ @skip_if_server_version_lt("3.0.0")
def test_send_pubsub_ping_message(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
- p.subscribe('foo')
- p.ping(message='hello world')
- assert wait_for_message(p) == make_message(type='pong', channel=None,
- data='hello world',
- pattern=None)
+ p.subscribe("foo")
+ p.ping(message="hello world")
+ assert wait_for_message(p) == make_message(
+ type="pong", channel=None, data="hello world", pattern=None
+ )
@pytest.mark.onlynoncluster
class TestPubSubConnectionKilled:
-
- @skip_if_server_version_lt('3.0.0')
+ @skip_if_server_version_lt("3.0.0")
@skip_if_redis_enterprise
def test_connection_error_raised_when_connection_dies(self, r):
p = r.pubsub()
- p.subscribe('foo')
- assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
+ p.subscribe("foo")
+ assert wait_for_message(p) == make_message("subscribe", "foo", 1)
for client in r.client_list():
- if client['cmd'] == 'subscribe':
- r.client_kill_filter(_id=client['id'])
+ if client["cmd"] == "subscribe":
+ r.client_kill_filter(_id=client["id"])
with pytest.raises(ConnectionError):
wait_for_message(p)
@@ -558,15 +545,15 @@ class TestPubSubConnectionKilled:
class TestPubSubTimeouts:
def test_get_message_with_timeout_returns_none(self, r):
p = r.pubsub()
- p.subscribe('foo')
- assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
+ p.subscribe("foo")
+ assert wait_for_message(p) == make_message("subscribe", "foo", 1)
assert p.get_message(timeout=0.01) is None
class TestPubSubWorkerThread:
-
- @pytest.mark.skipif(platform.python_implementation() == 'PyPy',
- reason="Pypy threading issue")
+ @pytest.mark.skipif(
+ platform.python_implementation() == "PyPy", reason="Pypy threading issue"
+ )
def test_pubsub_worker_thread_exception_handler(self, r):
event = threading.Event()
@@ -575,12 +562,10 @@ class TestPubSubWorkerThread:
event.set()
p = r.pubsub()
- p.subscribe(**{'foo': lambda m: m})
- with mock.patch.object(p, 'get_message',
- side_effect=Exception('error')):
+ p.subscribe(**{"foo": lambda m: m})
+ with mock.patch.object(p, "get_message", side_effect=Exception("error")):
pubsub_thread = p.run_in_thread(
- daemon=True,
- exception_handler=exception_handler
+ daemon=True, exception_handler=exception_handler
)
assert event.wait(timeout=1.0)
@@ -589,10 +574,9 @@ class TestPubSubWorkerThread:
class TestPubSubDeadlock:
- @pytest.mark.timeout(30, method='thread')
+ @pytest.mark.timeout(30, method="thread")
def test_pubsub_deadlock(self, master_host):
- pool = redis.ConnectionPool(host=master_host[0],
- port=master_host[1])
+ pool = redis.ConnectionPool(host=master_host[0], port=master_host[1])
r = redis.Redis(connection_pool=pool)
for i in range(60):