1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
from datetime import datetime, timedelta, tzinfo
from typing import Optional, Union
from ..abc import Trigger
from ..marshalling import marshal_date, unmarshal_date
from ..validators import as_aware_datetime, as_timezone, require_state_version
class IntervalTrigger(Trigger):
    """
    Triggers on specified intervals.

    The first trigger time is on ``start_time`` which is the moment the trigger was created unless
    specifically overridden. If ``end_time`` is specified, the last trigger time will be at or
    before that time. If no ``end_time`` has been given, the trigger will produce new trigger times
    as long as the resulting datetimes are valid datetimes in Python.

    :param weeks: number of weeks to wait
    :param days: number of days to wait
    :param hours: number of hours to wait
    :param minutes: number of minutes to wait
    :param seconds: number of seconds to wait
    :param microseconds: number of microseconds to wait
    :param start_time: first trigger date/time
    :param end_time: latest possible date/time to trigger on
    :param timezone: time zone used to make any passed naive datetimes timezone aware
    :raises ValueError: if the combined interval is not positive, or if ``end_time`` is earlier
        than ``start_time``
    """

    __slots__ = ('weeks', 'days', 'hours', 'minutes', 'seconds', 'microseconds', 'start_time',
                 'end_time', '_interval', '_last_fire_time')

    def __init__(self, *, weeks: float = 0, days: float = 0, hours: float = 0, minutes: float = 0,
                 seconds: float = 0, microseconds: float = 0,
                 start_time: Optional[datetime] = None, end_time: Optional[datetime] = None,
                 timezone: Union[tzinfo, str] = 'local'):
        # The individual components are kept (not just the summed timedelta) because they are
        # what __getstate__ serializes and what __repr__ displays.
        self.weeks = weeks
        self.days = days
        self.hours = hours
        self.minutes = minutes
        self.seconds = seconds
        self.microseconds = microseconds

        timezone = as_timezone(timezone)
        # NOTE(review): as_aware_datetime is also handed a possibly-None end_time; presumably it
        # passes None through unchanged -- confirm against the validators module.
        self.start_time = as_aware_datetime(start_time or datetime.now(), timezone)
        self.end_time = as_aware_datetime(end_time, timezone)
        self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
                                   minutes=self.minutes, seconds=self.seconds,
                                   microseconds=self.microseconds)
        # None means next() has not produced a fire time yet
        self._last_fire_time = None

        if self._interval.total_seconds() <= 0:
            raise ValueError('The time interval must be positive')

        if self.end_time is not None and self.end_time < self.start_time:
            raise ValueError('end_time cannot be earlier than start_time')

    def next(self) -> Optional[datetime]:
        """
        Advance the trigger and return the next fire time.

        The first call returns ``start_time``; each later call returns the previous fire time plus
        the interval.

        :return: the next fire time, or ``None`` when that time would fall after ``end_time`` or
            outside the range of representable datetimes
        """
        if self._last_fire_time is None:
            candidate = self.start_time
        else:
            try:
                candidate = self._last_fire_time + self._interval
            except OverflowError:
                # Past datetime.max there are no more valid fire times: the schedule is over
                return None

        if self.end_time is None or candidate <= self.end_time:
            # Only commit the new time once it is known to be valid, so that calling next() on
            # an exhausted trigger does not keep pushing the stored state past end_time
            self._last_fire_time = candidate
            return candidate

        return None

    def __getstate__(self):
        """
        Return the serializable state of this trigger.

        :return: a dict holding the state version, the six interval components, and the start,
            end and last fire times as processed by ``marshal_date``
        """
        return {
            'version': 1,
            'interval': [self.weeks, self.days, self.hours, self.minutes, self.seconds,
                         self.microseconds],
            'start_time': marshal_date(self.start_time),
            'end_time': marshal_date(self.end_time),
            'last_fire_time': marshal_date(self._last_fire_time)
        }

    def __setstate__(self, state):
        """
        Restore this trigger from a state dict produced by :meth:`__getstate__`.

        :param state: the state dict; it is checked with ``require_state_version`` against
            version 1 before anything is restored
        """
        require_state_version(self, state, 1)
        self.weeks, self.days, self.hours, self.minutes, self.seconds, self.microseconds = \
            state['interval']
        self.start_time = unmarshal_date(state['start_time'])
        self.end_time = unmarshal_date(state['end_time'])
        self._last_fire_time = unmarshal_date(state['last_fire_time'])
        # The summed interval is not serialized; rebuild it from its components
        self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
                                   minutes=self.minutes, seconds=self.seconds,
                                   microseconds=self.microseconds)

    def __repr__(self):
        """
        Return a readable representation listing the non-zero interval components, the start
        time and, if set, the end time.
        """
        fields = []
        for field in 'weeks', 'days', 'hours', 'minutes', 'seconds', 'microseconds':
            value = getattr(self, field)
            # Show every non-zero component: a negative one (e.g. days=1, hours=-1) is still
            # part of a valid positive interval and must not be hidden
            if value:
                fields.append(f'{field}={value}')

        fields.append(f"start_time='{self.start_time}'")
        if self.end_time is not None:
            fields.append(f"end_time='{self.end_time}'")

        return f'{self.__class__.__name__}({", ".join(fields)})'
|