blob: 5c7ef7daeedccf3cd0c012cebda388f54ef6b3f3 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
from __future__ import annotations
from contextlib import AsyncExitStack
from logging import Logger, getLogger
import attrs
from .._retry import RetryMixin
from ..abc import DataStore, EventBroker, Serializer
from ..serializers.pickle import PickleSerializer
@attrs.define(kw_only=True)
class BaseDataStore(DataStore):
    """
    Common base for all data store implementations.

    :param lock_expiration_delay: maximum amount of time (in seconds) that a
        scheduler or worker may hold a lock on a schedule or task before the
        lock is considered expired
    """

    lock_expiration_delay: float = 30
    # Wired up in start(); not part of the constructor signature.
    _event_broker: EventBroker = attrs.field(init=False)
    # Created in __attrs_post_init__; not part of the constructor signature.
    _logger: Logger = attrs.field(init=False)

    def __attrs_post_init__(self):
        # Name the logger after the concrete subclass for clearer log output.
        self._logger = getLogger(type(self).__name__)

    async def start(
        self, exit_stack: AsyncExitStack, event_broker: EventBroker
    ) -> None:
        # Remember the broker so subclasses can publish data store events.
        self._event_broker = event_broker
@attrs.define(kw_only=True)
class BaseExternalDataStore(BaseDataStore, RetryMixin):
    """
    Common base for data stores backed by an external service (e.g. a
    database server).

    :param serializer: the serializer used to (de)serialize tasks, schedules
        and jobs for storage
    :param start_from_scratch: erase all existing data during startup (useful
        for test suites)
    """

    serializer: Serializer = attrs.field(factory=PickleSerializer)
    start_from_scratch: bool = False
|