diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 15:18:58 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 15:18:58 +0300 |
commit | 0a6b0f683edee8bf22d85dc655ad61a8285fd312 (patch) | |
tree | e011f50fa9fe3ced09efc3ff41d8c4773c3de9ee /src | |
parent | 4c7dab12eb64d23709df9ce1a2e248ce88f54f4a (diff) | |
download | apscheduler-0a6b0f683edee8bf22d85dc655ad61a8285fd312.tar.gz |
Guard subscriptions in the synchronous local event broker with a lock
This allows the local event broker to safely iterate through the original list of subscriptions while publishing an event, instead of having to atomically make a shallow copy every time.
Diffstat (limited to 'src')
-rw-r--r-- | src/apscheduler/eventbrokers/local.py | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py index e5db7cd..24de3eb 100644 --- a/src/apscheduler/eventbrokers/local.py +++ b/src/apscheduler/eventbrokers/local.py @@ -3,6 +3,7 @@ from __future__ import annotations from asyncio import iscoroutinefunction from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack +from threading import Lock from typing import Any, Callable, Iterable, Optional import attr @@ -18,6 +19,7 @@ from .base import BaseEventBroker class LocalEventBroker(BaseEventBroker): _executor: ThreadPoolExecutor = attr.field(init=False) _exit_stack: ExitStack = attr.field(init=False) + _subscriptions_lock: Lock = attr.field(init=False, factory=Lock) def __enter__(self): self._exit_stack = ExitStack() @@ -34,16 +36,22 @@ class LocalEventBroker(BaseEventBroker): raise ValueError('Coroutine functions are not supported as callbacks on a synchronous ' 'event source') - return super().subscribe(callback, event_types) + with self._subscriptions_lock: + return super().subscribe(callback, event_types) + + def unsubscribe(self, token: object) -> None: + with self._subscriptions_lock: + super().unsubscribe(token) def publish(self, event: Event) -> None: self.publish_local(event) def publish_local(self, event: Event) -> None: event_type = type(event) - for subscription in list(self._subscriptions.values()): - if subscription.event_types is None or event_type in subscription.event_types: - self._executor.submit(self._deliver_event, subscription.callback, event) + with self._subscriptions_lock: + for subscription in self._subscriptions.values(): + if subscription.event_types is None or event_type in subscription.event_types: + self._executor.submit(self._deliver_event, subscription.callback, event) def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None: try: |