summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/eventbrokers/redis.py')
-rw-r--r--src/apscheduler/eventbrokers/redis.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py
index 5211181..0f14e3e 100644
--- a/src/apscheduler/eventbrokers/redis.py
+++ b/src/apscheduler/eventbrokers/redis.py
@@ -19,7 +19,7 @@ from .local import LocalEventBroker
class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
client: Redis
serializer: Serializer = attrs.field(factory=JSONSerializer)
- channel: str = attrs.field(kw_only=True, default='apscheduler')
+ channel: str = attrs.field(kw_only=True, default="apscheduler")
message_poll_interval: float = attrs.field(kw_only=True, default=0.05)
_stopped: bool = attrs.field(init=False, default=True)
_ready_future: Future[None] = attrs.field(init=False)
@@ -33,7 +33,9 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
def __enter__(self):
self._stopped = False
self._ready_future = Future()
- self._thread = Thread(target=self._listen_messages, daemon=True, name='Redis subscriber')
+ self._thread = Thread(
+ target=self._listen_messages, daemon=True, name="Redis subscriber"
+ )
self._thread.start()
self._ready_future.result(10)
return super().__enter__()
@@ -62,12 +64,12 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
try:
while not self._stopped:
msg = pubsub.get_message(timeout=self.message_poll_interval)
- if msg and isinstance(msg['data'], bytes):
- event = self.reconstitute_event(msg['data'])
+ if msg and isinstance(msg["data"], bytes):
+ event = self.reconstitute_event(msg["data"])
if event is not None:
self.publish_local(event)
except BaseException:
- self._logger.exception('Subscriber crashed')
+ self._logger.exception("Subscriber crashed")
raise
finally:
pubsub.close()