summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/asyncpg.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-12 22:09:05 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-21 02:40:02 +0300
commitc5727432736b55b7d76753307f14efdb962c2edf (patch)
tree005bd129694b56bd601d65c4cdf43828cfcd4381 /src/apscheduler/eventbrokers/asyncpg.py
parent26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff)
downloadapscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler - Removed workers as a user interface - Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface - Added the current_async_scheduler contextvar - Added job executors
Diffstat (limited to 'src/apscheduler/eventbrokers/asyncpg.py')
-rw-r--r--src/apscheduler/eventbrokers/asyncpg.py52
1 files changed, 11 insertions, 41 deletions
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py
index 7e3045e..59f7292 100644
--- a/src/apscheduler/eventbrokers/asyncpg.py
+++ b/src/apscheduler/eventbrokers/asyncpg.py
@@ -5,10 +5,8 @@ from contextlib import AsyncExitStack
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, cast
-import anyio
import asyncpg
import attrs
-import tenacity
from anyio import (
TASK_STATUS_IGNORED,
EndOfStream,
@@ -18,20 +16,16 @@ from anyio import (
from anyio.streams.memory import MemoryObjectSendStream
from asyncpg import Connection, InterfaceError
-from .. import RetrySettings
from .._events import Event
from .._exceptions import SerializationError
-from ..abc import Serializer
-from ..serializers.json import JSONSerializer
-from .async_local import LocalAsyncEventBroker
-from .base import DistributedEventBrokerMixin
+from .base import BaseExternalEventBroker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncEngine
@attrs.define(eq=False)
-class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
+class AsyncpgEventBroker(BaseExternalEventBroker):
"""
An asynchronous, asyncpg_ based event broker that uses a PostgreSQL server to
broadcast events using its ``NOTIFY`` mechanism.
@@ -39,16 +33,13 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
.. _asyncpg: https://pypi.org/project/asyncpg/
:param connection_factory: a callable that creates an asyncpg connection
- :param serializer: the serializer used to (de)serialize events for transport
:param channel: the ``NOTIFY`` channel to use
:param max_idle_time: maximum time to let the connection go idle, before sending a
``SELECT 1`` query to prevent a connection timeout
"""
connection_factory: Callable[[], Awaitable[Connection]]
- serializer: Serializer = attrs.field(kw_only=True, factory=JSONSerializer)
channel: str = attrs.field(kw_only=True, default="apscheduler")
- retry_settings: RetrySettings = attrs.field(default=RetrySettings())
max_idle_time: float = attrs.field(kw_only=True, default=10)
_send: MemoryObjectSendStream[str] = attrs.field(init=False)
@@ -111,38 +102,17 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
factory = partial(asyncpg.connect, **connect_args)
return cls(factory, **kwargs)
- def _retry(self) -> tenacity.AsyncRetrying:
- def after_attempt(retry_state: tenacity.RetryCallState) -> None:
- self._logger.warning(
- f"{self.__class__.__name__}: connection failure "
- f"(attempt {retry_state.attempt_number}): "
- f"{retry_state.outcome.exception()}",
- )
+ @property
+ def _temporary_failure_exceptions(self) -> tuple[type[Exception]]:
+ return OSError, InterfaceError
- return tenacity.AsyncRetrying(
- stop=self.retry_settings.stop,
- wait=self.retry_settings.wait,
- retry=tenacity.retry_if_exception_type((OSError, InterfaceError)),
- after=after_attempt,
- sleep=anyio.sleep,
- reraise=True,
+ async def start(self, exit_stack: AsyncExitStack) -> None:
+ await super().start(exit_stack)
+ self._send = cast(
+ MemoryObjectSendStream[str],
+ await self._task_group.start(self._listen_notifications),
)
-
- async def start(self) -> None:
- await super().start()
- try:
- self._send = cast(
- MemoryObjectSendStream[str],
- await self._task_group.start(self._listen_notifications),
- )
- except BaseException:
- await super().stop(force=True)
- raise
-
- async def stop(self, *, force: bool = False) -> None:
- self._send.close()
- await super().stop(force=force)
- self._logger.info("Stopped event broker")
+ await exit_stack.enter_async_context(self._send)
async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None:
conn: Connection