summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2020-05-23 13:51:26 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2020-05-23 14:47:58 +0300
commit17a353f073a5e34bf033ad0013577063fcf6741c (patch)
treeae56d3f2bfff206945ece6adf78870775e9bb506
parent367068f6a9bd0b63b21b5a1832a0b2af4344f4c2 (diff)
downloadapscheduler-17a353f073a5e34bf033ad0013577063fcf6741c.tar.gz
Refactored the triggers
Triggers are now stateful, allowing for correct operation of the combining triggers. They are also JSON serializable now.
-rw-r--r--apscheduler/abc.py256
-rw-r--r--apscheduler/events.py21
-rw-r--r--apscheduler/exceptions.py42
-rw-r--r--apscheduler/serializers/__init__.py (renamed from apscheduler/jobstores/__init__.py)0
-rw-r--r--apscheduler/serializers/cbor.py36
-rw-r--r--apscheduler/serializers/json.py45
-rw-r--r--apscheduler/serializers/pickle.py16
-rw-r--r--apscheduler/triggers/base.py46
-rw-r--r--apscheduler/triggers/calendarinterval.py102
-rw-r--r--apscheduler/triggers/combining.py178
-rw-r--r--apscheduler/triggers/cron/__init__.py245
-rw-r--r--apscheduler/triggers/cron/expressions.py190
-rw-r--r--apscheduler/triggers/cron/fields.py90
-rw-r--r--apscheduler/triggers/date.py59
-rw-r--r--apscheduler/triggers/interval.py170
-rw-r--r--apscheduler/util.py187
-rw-r--r--apscheduler/validators.py166
-rw-r--r--docs/modules/triggers/cron.rst15
-rw-r--r--docs/modules/triggers/interval.rst16
-rw-r--r--setup.cfg39
-rw-r--r--tests/conftest.py41
-rw-r--r--tests/test_expressions.py173
-rw-r--r--tests/test_triggers.py693
-rw-r--r--tests/test_util.py147
-rw-r--r--tests/triggers/test_calendarinterval.py95
-rw-r--r--tests/triggers/test_combining.py94
-rw-r--r--tests/triggers/test_cron.py321
-rw-r--r--tests/triggers/test_date.py15
-rw-r--r--tests/triggers/test_interval.py45
29 files changed, 1642 insertions, 1901 deletions
diff --git a/apscheduler/abc.py b/apscheduler/abc.py
new file mode 100644
index 0000000..0b7e89a
--- /dev/null
+++ b/apscheduler/abc.py
@@ -0,0 +1,256 @@
+from abc import ABCMeta, abstractmethod
+from base64 import b64encode, b64decode
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+from typing import (
+ Callable, Iterable, Iterator, Mapping, Any, NoReturn, Optional, Union, AsyncIterable, Dict,
+ FrozenSet, List, Set)
+
+from .events import Event
+
+
+class Trigger(Iterator[datetime], metaclass=ABCMeta):
+ """Abstract base class that defines the interface that every trigger must implement."""
+
+ __slots__ = ()
+
+ @abstractmethod
+ def next(self) -> Optional[datetime]:
+ """
+ Return the next datetime to fire on.
+
+ If no such datetime can be calculated, ``None`` is returned.
+ :raises apscheduler.exceptions.MaxIterationsReached:
+ """
+
+ @abstractmethod
+ def __getstate__(self):
+ """Return the (JSON compatible) serializable state of the trigger."""
+
+ @abstractmethod
+ def __setstate__(self, state):
+ """Initialize an empty instance from an existing state."""
+
+ def __iter__(self):
+ return self
+
+ def __next__(self) -> datetime:
+ dateval = self.next()
+ if dateval is None:
+ raise StopIteration
+ else:
+ return dateval
+
+
+@dataclass
+class Task:
+ id: str
+ func: Callable
+ max_instances: Optional[int] = None
+ metadata_arg: Optional[str] = None
+ stateful: bool = False
+ misfire_grace_time: Optional[timedelta] = None
+
+
+@dataclass
+class Schedule:
+ id: str
+ task_id: str
+ trigger: Trigger
+ args: tuple = ()
+ kwargs: Dict[str, Any] = field(default_factory=dict)
+ coalesce: bool = True
+ misfire_grace_time: Optional[timedelta] = None
+ tags: Optional[FrozenSet[str]] = frozenset()
+ last_fire_time: Optional[datetime] = field(init=False, default=None)
+ next_fire_time: Optional[datetime] = field(init=False, default=None)
+
+
+@dataclass(frozen=True)
+class Job:
+ func_ref: str
+ args: Optional[tuple] = None
+ kwargs: Optional[Dict[str, Any]] = None
+ schedule_id: Optional[str] = None
+ scheduled_start_time: Optional[datetime] = None
+ start_deadline: Optional[datetime] = None
+ tags: Optional[FrozenSet[str]] = frozenset()
+
+
+class Serializer(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def serialize(self, obj) -> bytes:
+ pass
+
+ def serialize_to_unicode(self, obj) -> str:
+ return b64encode(self.serialize(obj)).decode('ascii')
+
+ @abstractmethod
+ def deserialize(self, serialized: bytes):
+ pass
+
+ def deserialize_from_unicode(self, serialized: str):
+ return self.deserialize(b64decode(serialized))
+
+
+class DataStore(metaclass=ABCMeta):
+ __slots__ = ()
+
+ async def __aenter__(self):
+ await self.start()
+ return self
+
+ async def __aexit__(self):
+ await self.stop()
+
+ async def start(self) -> None:
+ pass
+
+ async def stop(self) -> None:
+ pass
+
+ @abstractmethod
+ async def add_or_update_schedule(self, schedule: Schedule) -> None:
+ """Add or update the given schedule in the store."""
+
+ @abstractmethod
+ async def remove_schedule(self, schedule_id: str) -> None:
+ """Remove the designated schedule from the store."""
+
+ @abstractmethod
+ async def remove_all_schedules(self) -> None:
+ """Remove all schedules from the store."""
+
+ @abstractmethod
+ async def get_all_schedules(self) -> List[Schedule]:
+ """Get a list of all schedules, sorted on the "id" attribute."""
+
+ @abstractmethod
+ async def get_next_fire_time(self) -> Optional[datetime]:
+ """
+ Return the earliest fire time among all unclaimed schedules.
+
+ If no running, unclaimed schedules exist, ``None`` is returned.
+ """
+
+ @abstractmethod
+ async def acquire_due_schedules(self, scheduler_id: str) -> List[Schedule]:
+ """
+ Acquire an undefined amount of due schedules not claimed by any other scheduler.
+
+ This method claims due schedules for the given scheduler and returns them.
+ When the scheduler has updated the objects, it calls :meth:`release_due_schedules` to
+ release the claim on them.
+ """
+
+ @abstractmethod
+ async def release_due_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
+ """
+ Update the given schedules and release the claim on them held by this scheduler.
+
+ This method should do the following:
+
+ #. Remove any of the schedules in the store that have no next fire time
+ #. Update the schedules that do have a next fire time
+ #. Release any locks held on the schedules by this scheduler
+
+ :param scheduler_id: identifier of the scheduler
+ :param schedules: schedules previously acquired using :meth:`acquire_due_schedules`
+ """
+
+ @abstractmethod
+ async def acquire_job(self, worker_id: str, tags: Set[str]) -> Job:
+ """
+ Claim and return the next matching job from the queue.
+
+ :return: the acquired job
+ """
+
+ @abstractmethod
+ async def release_job(self, job: Job) -> None:
+ """Remove the given job from the queue."""
+
+
+class EventHub(metaclass=ABCMeta):
+ __slots__ = ()
+
+ async def start(self) -> None:
+ pass
+
+ async def stop(self) -> None:
+ pass
+
+ @abstractmethod
+ async def publish(self, event: Event) -> None:
+ """Publish an event."""
+
+ @abstractmethod
+ async def subscribe(self) -> AsyncIterable[Event]:
+ """Return an asynchronous iterable yielding newly received events."""
+
+
+class AsyncScheduler(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def define_task(self, func: Callable, task_id: Optional[str] = None, *,
+ max_instances: Optional[int],
+ misfire_grace_time: Union[float, timedelta]) -> str:
+ if not task_id:
+ task_id = f'{func.__module__}.{func.__qualname__}'
+ if isinstance(misfire_grace_time, float):
+ misfire_grace_time = timedelta(misfire_grace_time)
+
+ task = Task(id=task_id, func=func, max_instances=max_instances,
+ misfire_grace_time=misfire_grace_time)
+
+ return task_id
+
+ @abstractmethod
+ async def add_schedule(self, task: Union[str, Callable], trigger: Trigger, *, args: Iterable,
+ kwargs: Mapping[str, Any]) -> str:
+ """
+
+
+
+ :param task: callable or ID of a predefined task
+ :param trigger: trigger to define the run times of the schedule
+ :param args: positional arguments to pass to the task callable
+ :param kwargs: keyword arguments to pass to the task callable
+ :return: identifier of the created schedule
+ """
+
+ @abstractmethod
+ async def remove_schedule(self, schedule_id: str) -> None:
+ """Removes the designated schedule."""
+
+ @abstractmethod
+ async def run(self) -> NoReturn:
+ """
+ Runs the scheduler loop.
+
+ This method does not return.
+ """
+
+
+class SyncScheduler(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def add_schedule(self, task: Callable, trigger: Trigger, *, args: Iterable,
+ kwargs: Mapping[str, Any]) -> str:
+ pass
+
+ @abstractmethod
+ def remove_schedule(self, schedule_id: str) -> None:
+ pass
+
+ @abstractmethod
+ def run(self) -> NoReturn:
+ pass
+
+ add_schedule.__doc__ = AsyncScheduler.add_schedule.__doc__
+ remove_schedule.__doc__ = AsyncScheduler.remove_schedule.__doc__
+ run.__doc__ = AsyncScheduler.run.__doc__
diff --git a/apscheduler/events.py b/apscheduler/events.py
index 3919674..9997176 100644
--- a/apscheduler/events.py
+++ b/apscheduler/events.py
@@ -5,6 +5,10 @@ __all__ = ('EVENT_SCHEDULER_STARTED', 'EVENT_SCHEDULER_SHUTDOWN', 'EVENT_SCHEDUL
'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', 'EVENT_JOB_SUBMITTED', 'EVENT_JOB_MAX_INSTANCES',
'SchedulerEvent', 'JobEvent', 'JobExecutionEvent', 'JobSubmissionEvent')
+from datetime import datetime
+from typing import Optional
+
+from dataclasses import dataclass
EVENT_SCHEDULER_STARTED = EVENT_SCHEDULER_START = 2 ** 0
EVENT_SCHEDULER_SHUTDOWN = 2 ** 1
@@ -30,6 +34,23 @@ EVENT_ALL = (EVENT_SCHEDULER_STARTED | EVENT_SCHEDULER_SHUTDOWN | EVENT_SCHEDULE
EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_SUBMITTED | EVENT_JOB_MAX_INSTANCES)
+@dataclass
+class Event:
+ type: int
+ scheduler_id: str
+
+
+@dataclass
+class ScheduleEvent(Event):
+ schedule_id: str
+ next_fire_time: Optional[datetime]
+
+
+@dataclass
+class JobEent(Event):
+ job_id: str
+
+
class SchedulerEvent:
"""
An event that concerns the scheduler itself.
diff --git a/apscheduler/exceptions.py b/apscheduler/exceptions.py
new file mode 100644
index 0000000..48fe9c1
--- /dev/null
+++ b/apscheduler/exceptions.py
@@ -0,0 +1,42 @@
+
+
+class JobLookupError(KeyError):
+ """Raised when the job store cannot find a job for update or removal."""
+
+ def __init__(self, job_id):
+ super().__init__(u'No job by the id of %s was found' % job_id)
+
+
+class ConflictingIdError(KeyError):
+ """Raised when the uniqueness of job IDs is being violated."""
+
+ def __init__(self, job_id):
+ super().__init__(
+ u'Job identifier (%s) conflicts with an existing job' % job_id)
+
+
+class TransientJobError(ValueError):
+ """
+ Raised when an attempt to add transient (with no func_ref) job to a persistent job store is
+ detected.
+ """
+
+ def __init__(self, job_id):
+ super().__init__(
+ f'Job ({job_id}) cannot be added to this job store because a reference to the '
+ f'callable could not be determined.')
+
+
+class SerializationError(Exception):
+ """Raised when a serializer fails to serialize the given object."""
+
+
+class DeserializationError(Exception):
+ """Raised when a serializer fails to deserialize the given object."""
+
+
+class MaxIterationsReached(Exception):
+ """
+ Raised when a trigger has reached its maximum number of allowed computation iterations when
+ trying to calculate the next fire time.
+ """
diff --git a/apscheduler/jobstores/__init__.py b/apscheduler/serializers/__init__.py
index e69de29..e69de29 100644
--- a/apscheduler/jobstores/__init__.py
+++ b/apscheduler/serializers/__init__.py
diff --git a/apscheduler/serializers/cbor.py b/apscheduler/serializers/cbor.py
new file mode 100644
index 0000000..7e4b9bb
--- /dev/null
+++ b/apscheduler/serializers/cbor.py
@@ -0,0 +1,36 @@
+from dataclasses import dataclass, field
+from typing import Dict, Any
+
+from cbor2 import dumps, loads, CBOREncodeTypeError, CBORTag
+
+from ..abc import Serializer
+from ..util import marshal_object, unmarshal_object
+
+
+@dataclass
+class CBORSerializer(Serializer):
+ type_tag: int = 4664
+ dump_options: Dict[str, Any] = field(default_factory=dict)
+ load_options: Dict[str, Any] = field(default_factory=dict)
+
+ def __post_init__(self):
+ self.dump_options.setdefault('default', self._default_hook)
+ self.load_options.setdefault('tag_hook', self._tag_hook)
+
+ def _default_hook(self, encoder, value):
+ if hasattr(value, '__getstate__'):
+ marshalled = marshal_object(value)
+ encoder.encode(CBORTag(self.type_tag, marshalled))
+ else:
+ raise CBOREncodeTypeError(f'cannot serialize type {value.__class__.__name__}')
+
+ def _tag_hook(self, decoder, tag: CBORTag, shareable_index: int = None):
+ if tag.tag == self.type_tag:
+ cls_ref, state = tag.value
+ return unmarshal_object(cls_ref, state)
+
+ def serialize(self, obj) -> bytes:
+ return dumps(obj, **self.dump_options)
+
+ def deserialize(self, serialized: bytes):
+ return loads(serialized, **self.load_options)
diff --git a/apscheduler/serializers/json.py b/apscheduler/serializers/json.py
new file mode 100644
index 0000000..bdd7b94
--- /dev/null
+++ b/apscheduler/serializers/json.py
@@ -0,0 +1,45 @@
+from dataclasses import dataclass, field
+from json import dumps, loads
+from typing import Dict, Any
+
+from ..abc import Serializer
+from ..util import marshal_object, unmarshal_object
+
+
+@dataclass
+class JSONSerializer(Serializer):
+ magic_key: str = '_apscheduler_json'
+ dump_options: Dict[str, Any] = field(default_factory=dict)
+ load_options: Dict[str, Any] = field(default_factory=dict)
+
+ def __post_init__(self):
+ self.dump_options['default'] = self._default_hook
+ self.load_options['object_hook'] = self._object_hook
+
+ @classmethod
+ def _default_hook(cls, obj):
+ if hasattr(obj, '__getstate__'):
+ cls_ref, state = marshal_object(obj)
+ return {cls.magic_key: [cls_ref, state]}
+
+ raise TypeError(f'Object of type {obj.__class__.__name__!r} is not JSON serializable')
+
+ @classmethod
+ def _object_hook(cls, obj_state: Dict[str, Any]):
+ if cls.magic_key in obj_state:
+ ref, *rest = obj_state[cls.magic_key]
+ return unmarshal_object(ref, *rest)
+
+ return obj_state
+
+ def serialize(self, obj) -> bytes:
+ return dumps(obj, ensure_ascii=False, **self.dump_options).encode('utf-8')
+
+ def deserialize(self, serialized: bytes):
+ return loads(serialized, encoding='utf-8', **self.load_options)
+
+ def serialize_to_unicode(self, obj) -> str:
+ return dumps(obj, ensure_ascii=False, **self.dump_options)
+
+ def deserialize_from_unicode(self, serialized: str):
+ return loads(serialized, **self.load_options)
diff --git a/apscheduler/serializers/pickle.py b/apscheduler/serializers/pickle.py
new file mode 100644
index 0000000..c6ace86
--- /dev/null
+++ b/apscheduler/serializers/pickle.py
@@ -0,0 +1,16 @@
+from pickle import dumps, loads
+
+from dataclasses import dataclass
+
+from ..abc import Serializer
+
+
+@dataclass(frozen=True)
+class PickleSerializer(Serializer):
+ protocol: int = 4
+
+ def serialize(self, obj) -> bytes:
+ return dumps(obj, self.protocol)
+
+ def deserialize(self, serialized: bytes):
+ return loads(serialized)
diff --git a/apscheduler/triggers/base.py b/apscheduler/triggers/base.py
deleted file mode 100644
index bbfabbd..0000000
--- a/apscheduler/triggers/base.py
+++ /dev/null
@@ -1,46 +0,0 @@
-from abc import ABCMeta, abstractmethod
-from datetime import timedelta
-import random
-
-
-class BaseTrigger(metaclass=ABCMeta):
- """Abstract base class that defines the interface that every trigger must implement."""
-
- __slots__ = ()
-
- @abstractmethod
- def get_next_fire_time(self, previous_fire_time, now):
- """
- Returns the next datetime to fire on, If no such datetime can be calculated, returns
- ``None``.
-
- :param datetime.datetime previous_fire_time: the previous time the trigger was fired
- :param datetime.datetime now: current datetime
- """
-
- def _apply_jitter(self, next_fire_time, jitter, now):
- """
- Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the
- resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter.
-
- ``next_fire_time - jitter <= result <= next_fire_time + jitter``
-
- :param datetime.datetime|None next_fire_time: next fire time without jitter applied. If
- ``None``, returns ``None``.
- :param int|None jitter: maximum number of seconds to add or subtract to
- ``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time``
- :param datetime.datetime now: current datetime
- :return datetime.datetime|None: next fire time with a jitter.
- """
- if next_fire_time is None or not jitter:
- return next_fire_time
-
- next_fire_time_with_jitter = next_fire_time + timedelta(
- seconds=random.uniform(-jitter, jitter))
-
- if next_fire_time_with_jitter < now:
- # Next fire time with jitter is in the past.
- # Ignore jitter to avoid false misfire.
- return next_fire_time
-
- return next_fire_time_with_jitter
diff --git a/apscheduler/triggers/calendarinterval.py b/apscheduler/triggers/calendarinterval.py
index 9609c99..0d08263 100644
--- a/apscheduler/triggers/calendarinterval.py
+++ b/apscheduler/triggers/calendarinterval.py
@@ -1,12 +1,15 @@
-from datetime import timedelta, datetime, date, time
+from datetime import timedelta, datetime, date, time, tzinfo
+from typing import Optional, Union
from pytz.exceptions import AmbiguousTimeError, NonExistentTimeError
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.util import astimezone
+from ..abc import Trigger
+from ..validators import (
+ require_state_version, as_timezone, as_date, as_aware_datetime,
+ as_timestamp, as_ordinal_date)
-class CalendarIntervalTrigger(BaseTrigger):
+class CalendarIntervalTrigger(Trigger):
"""
Runs the task on specified calendar-based intervals always at the same exact time of day.
@@ -45,44 +48,44 @@ class CalendarIntervalTrigger(BaseTrigger):
:param hour: hour to run the task at
:param minute: minute to run the task at
:param second: second to run the task at
- :param start_date: starting point for the interval calculation (defaults to current date if
- omitted)
+ :param start_date: first date to trigger on (defaults to current date if omitted)
:param end_date: latest possible date to trigger on
:param timezone: time zone to use for calculating the next fire time
- (defaults to the scheduling's default time zone)
"""
- __slots__ = 'years', 'months', 'weeks', 'days', 'time', 'start_date', 'end_date', 'timezone'
+ __slots__ = ('years', 'months', 'weeks', 'days', 'start_date', 'end_date', 'timezone', '_time',
+ '_last_fire_date')
def __init__(self, *, years: int = 0, months: int = 0, weeks: int = 0, days: int = 0,
- hour: int = 0, minute: int = 0, second: int = 0, start_date: date = None,
- end_date: date = None, timezone=None) -> None:
+ hour: int = 0, minute: int = 0, second: int = 0,
+ start_date: Union[date, str, None] = None,
+ end_date: Union[date, str, None] = None,
+ timezone: Union[str, tzinfo, None] = None):
self.years = years
self.months = months
self.weeks = weeks
self.days = days
- self.time = time(hour, minute, second)
- self.start_date = start_date
- self.end_date = end_date
- self.timezone = astimezone(timezone)
+ self.timezone = as_timezone(timezone)
+ self.start_date = as_date(start_date) or datetime.now(self.timezone).date()
+ self.end_date = as_date(end_date)
+ self._time = time(hour, minute, second)
+ self._last_fire_date: Optional[date] = None
if self.years == self.months == self.weeks == self.days == 0:
raise ValueError('interval must be at least 1 day long')
+
if self.start_date and self.end_date and self.start_date > self.end_date:
raise ValueError('end_date cannot be earlier than start_date')
- def get_next_fire_time(self, previous_fire_time, now):
- # Determine the starting point of the calculations
- timezone = self.timezone or now.tzinfo
- previous_date = previous_fire_time.date() if previous_fire_time else None
-
+ def next(self) -> Optional[datetime]:
+ previous_date: date = self._last_fire_date
while True:
if previous_date:
year, month = previous_date.year, previous_date.month
while True:
month += self.months
- year += self.years + month // 12
- month %= 12
+ year += self.years + (month - 1) // 12
+ month = (month - 1) % 12 + 1
try:
next_date = date(year, month, previous_date.day)
except ValueError:
@@ -91,18 +94,19 @@ class CalendarIntervalTrigger(BaseTrigger):
next_date += timedelta(self.days + self.weeks * 7)
break
else:
- next_date = self.start_date if self.start_date else now.date()
+ next_date = self.start_date
# Don't return any date past end_date
if self.end_date and next_date > self.end_date:
return None
- next_time = datetime.combine(next_date, self.time)
+ next_time = datetime.combine(next_date, self._time)
try:
- return timezone.localize(next_time, is_dst=None)
+ self._last_fire_date = next_date
+ return self.timezone.localize(next_time, is_dst=None)
except AmbiguousTimeError:
# Return the daylight savings occurrence of the datetime
- return timezone.localize(next_time, is_dst=True)
+ return self.timezone.localize(next_time, is_dst=True)
except NonExistentTimeError:
# This datetime does not exist (the DST shift jumps over it)
previous_date = next_date
@@ -111,35 +115,33 @@ class CalendarIntervalTrigger(BaseTrigger):
return {
'version': 1,
'interval': [self.years, self.months, self.weeks, self.days],
- 'time': self.time,
- 'start_date': self.start_date,
- 'end_date': self.end_date,
- 'timezone': str(self.timezone)
+ 'time': [self._time.hour, self._time.minute, self._time.second],
+ 'start_date': as_ordinal_date(self.start_date),
+ 'end_date': as_ordinal_date(self.end_date),
+ 'timezone': self.timezone.zone,
+ 'last_fire_date': as_timestamp(self._last_fire_date)
}
def __setstate__(self, state):
- if state.get('version', 1) > 1:
- raise ValueError(
- 'Got serialized data for version %s of %s, but only version 1 can be handled' %
- (state['version'], self.__class__.__name__))
-
+ require_state_version(self, state, 1)
self.years, self.months, self.weeks, self.days = state['interval']
- self.time = state['time']
- self.start_date = state['start_date']
- self.end_date = state['end_date']
- self.timezone = astimezone(state['timezone'])
-
- def __str__(self):
- options = []
- for field, suffix in [('years', 'y'), ('months', 'm'), ('weeks', 'w'), ('days', 'd')]:
+ self.start_date = as_date(state['start_date'])
+ self.end_date = as_date(state['end_date'])
+ self.timezone = as_timezone(state['timezone'])
+ self._time = time(*state['time'])
+ self._last_fire_date = as_aware_datetime(state['last_fire_date'], self.timezone)
+
+ def __repr__(self):
+ fields = []
+ for field in 'years', 'months', 'weeks', 'days':
value = getattr(self, field)
- if value:
- options.append('{}{}'.format(value, suffix))
+ if value > 0:
+ fields.append(f'{field}={value}')
- return 'calendarinterval[{} at {}]'.format(', '.join(options), self.time)
+ fields.append(f'time={self._time.isoformat()!r}')
+ fields.append(f'start_date={self.start_date.isoformat()!r}')
+ if self.end_date:
+ fields.append(f'end_date={self.end_date.isoformat()!r}')
- def __repr__(self):
- fields = 'years', 'months', 'weeks', 'days'
- interval_repr = ', '.join('%s=%d' % (attr, getattr(self, attr))
- for attr in fields if getattr(self, attr))
- return '{self.__class__.__name__}({}, time={self.time})'.format(interval_repr, self=self)
+ fields.append(f'timezone={self.timezone.tzname(None)!r}')
+ return f'CalendarIntervalTrigger({", ".join(fields)})'
diff --git a/apscheduler/triggers/combining.py b/apscheduler/triggers/combining.py
index 64f8301..967496d 100644
--- a/apscheduler/triggers/combining.py
+++ b/apscheduler/triggers/combining.py
@@ -1,95 +1,141 @@
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.util import obj_to_ref, ref_to_obj
+from abc import abstractmethod
+from datetime import datetime, timedelta
+from typing import Sequence, Optional, List, Union, Dict, Any
+from ..abc import Trigger
+from ..exceptions import MaxIterationsReached
+from ..util import marshal_object, unmarshal_object
+from ..validators import as_timedelta, as_positive_integer, as_list, require_state_version
-class BaseCombiningTrigger(BaseTrigger):
- __slots__ = ('triggers', 'jitter')
- def __init__(self, triggers, jitter=None):
- self.triggers = triggers
- self.jitter = jitter
+class BaseCombiningTrigger(Trigger):
+ __slots__ = 'triggers', '_next_fire_times'
- def __getstate__(self):
+ def __init__(self, triggers: Sequence[Trigger]):
+ self.triggers = as_list(triggers, Trigger, 'triggers')
+ self._next_fire_times: List[Optional[datetime]] = []
+
+ def __getstate__(self) -> Dict[str, Any]:
return {
'version': 1,
- 'triggers': [(obj_to_ref(trigger.__class__), trigger.__getstate__())
- for trigger in self.triggers],
- 'jitter': self.jitter
+ 'triggers': [marshal_object(trigger) for trigger in self.triggers],
+ 'next_fire_times': self._next_fire_times
}
- def __setstate__(self, state):
- if state.get('version', 1) > 1:
- raise ValueError(
- 'Got serialized data for version %s of %s, but only versions up to 1 can be '
- 'handled' % (state['version'], self.__class__.__name__))
-
- self.jitter = state['jitter']
- self.triggers = []
- for clsref, state in state['triggers']:
- cls = ref_to_obj(clsref)
- trigger = cls.__new__(cls)
- trigger.__setstate__(state)
- self.triggers.append(trigger)
-
- def __repr__(self):
- return '<{}({}{})>'.format(self.__class__.__name__, self.triggers,
- ', jitter={}'.format(self.jitter) if self.jitter else '')
+ @abstractmethod
+ def __setstate__(self, state: Dict[str, Any]) -> None:
+ self.triggers = [unmarshal_object(*trigger_state) for trigger_state in state['triggers']]
+ self._next_fire_times = state['next_fire_times']
class AndTrigger(BaseCombiningTrigger):
"""
- Always returns the earliest next fire time that all the given triggers can agree on.
- The trigger is considered to be finished when any of the given triggers has finished its
- schedule.
+ Fires on times produced by the enclosed triggers whenever the fire times are within the given
+ threshold.
- Trigger alias: ``and``
+ If the produced fire times are not within the given threshold of each other, the trigger(s)
+ that produced the earliest fire time will be asked for their next fire time and the iteration
+ is restarted. If instead all of the triggers agree on a fire time, all the triggers are asked
+ for their next fire times and the earliest of the previously produced fire times will be
+ returned.
- :param list triggers: triggers to combine
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ This trigger will be finished when any of the enclosed trigger has finished.
+
+ :param triggers: triggers to combine
+ :param threshold: maximum time difference between the next fire times of the triggers in order
+ for the earliest of them to be returned from :meth:`next` (in seconds, or as timedelta)
+ :param max_iterations: maximum number of iterations of fire time calculations before giving up
"""
- __slots__ = ()
+ __slots__ = 'threshold', 'max_iterations'
+
+ def __init__(self, triggers: Sequence[Trigger], *, threshold: Union[float, timedelta] = 1,
+ max_iterations: Optional[int] = 10000):
+ super().__init__(triggers)
+ self.threshold = as_timedelta(threshold, 'threshold')
+ self.max_iterations = as_positive_integer(max_iterations, 'max_iterations')
+
+ def next(self) -> Optional[datetime]:
+ if not self._next_fire_times:
+ # Fill out the fire times on the first run
+ self._next_fire_times = [t.next() for t in self.triggers]
+
+ for _ in range(self.max_iterations):
+ # Find the earliest and latest fire times
+ earliest_fire_time: Optional[datetime] = None
+ latest_fire_time: Optional[datetime] = None
+ for fire_time in self._next_fire_times:
+ # If any of the fire times is None, this trigger is finished
+ if fire_time is None:
+ return None
+
+ if earliest_fire_time is None or earliest_fire_time > fire_time:
+ earliest_fire_time = fire_time
+
+ if latest_fire_time is None or latest_fire_time < fire_time:
+ latest_fire_time = fire_time
+
+ # Replace all the fire times that were within the threshold
+ for i, trigger in enumerate(self.triggers):
+ if self._next_fire_times[i] - earliest_fire_time <= self.threshold:
+ self._next_fire_times[i] = self.triggers[i].next()
+
+ # If all the fire times were within the threshold, return the earliest one
+ if latest_fire_time - earliest_fire_time <= self.threshold:
+ self._next_fire_times = [t.next() for t in self.triggers]
+ return earliest_fire_time
+ else:
+ raise MaxIterationsReached
- def get_next_fire_time(self, previous_fire_time, now):
- while True:
- fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
- for trigger in self.triggers]
- if None in fire_times:
- return None
- elif min(fire_times) == max(fire_times):
- return self._apply_jitter(fire_times[0], self.jitter, now)
- else:
- now = max(fire_times)
+ def __getstate__(self) -> Dict[str, Any]:
+ state = super().__getstate__()
+ state['threshold'] = self.threshold.total_seconds()
+ state['max_iterations'] = self.max_iterations
+ return state
- def __str__(self):
- return 'and[{}]'.format(', '.join(str(trigger) for trigger in self.triggers))
+ def __setstate__(self, state: Dict[str, Any]) -> None:
+ require_state_version(self, state, 1)
+ super().__setstate__(state)
+ self.threshold = timedelta(seconds=state['threshold'])
+ self.max_iterations = state['max_iterations']
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({self.triggers}, ' \
+ f'threshold={self.threshold.total_seconds()}, max_iterations={self.max_iterations})'
class OrTrigger(BaseCombiningTrigger):
"""
- Always returns the earliest next fire time produced by any of the given triggers.
- The trigger is considered finished when all the given triggers have finished their schedules.
-
- Trigger alias: ``or``
+ Fires on every fire time of every trigger in chronological order.
+ If two or more triggers produce the same fire time, it will only be used once.
- :param list triggers: triggers to combine
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ This trigger will be finished when none of the enclosed triggers can produce any new fire
+ times.
- .. note:: Triggers that depends on the previous fire time, such as the interval trigger, may
- seem to behave strangely since they are always passed the previous fire time produced by
- any of the given triggers.
+ :param triggers: triggers to combine
"""
__slots__ = ()
- def get_next_fire_time(self, previous_fire_time, now):
- fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
- for trigger in self.triggers]
- fire_times = [fire_time for fire_time in fire_times if fire_time is not None]
- if fire_times:
- return self._apply_jitter(min(fire_times), self.jitter, now)
- else:
- return None
+ def next(self) -> Optional[datetime]:
+ # Fill out the fire times on the first run
+ if not self._next_fire_times:
+ self._next_fire_times = [t.next() for t in self.triggers]
+
+ # Find out the earliest of the fire times
+ earliest_time: Optional[datetime] = min([fire_time for fire_time in self._next_fire_times
+ if fire_time is not None], default=None)
+ if earliest_time is not None:
+ # Generate new fire times for the trigger(s) that generated the earliest fire time
+ for i, fire_time in enumerate(self._next_fire_times):
+ if fire_time == earliest_time:
+ self._next_fire_times[i] = self.triggers[i].next()
- def __str__(self):
- return 'or[{}]'.format(', '.join(str(trigger) for trigger in self.triggers))
+ return earliest_time
+
+ def __setstate__(self, state: Dict[str, Any]) -> None:
+ require_state_version(self, state, 1)
+ super().__setstate__(state)
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({self.triggers})'
diff --git a/apscheduler/triggers/cron/__init__.py b/apscheduler/triggers/cron/__init__.py
index b12edbd..3f33a70 100644
--- a/apscheduler/triggers/cron/__init__.py
+++ b/apscheduler/triggers/cron/__init__.py
@@ -1,123 +1,107 @@
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, tzinfo
+from typing import Optional, Union, Tuple, Sequence, ClassVar, List
-from tzlocal import get_localzone
-
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.triggers.cron.fields import (
+from .fields import (
BaseField, MonthField, WeekField, DayOfMonthField, DayOfWeekField, DEFAULT_VALUES)
-from apscheduler.util import datetime_ceil, convert_to_datetime, datetime_repr, astimezone
+from ...abc import Trigger
+from ...util import datetime_ceil
+from ...validators import require_state_version, as_timezone, as_aware_datetime, as_timestamp
-class CronTrigger(BaseTrigger):
+class CronTrigger(Trigger):
"""
- Triggers when current time matches all specified time constraints,
- similarly to how the UNIX cron scheduler works.
-
- :param int|str year: 4-digit year
- :param int|str month: month (1-12)
- :param int|str day: day of the (1-31)
- :param int|str week: ISO week (1-53)
- :param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
- :param int|str hour: hour (0-23)
- :param int|str minute: minute (0-59)
- :param int|str second: second (0-59)
- :param datetime|str start_date: earliest possible date/time to trigger on (inclusive)
- :param datetime|str end_date: latest possible date/time to trigger on (inclusive)
- :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults
- to scheduler timezone)
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ Triggers when current time matches all specified time constraints, similarly to how the UNIX
+ cron scheduler works.
+
+ :param year: 4-digit year
+ :param month: month (1-12)
+ :param day: day of the (1-31)
+ :param week: ISO week (1-53)
+ :param day_of_week: number or name of weekday (0-7 or sun,mon,tue,wed,thu,fri,sat,sun)
+ :param hour: hour (0-23)
+ :param minute: minute (0-59)
+ :param second: second (0-59)
+ :param start_time: earliest possible date/time to trigger on (defaults to current time)
+ :param end_time: latest possible date/time to trigger on
+ :param timezone: time zone to use for the date/time calculations
+ (defaults to the local timezone)
.. note:: The first weekday is always **monday**.
"""
- FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour', 'minute', 'second')
- FIELDS_MAP = {
- 'year': BaseField,
- 'month': MonthField,
- 'week': WeekField,
- 'day': DayOfMonthField,
- 'day_of_week': DayOfWeekField,
- 'hour': BaseField,
- 'minute': BaseField,
- 'second': BaseField
- }
-
- __slots__ = 'timezone', 'start_date', 'end_date', 'fields', 'jitter'
-
- def __init__(self, year=None, month=None, day=None, week=None, day_of_week=None, hour=None,
- minute=None, second=None, start_date=None, end_date=None, timezone=None,
- jitter=None):
- if timezone:
- self.timezone = astimezone(timezone)
- elif isinstance(start_date, datetime) and start_date.tzinfo:
- self.timezone = start_date.tzinfo
- elif isinstance(end_date, datetime) and end_date.tzinfo:
- self.timezone = end_date.tzinfo
- else:
- self.timezone = get_localzone()
-
- self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
- self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
-
- self.jitter = jitter
-
- values = dict((key, value) for (key, value) in locals().items()
- if key in self.FIELD_NAMES and value is not None)
- self.fields = []
- assign_defaults = False
- for field_name in self.FIELD_NAMES:
- if field_name in values:
- exprs = values.pop(field_name)
- is_default = False
- assign_defaults = not values
- elif assign_defaults:
- exprs = DEFAULT_VALUES[field_name]
- is_default = True
- else:
- exprs = '*'
- is_default = True
-
- field_class = self.FIELDS_MAP[field_name]
- field = field_class(field_name, exprs, is_default)
- self.fields.append(field)
+ __slots__ = 'timezone', 'start_time', 'end_time', '_fields', '_last_fire_time'
+
+ FIELDS_MAP: ClassVar[List] = [
+ ('year', BaseField),
+ ('month', MonthField),
+ ('day', DayOfMonthField),
+ ('week', WeekField),
+ ('day_of_week', DayOfWeekField),
+ ('hour', BaseField),
+ ('minute', BaseField),
+ ('second', BaseField)
+ ]
+
+ def __init__(self, *, year: Union[int, str, None] = None, month: Union[int, str, None] = None,
+ day: Union[int, str, None] = None, week: Union[int, str, None] = None,
+ day_of_week: Union[int, str, None] = None, hour: Union[int, str, None] = None,
+ minute: Union[int, str, None] = None, second: Union[int, str, None] = None,
+ start_time: Union[datetime, str, None] = None,
+ end_time: Union[datetime, str, None] = None,
+ timezone: Union[str, tzinfo, None] = None):
+ self.timezone = as_timezone(timezone)
+ self.start_time = (as_aware_datetime(start_time, self.timezone)
+ or datetime.now(self.timezone))
+ self.end_time = as_aware_datetime(end_time, self.timezone)
+ self._set_fields([year, month, day, week, day_of_week, hour, minute, second])
+ self._last_fire_time: Optional[datetime] = None
+
+ def _set_fields(self, values: Sequence[Union[int, str, None]]) -> None:
+ self._fields = []
+ assigned_values = {field_name: value
+ for (field_name, _), value in zip(self.FIELDS_MAP, values)
+ if value is not None}
+ for field_name, field_class in self.FIELDS_MAP:
+ exprs = assigned_values.pop(field_name, None)
+ if exprs is None:
+ exprs = '*' if assigned_values else DEFAULT_VALUES[field_name]
+
+ field = field_class(field_name, exprs)
+ self._fields.append(field)
@classmethod
- def from_crontab(cls, expr, timezone=None):
+ def from_crontab(cls, expr: str, timezone: Union[str, tzinfo, None] = None) -> 'CronTrigger':
"""
Create a :class:`~CronTrigger` from a standard crontab expression.
See https://en.wikipedia.org/wiki/Cron for more information on the format accepted here.
:param expr: minute, hour, day of month, month, day of week
- :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (
- defaults to scheduler timezone)
- :return: a :class:`~CronTrigger` instance
+ :param timezone: time zone to use for the date/time calculations
+ (defaults to local timezone if omitted)
"""
values = expr.split()
if len(values) != 5:
- raise ValueError('Wrong number of fields; got {}, expected 5'.format(len(values)))
+ raise ValueError(f'Wrong number of fields; got {len(values)}, expected 5')
return cls(minute=values[0], hour=values[1], day=values[2], month=values[3],
day_of_week=values[4], timezone=timezone)
- def _increment_field_value(self, dateval, fieldnum):
+ def _increment_field_value(self, dateval: datetime, fieldnum: int) -> Tuple[datetime, int]:
"""
Increments the designated field and resets all less significant fields to their minimum
values.
- :type dateval: datetime
- :type fieldnum: int
:return: a tuple containing the new date, and the number of the field that was actually
incremented
- :rtype: tuple
"""
values = {}
i = 0
- while i < len(self.fields):
- field = self.fields[i]
- if not field.REAL:
+ while i < len(self._fields):
+ field = self._fields[i]
+ if not field.real:
if i == fieldnum:
fieldnum -= 1
i -= 1
@@ -146,8 +130,8 @@ class CronTrigger(BaseTrigger):
def _set_field_value(self, dateval, fieldnum, new_value):
values = {}
- for i, field in enumerate(self.fields):
- if field.REAL:
+ for i, field in enumerate(self._fields):
+ if field.real:
if i < fieldnum:
values[field.name] = field.get_value(dateval)
elif i > fieldnum:
@@ -157,81 +141,64 @@ class CronTrigger(BaseTrigger):
return self.timezone.localize(datetime(**values))
- def get_next_fire_time(self, previous_fire_time, now):
- if previous_fire_time:
- start_date = min(now, previous_fire_time + timedelta(microseconds=1))
- if start_date == previous_fire_time:
- start_date += timedelta(microseconds=1)
+ def next(self) -> Optional[datetime]:
+ if self._last_fire_time:
+ start_time = self._last_fire_time + timedelta(microseconds=1)
else:
- start_date = max(now, self.start_date) if self.start_date else now
+ start_time = self.start_time
fieldnum = 0
- next_date = datetime_ceil(start_date).astimezone(self.timezone)
- while 0 <= fieldnum < len(self.fields):
- field = self.fields[fieldnum]
- curr_value = field.get_value(next_date)
- next_value = field.get_next_value(next_date)
+ next_time = datetime_ceil(start_time).astimezone(self.timezone)
+ while 0 <= fieldnum < len(self._fields):
+ field = self._fields[fieldnum]
+ curr_value = field.get_value(next_time)
+ next_value = field.get_next_value(next_time)
if next_value is None:
# No valid value was found
- next_date, fieldnum = self._increment_field_value(next_date, fieldnum - 1)
+ next_time, fieldnum = self._increment_field_value(next_time, fieldnum - 1)
elif next_value > curr_value:
# A valid, but higher than the starting value, was found
- if field.REAL:
- next_date = self._set_field_value(next_date, fieldnum, next_value)
+ if field.real:
+ next_time = self._set_field_value(next_time, fieldnum, next_value)
fieldnum += 1
else:
- next_date, fieldnum = self._increment_field_value(next_date, fieldnum)
+ next_time, fieldnum = self._increment_field_value(next_time, fieldnum)
else:
# A valid value was found, no changes necessary
fieldnum += 1
# Return if the date has rolled past the end date
- if self.end_date and next_date > self.end_date:
+ if self.end_time and next_time > self.end_time:
return None
if fieldnum >= 0:
- next_date = self._apply_jitter(next_date, self.jitter, now)
- return min(next_date, self.end_date) if self.end_date else next_date
+ self._last_fire_time = next_time
+ return next_time
def __getstate__(self):
return {
- 'version': 2,
- 'timezone': self.timezone,
- 'start_date': self.start_date,
- 'end_date': self.end_date,
- 'fields': self.fields,
- 'jitter': self.jitter,
+ 'version': 1,
+ 'timezone': self.timezone.zone,
+ 'fields': [str(f) for f in self._fields],
+ 'start_time': as_timestamp(self.start_time),
+ 'end_time': as_timestamp(self.end_time),
+ 'last_fire_time': as_timestamp(self._last_fire_time)
}
def __setstate__(self, state):
- # This is for compatibility with APScheduler 3.0.x
- if isinstance(state, tuple):
- state = state[1]
-
- if state.get('version', 1) > 2:
- raise ValueError(
- 'Got serialized data for version %s of %s, but only versions up to 2 can be '
- 'handled' % (state['version'], self.__class__.__name__))
-
- self.timezone = state['timezone']
- self.start_date = state['start_date']
- self.end_date = state['end_date']
- self.fields = state['fields']
- self.jitter = state.get('jitter')
-
- def __str__(self):
- options = ["%s='%s'" % (f.name, f) for f in self.fields if not f.is_default]
- return 'cron[%s]' % (', '.join(options))
+ require_state_version(self, state, 1)
+ self.timezone = as_timezone(state['timezone'])
+ self.start_time = as_aware_datetime(state['start_time'], self.timezone)
+ self.end_time = as_aware_datetime(state['end_time'], self.timezone)
+ self._last_fire_time = as_aware_datetime(state['last_fire_time'], self.timezone)
+ self._set_fields(state['fields'])
def __repr__(self):
- options = ["%s='%s'" % (f.name, f) for f in self.fields if not f.is_default]
- if self.start_date:
- options.append("start_date=%r" % datetime_repr(self.start_date))
- if self.end_date:
- options.append("end_date=%r" % datetime_repr(self.end_date))
- if self.jitter:
- options.append('jitter=%s' % self.jitter)
-
- return "<%s (%s, timezone='%s')>" % (
- self.__class__.__name__, ', '.join(options), self.timezone)
+ fields = [f'{field.name}={str(field)!r}' for field in self._fields]
+ fields.append(f'start_time={self.start_time.isoformat()!r}')
+ if self.end_time:
+ fields.append(f'end_time={self.end_time.isoformat()!r}')
+
+ fields.append(f'timezone={self.timezone.zone!r}')
+ return f'CronTrigger({", ".join(fields)})'
diff --git a/apscheduler/triggers/cron/expressions.py b/apscheduler/triggers/cron/expressions.py
index ebd565a..3ad9366 100644
--- a/apscheduler/triggers/cron/expressions.py
+++ b/apscheduler/triggers/cron/expressions.py
@@ -1,90 +1,78 @@
"""This module contains the expressions applicable for CronTrigger's fields."""
-from calendar import monthrange
import re
+from calendar import monthrange
+from datetime import datetime
+from typing import Optional, Union
-from apscheduler.util import asint
-
-__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
- 'WeekdayPositionExpression', 'LastDayOfMonthExpression')
-
+from ...validators import as_int
WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
class AllExpression:
+ __slots__ = 'step'
+
value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')
- def __init__(self, step=None):
- self.step = asint(step)
+ def __init__(self, step: Union[str, int, None] = None):
+ self.step = as_int(step)
if self.step == 0:
- raise ValueError('Increment must be higher than 0')
+ raise ValueError('Step must be higher than 0')
- def validate_range(self, field_name):
- from apscheduler.triggers.cron.fields import MIN_VALUES, MAX_VALUES
-
- value_range = MAX_VALUES[field_name] - MIN_VALUES[field_name]
+ def validate_range(self, field_name: str, min_value: int, max_value: int) -> None:
+ value_range = max_value - min_value
if self.step and self.step > value_range:
- raise ValueError('the step value ({}) is higher than the total range of the '
- 'expression ({})'.format(self.step, value_range))
+ raise ValueError(f'the step value ({self.step}) is higher than the total range of the '
+ f'expression ({value_range})')
- def get_next_value(self, date, field):
- start = field.get_value(date)
- minval = field.get_min(date)
- maxval = field.get_max(date)
+ def get_next_value(self, dateval: datetime, field) -> Optional[int]:
+ start = field.get_value(dateval)
+ minval = field.get_min(dateval)
+ maxval = field.get_max(dateval)
start = max(start, minval)
if not self.step:
- next = start
+ nextval = start
else:
distance_to_next = (self.step - (start - minval)) % self.step
- next = start + distance_to_next
-
- if next <= maxval:
- return next
+ nextval = start + distance_to_next
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.step == other.step
+ return nextval if nextval <= maxval else None
def __str__(self):
- if self.step:
- return '*/%d' % self.step
- return '*'
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.step)
+ return f'*/{self.step}' if self.step else '*'
class RangeExpression(AllExpression):
- value_re = re.compile(
- r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
+ __slots__ = 'first', 'last'
- def __init__(self, first, last=None, step=None):
+ value_re = re.compile(r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
+
+ def __init__(self, first: Union[str, int], last: Union[str, int, None] = None,
+ step: Union[str, int, None] = None):
super().__init__(step)
- first = asint(first)
- last = asint(last)
- if last is None and step is None:
- last = first
- if last is not None and first > last:
+ self.first = as_int(first)
+ self.last = as_int(last)
+
+ if self.last is None and self.step is None:
+ self.last = self.first
+ if self.last is not None and self.first > self.last:
raise ValueError('The minimum value in a range must not be higher than the maximum')
- self.first = first
- self.last = last
-
- def validate_range(self, field_name):
- from apscheduler.triggers.cron.fields import MIN_VALUES, MAX_VALUES
-
- super().validate_range(field_name)
- if self.first < MIN_VALUES[field_name]:
- raise ValueError('the first value ({}) is lower than the minimum value ({})'
- .format(self.first, MIN_VALUES[field_name]))
- if self.last is not None and self.last > MAX_VALUES[field_name]:
- raise ValueError('the last value ({}) is higher than the maximum value ({})'
- .format(self.last, MAX_VALUES[field_name]))
- value_range = (self.last or MAX_VALUES[field_name]) - self.first
+
+ def validate_range(self, field_name: str, min_value: int, max_value: int) -> None:
+ super().validate_range(field_name, min_value, max_value)
+ if self.first < min_value:
+ raise ValueError(f'the first value ({self.first}) is lower than the minimum value '
+ f'({min_value})')
+ if self.last is not None and self.last > max_value:
+ raise ValueError(f'the last value ({self.last}) is higher than the maximum value '
+ f'({max_value})')
+ value_range = (self.last or max_value) - self.first
if self.step and self.step > value_range:
- raise ValueError('the step value ({}) is higher than the total range of the '
- 'expression ({})'.format(self.step, value_range))
+ raise ValueError(f'the step value ({self.step}) is higher than the total range of the '
+ f'expression ({value_range})')
def get_next_value(self, date, field):
startval = field.get_value(date)
@@ -103,43 +91,34 @@ class RangeExpression(AllExpression):
return nextval if nextval <= maxval else None
- def __eq__(self, other):
- return (isinstance(other, self.__class__) and self.first == other.first and
- self.last == other.last)
-
def __str__(self):
if self.last != self.first and self.last is not None:
- range = '%d-%d' % (self.first, self.last)
+ rangeval = f'{self.first}-{self.last}'
else:
- range = str(self.first)
+ rangeval = str(self.first)
if self.step:
- return '%s/%d' % (range, self.step)
- return range
+ return f'{rangeval}/{self.step}'
- def __repr__(self):
- args = [str(self.first)]
- if self.last != self.first and self.last is not None or self.step:
- args.append(str(self.last))
- if self.step:
- args.append(str(self.step))
- return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+ return rangeval
class MonthRangeExpression(RangeExpression):
+ __slots__ = ()
+
value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
def __init__(self, first, last=None):
try:
first_num = MONTHS.index(first.lower()) + 1
except ValueError:
- raise ValueError('Invalid month name "%s"' % first)
+ raise ValueError(f'Invalid month name {first!r}') from None
if last:
try:
last_num = MONTHS.index(last.lower()) + 1
except ValueError:
- raise ValueError('Invalid month name "%s"' % last)
+ raise ValueError(f'Invalid month name {last!r}') from None
else:
last_num = None
@@ -147,30 +126,27 @@ class MonthRangeExpression(RangeExpression):
def __str__(self):
if self.last != self.first and self.last is not None:
- return '%s-%s' % (MONTHS[self.first - 1], MONTHS[self.last - 1])
- return MONTHS[self.first - 1]
+ return f'{MONTHS[self.first - 1]}-{MONTHS[self.last - 1]}'
- def __repr__(self):
- args = ["'%s'" % MONTHS[self.first]]
- if self.last != self.first and self.last is not None:
- args.append("'%s'" % MONTHS[self.last - 1])
- return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+ return MONTHS[self.first - 1]
class WeekdayRangeExpression(RangeExpression):
+ __slots__ = ()
+
value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
- def __init__(self, first, last=None):
+ def __init__(self, first: str, last: Optional[str] = None):
try:
first_num = WEEKDAYS.index(first.lower())
except ValueError:
- raise ValueError('Invalid weekday name "%s"' % first)
+ raise ValueError(f'Invalid weekday name {first!r}') from None
if last:
try:
last_num = WEEKDAYS.index(last.lower())
except ValueError:
- raise ValueError('Invalid weekday name "%s"' % last)
+ raise ValueError(f'Invalid weekday name {last!r}') from None
else:
last_num = None
@@ -178,36 +154,29 @@ class WeekdayRangeExpression(RangeExpression):
def __str__(self):
if self.last != self.first and self.last is not None:
- return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last])
- return WEEKDAYS[self.first]
+ return f'{WEEKDAYS[self.first]}-{WEEKDAYS[self.last]}'
- def __repr__(self):
- args = ["'%s'" % WEEKDAYS[self.first]]
- if self.last != self.first and self.last is not None:
- args.append("'%s'" % WEEKDAYS[self.last])
- return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+ return WEEKDAYS[self.first]
class WeekdayPositionExpression(AllExpression):
+ __slots__ = 'option_num', 'weekday'
+
options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))' %
'|'.join(options), re.IGNORECASE)
- def __init__(self, option_name, weekday_name):
+ def __init__(self, option_name: str, weekday_name: str):
super().__init__(None)
- try:
- self.option_num = self.options.index(option_name.lower())
- except ValueError:
- raise ValueError('Invalid weekday position "%s"' % option_name)
-
+ self.option_num = self.options.index(option_name.lower())
try:
self.weekday = WEEKDAYS.index(weekday_name.lower())
except ValueError:
- raise ValueError('Invalid weekday name "%s"' % weekday_name)
+ raise ValueError(f'Invalid weekday name {weekday_name!r}') from None
- def get_next_value(self, date, field):
+ def get_next_value(self, dateval: datetime, field) -> Optional[int]:
# Figure out the weekday of the month's first day and the number of days in that month
- first_day_wday, last_day = monthrange(date.year, date.month)
+ first_day_wday, last_day = monthrange(dateval.year, dateval.month)
# Calculate which day of the month is the first of the target weekdays
first_hit_day = self.weekday - first_day_wday + 1
@@ -220,32 +189,25 @@ class WeekdayPositionExpression(AllExpression):
else:
target_day = first_hit_day + ((last_day - first_hit_day) // 7) * 7
- if target_day <= last_day and target_day >= date.day:
+ if last_day >= target_day >= dateval.day:
return target_day
-
- def __eq__(self, other):
- return (super().__eq__(other) and
- self.option_num == other.option_num and self.weekday == other.weekday)
+ else:
+ return None
def __str__(self):
- return '%s %s' % (self.options[self.option_num], WEEKDAYS[self.weekday])
-
- def __repr__(self):
- return "%s('%s', '%s')" % (self.__class__.__name__, self.options[self.option_num],
- WEEKDAYS[self.weekday])
+ return f'{self.options[self.option_num]} {WEEKDAYS[self.weekday]}'
class LastDayOfMonthExpression(AllExpression):
+ __slots__ = ()
+
value_re = re.compile(r'last', re.IGNORECASE)
def __init__(self):
super().__init__(None)
- def get_next_value(self, date, field):
- return monthrange(date.year, date.month)[1]
+ def get_next_value(self, dateval: datetime, field):
+ return monthrange(dateval.year, dateval.month)[1]
def __str__(self):
return 'last'
-
- def __repr__(self):
- return "%s()" % self.__class__.__name__
diff --git a/apscheduler/triggers/cron/fields.py b/apscheduler/triggers/cron/fields.py
index 78f8bac..338df9c 100644
--- a/apscheduler/triggers/cron/fields.py
+++ b/apscheduler/triggers/cron/fields.py
@@ -1,17 +1,14 @@
"""Fields represent CronTrigger options which map to :class:`~datetime.datetime` fields."""
-from calendar import monthrange
import re
+from calendar import monthrange
+from datetime import datetime
+from typing import Sequence, ClassVar, Any, Union, Optional
-from apscheduler.triggers.cron.expressions import (
+from .expressions import (
AllExpression, RangeExpression, WeekdayPositionExpression, LastDayOfMonthExpression,
WeekdayRangeExpression, MonthRangeExpression)
-
-__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField', 'WeekField',
- 'DayOfMonthField', 'DayOfWeekField')
-
-
MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, 'day_of_week': 0, 'hour': 0,
'minute': 0, 'second': 0}
MAX_VALUES = {'year': 9999, 'month': 12, 'day': 31, 'week': 53, 'day_of_week': 6, 'hour': 23,
@@ -22,24 +19,31 @@ SEPARATOR = re.compile(' *, *')
class BaseField:
- REAL = True
- COMPILERS = [AllExpression, RangeExpression]
+ __slots__ = 'name', 'expressions'
+
+ real: ClassVar[bool] = True
+ compilers: ClassVar[Any] = (AllExpression, RangeExpression)
- def __init__(self, name, exprs, is_default=False):
+ def __init_subclass__(cls, real: bool = True, extra_compilers: Sequence = ()):
+ cls.real = real
+ if extra_compilers:
+ cls.compilers += extra_compilers
+
+ def __init__(self, name: str, exprs: Union[int, str]):
self.name = name
- self.is_default = is_default
- self.compile_expressions(exprs)
+ self.expressions = [self.compile_expression(expr)
+ for expr in SEPARATOR.split(str(exprs).strip())]
- def get_min(self, dateval):
+ def get_min(self, dateval: datetime) -> int:
return MIN_VALUES[self.name]
- def get_max(self, dateval):
+ def get_max(self, dateval: datetime) -> int:
return MAX_VALUES[self.name]
- def get_value(self, dateval):
+ def get_value(self, dateval: datetime) -> int:
return getattr(dateval, self.name)
- def get_next_value(self, dateval):
+ def get_next_value(self, dateval: datetime) -> Optional[int]:
smallest = None
for expr in self.expressions:
value = expr.get_next_value(dateval, self)
@@ -48,62 +52,48 @@ class BaseField:
return smallest
- def compile_expressions(self, exprs):
- self.expressions = []
-
- # Split a comma-separated expression list, if any
- for expr in SEPARATOR.split(str(exprs).strip()):
- self.compile_expression(expr)
-
- def compile_expression(self, expr):
- for compiler in self.COMPILERS:
+ def compile_expression(self, expr: str):
+ for compiler in self.compilers:
match = compiler.value_re.match(expr)
if match:
compiled_expr = compiler(**match.groupdict())
try:
- compiled_expr.validate_range(self.name)
- except ValueError as e:
- exc = ValueError('Error validating expression {!r}: {}'.format(expr, e))
- raise exc from None
+ compiled_expr.validate_range(self.name, MIN_VALUES[self.name],
+ MAX_VALUES[self.name])
+ except ValueError as exc:
+ raise ValueError(f'Error validating expression {expr!r}: {exc}') from exc
- self.expressions.append(compiled_expr)
- return
+ return compiled_expr
- raise ValueError('Unrecognized expression "%s" for field "%s"' % (expr, self.name))
-
- def __eq__(self, other):
- return isinstance(self, self.__class__) and self.expressions == other.expressions
+ raise ValueError(f'Unrecognized expression {expr!r} for field {self.name!r}')
def __str__(self):
expr_strings = (str(e) for e in self.expressions)
return ','.join(expr_strings)
- def __repr__(self):
- return "%s('%s', '%s')" % (self.__class__.__name__, self.name, self)
-
-class WeekField(BaseField):
- REAL = False
+class WeekField(BaseField, real=False):
+ __slots__ = ()
- def get_value(self, dateval):
+ def get_value(self, dateval: datetime) -> int:
return dateval.isocalendar()[1]
-class DayOfMonthField(BaseField):
- COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression, LastDayOfMonthExpression]
+class DayOfMonthField(BaseField,
+ extra_compilers=(WeekdayPositionExpression, LastDayOfMonthExpression)):
+ __slots__ = ()
- def get_max(self, dateval):
+ def get_max(self, dateval: datetime) -> int:
return monthrange(dateval.year, dateval.month)[1]
-class DayOfWeekField(BaseField):
- REAL = False
- COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]
+class DayOfWeekField(BaseField, real=False, extra_compilers=(WeekdayRangeExpression,)):
+ __slots__ = ()
- def get_value(self, dateval):
+ def get_value(self, dateval: datetime) -> int:
return dateval.weekday()
-class MonthField(BaseField):
- COMPILERS = BaseField.COMPILERS + [MonthRangeExpression]
+class MonthField(BaseField, extra_compilers=(MonthRangeExpression,)):
+ __slots__ = ()
diff --git a/apscheduler/triggers/date.py b/apscheduler/triggers/date.py
index 0768100..2e3b052 100644
--- a/apscheduler/triggers/date.py
+++ b/apscheduler/triggers/date.py
@@ -1,51 +1,46 @@
-from datetime import datetime
+from datetime import datetime, tzinfo
+from typing import Optional, Union
-from tzlocal import get_localzone
+from dateutil.parser import parse
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.util import convert_to_datetime, datetime_repr, astimezone
+from ..abc import Trigger
+from ..validators import require_state_version, as_aware_datetime, as_timezone
-class DateTrigger(BaseTrigger):
+class DateTrigger(Trigger):
"""
- Triggers once on the given datetime. If ``run_date`` is left empty, current time is used.
+ Triggers once on the given date/time.
- :param datetime|str run_date: the date/time to run the job at
- :param datetime.tzinfo|str timezone: time zone for ``run_date`` if it doesn't have one already
+ :param run_time: the date/time to run the job at
+ :param timezone: time zone to use to convert ``run_time`` into a timezone aware datetime, if it
+ isn't already (defaults to the local time zone)
"""
- __slots__ = 'run_date'
+ __slots__ = 'run_time', '_completed'
- def __init__(self, run_date=None, timezone=None):
- timezone = astimezone(timezone) or get_localzone()
- if run_date is not None:
- self.run_date = convert_to_datetime(run_date, timezone, 'run_date')
- else:
- self.run_date = datetime.now(timezone)
+ def __init__(self, run_time: datetime, timezone: Union[str, tzinfo, None] = None):
+ timezone = as_timezone(timezone or run_time.tzinfo)
+ self.run_time = as_aware_datetime(run_time, timezone)
+ self._completed = False
- def get_next_fire_time(self, previous_fire_time, now):
- return self.run_date if previous_fire_time is None else None
+ def next(self) -> Optional[datetime]:
+ if not self._completed:
+ self._completed = True
+ return self.run_time
+ else:
+ return None
def __getstate__(self):
return {
'version': 1,
- 'run_date': self.run_date
+ 'run_time': self.run_time.isoformat(),
+ 'completed': self._completed
}
def __setstate__(self, state):
- # This is for compatibility with APScheduler 3.0.x
- if isinstance(state, tuple):
- state = state[1]
-
- if state.get('version', 1) > 1:
- raise ValueError(
- 'Got serialized data for version %s of %s, but only version 1 can be handled' %
- (state['version'], self.__class__.__name__))
-
- self.run_date = state['run_date']
-
- def __str__(self):
- return 'date[%s]' % datetime_repr(self.run_date)
+ require_state_version(self, state, 1)
+ self.run_time = parse(state['run_time'])
+ self._completed = state['completed']
def __repr__(self):
- return "<%s (run_date='%s')>" % (self.__class__.__name__, datetime_repr(self.run_date))
+ return f'DateTrigger({self.run_time.isoformat()!r})'
diff --git a/apscheduler/triggers/interval.py b/apscheduler/triggers/interval.py
index 831ba38..bdb180b 100644
--- a/apscheduler/triggers/interval.py
+++ b/apscheduler/triggers/interval.py
@@ -1,106 +1,100 @@
-from datetime import timedelta, datetime
-from math import ceil
+from datetime import timedelta, datetime, tzinfo
+from typing import Optional, Union
-from tzlocal import get_localzone
+from ..abc import Trigger
+from ..validators import require_state_version, as_timezone, as_aware_datetime, as_timestamp
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.util import convert_to_datetime, timedelta_seconds, datetime_repr, astimezone
-
-class IntervalTrigger(BaseTrigger):
+class IntervalTrigger(Trigger):
"""
- Triggers on specified intervals, starting on ``start_date`` if specified, ``datetime.now()`` +
- interval otherwise.
-
- :param int weeks: number of weeks to wait
- :param int days: number of days to wait
- :param int hours: number of hours to wait
- :param int minutes: number of minutes to wait
- :param int seconds: number of seconds to wait
- :param datetime|str start_date: starting point for the interval calculation
- :param datetime|str end_date: latest possible date/time to trigger on
- :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations
- :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+ Triggers on specified intervals.
+
+ The first trigger time is on ``start_time`` which is the moment the trigger was created unless
+ specifically overridden. If ``end_time`` is specified, the last trigger time will be at or
+ before that time. If no ``end_time`` has been given, the trigger will produce new trigger times
+ as long as the resulting datetimes are valid datetimes in Python.
+
+ :param weeks: number of weeks to wait
+ :param days: number of days to wait
+ :param hours: number of hours to wait
+ :param minutes: number of minutes to wait
+ :param seconds: number of seconds to wait
+ :param microseconds: number of microseconds to wait
+ :param start_time: first trigger date/time
+ :param end_time: latest possible date/time to trigger on
+ :param timezone: time zone to use for normalizing calculated datetimes (defaults to the local
+ timezone)
"""
- __slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter'
-
- def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
- end_date=None, timezone=None, jitter=None):
- self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
- seconds=seconds)
- self.interval_length = timedelta_seconds(self.interval)
- if self.interval_length == 0:
- self.interval = timedelta(seconds=1)
- self.interval_length = 1
-
- if timezone:
- self.timezone = astimezone(timezone)
- elif isinstance(start_date, datetime) and start_date.tzinfo:
- self.timezone = start_date.tzinfo
- elif isinstance(end_date, datetime) and end_date.tzinfo:
- self.timezone = end_date.tzinfo
+ __slots__ = ('weeks', 'days', 'hours', 'minutes', 'seconds', 'microseconds', 'start_time',
+ 'end_time', 'timezone', '_interval', '_last_fire_time')
+
+ def __init__(self, *, weeks: int = 0, days: int = 0, hours: int = 0, minutes: int = 0,
+ seconds: int = 0, microseconds: int = 0, start_time: Optional[datetime] = None,
+ end_time: Optional[datetime] = None, timezone: Union[str, tzinfo, None] = None):
+ self.weeks = weeks
+ self.days = days
+ self.hours = hours
+ self.minutes = minutes
+ self.seconds = seconds
+ self.microseconds = microseconds
+ self.timezone = as_timezone(timezone)
+ self.start_time = as_aware_datetime(start_time or datetime.now(), self.timezone)
+ self.end_time = as_aware_datetime(end_time, self.timezone)
+ self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
+ minutes=self.minutes, seconds=self.seconds,
+ microseconds=self.microseconds)
+ self._last_fire_time = None
+
+ if self._interval.total_seconds() <= 0:
+ raise ValueError('The time interval must be positive')
+
+ if self.end_time and self.end_time < self.start_time:
+ raise ValueError('end_time cannot be earlier than start_time')
+
+ def next(self) -> Optional[datetime]:
+ if self._last_fire_time is None:
+ self._last_fire_time = self.start_time
else:
- self.timezone = get_localzone()
-
- start_date = start_date or (datetime.now(self.timezone) + self.interval)
- self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
- self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
+ self._last_fire_time = self.timezone.normalize(self._last_fire_time + self._interval)
- self.jitter = jitter
-
- def get_next_fire_time(self, previous_fire_time, now):
- if previous_fire_time:
- next_fire_time = previous_fire_time + self.interval
- elif self.start_date > now:
- next_fire_time = self.start_date
+ if self.end_time is None or self._last_fire_time <= self.end_time:
+ return self._last_fire_time
else:
- timediff_seconds = timedelta_seconds(now - self.start_date)
- next_interval_num = int(ceil(timediff_seconds / self.interval_length))
- next_fire_time = self.start_date + self.interval * next_interval_num
-
- if self.jitter is not None:
- next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)
-
- if not self.end_date or next_fire_time <= self.end_date:
- return self.timezone.normalize(next_fire_time)
+ return None
def __getstate__(self):
return {
- 'version': 2,
- 'timezone': self.timezone,
- 'start_date': self.start_date,
- 'end_date': self.end_date,
- 'interval': self.interval,
- 'jitter': self.jitter,
+ 'version': 1,
+ 'interval': [self.weeks, self.days, self.hours, self.minutes, self.seconds,
+ self.microseconds],
+ 'timezone': self.timezone.zone,
+ 'start_time': as_timestamp(self.start_time),
+ 'end_time': as_timestamp(self.end_time),
+ 'last_fire_time': as_timestamp(self._last_fire_time)
}
def __setstate__(self, state):
- # This is for compatibility with APScheduler 3.0.x
- if isinstance(state, tuple):
- state = state[1]
-
- if state.get('version', 1) > 2:
- raise ValueError(
- 'Got serialized data for version %s of %s, but only versions up to 2 can be '
- 'handled' % (state['version'], self.__class__.__name__))
+ require_state_version(self, state, 1)
+ self.weeks, self.days, self.hours, self.minutes, self.seconds, self.microseconds = \
+ state['interval']
+ self.timezone = as_timezone(state['timezone'])
+ self.start_time = as_aware_datetime(state['start_time'], self.timezone)
+ self.end_time = as_aware_datetime(state['end_time'], self.timezone)
+ self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
+ minutes=self.minutes, seconds=self.seconds,
+ microseconds=self.microseconds)
+ self._last_fire_time = as_aware_datetime(state['last_fire_time'], self.timezone)
- self.timezone = state['timezone']
- self.start_date = state['start_date']
- self.end_date = state['end_date']
- self.interval = state['interval']
- self.interval_length = timedelta_seconds(self.interval)
- self.jitter = state.get('jitter')
+ def __repr__(self):
+ fields = []
+ for field in 'weeks', 'days', 'hours', 'minutes', 'seconds', 'microseconds':
+ value = getattr(self, field)
+ if value > 0:
+ fields.append(f'{field}={value}')
- def __str__(self):
- return 'interval[%s]' % str(self.interval)
+ fields.append(f'start_time={self.start_time.isoformat()!r}')
+ if self.end_time:
+ fields.append(f'end_time={self.end_time.isoformat()!r}')
- def __repr__(self):
- options = ['interval=%r' % self.interval, 'start_date=%r' % datetime_repr(self.start_date)]
- if self.end_date:
- options.append("end_date=%r" % datetime_repr(self.end_date))
- if self.jitter:
- options.append('jitter=%s' % self.jitter)
-
- return "<%s (%s, timezone='%s')>" % (
- self.__class__.__name__, ', '.join(options), self.timezone)
+ return f'IntervalTrigger({", ".join(fields)})'
diff --git a/apscheduler/util.py b/apscheduler/util.py
index 753c011..9bc3348 100644
--- a/apscheduler/util.py
+++ b/apscheduler/util.py
@@ -1,18 +1,10 @@
"""This module contains several handy functions primarily meant for internal use."""
-from datetime import date, datetime, time, timedelta, tzinfo
-from calendar import timegm
-from inspect import signature
-from functools import partial
-from inspect import isclass, ismethod
import re
-
-from pytz import timezone, utc, FixedOffset
-
-__all__ = ('undefined', 'asint', 'asbool', 'astimezone', 'convert_to_datetime',
- 'datetime_to_utc_timestamp', 'utc_timestamp_to_datetime', 'timedelta_seconds',
- 'datetime_ceil', 'datetime_repr', 'get_callable_name', 'obj_to_ref', 'ref_to_obj',
- 'maybe_ref', 'check_callable_args')
+from datetime import datetime, timedelta
+from functools import partial
+from inspect import signature, isclass, ismethod
+from typing import Tuple, Any
class _Undefined:
@@ -26,58 +18,6 @@ class _Undefined:
undefined = _Undefined() #: a unique object that only signifies that no value is defined
-def asint(text):
- """
- Safely converts a string to an integer, returning ``None`` if the string is ``None``.
-
- :type text: str
- :rtype: int
-
- """
- if text is not None:
- return int(text)
-
-
-def asbool(obj):
- """
- Interprets an object as a boolean value.
-
- :rtype: bool
-
- """
- if isinstance(obj, str):
- obj = obj.strip().lower()
- if obj in ('true', 'yes', 'on', 'y', 't', '1'):
- return True
- if obj in ('false', 'no', 'off', 'n', 'f', '0'):
- return False
- raise ValueError('Unable to interpret value "%s" as boolean' % obj)
- return bool(obj)
-
-
-def astimezone(obj):
- """
- Interprets an object as a timezone.
-
- :rtype: tzinfo
-
- """
- if isinstance(obj, str):
- return timezone(obj)
- if isinstance(obj, tzinfo):
- if not hasattr(obj, 'localize') or not hasattr(obj, 'normalize'):
- raise TypeError('Only timezones from the pytz library are supported')
- if obj.zone == 'local':
- raise ValueError(
- 'Unable to determine the name of the local timezone -- you must explicitly '
- 'specify the name of the local timezone. Please refrain from using timezones like '
- 'EST to prevent problems with daylight saving time. Instead, use a locale based '
- 'timezone name (such as Europe/Helsinki).')
- return obj
- if obj is not None:
- raise TypeError('Expected tzinfo, got %s instead' % obj.__class__.__name__)
-
-
_DATE_REGEX = re.compile(
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
r'(?:[ T](?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
@@ -85,117 +25,13 @@ _DATE_REGEX = re.compile(
r'(?P<timezone>Z|[+-]\d\d:\d\d)?)?$')
-def convert_to_datetime(input, tz, arg_name):
- """
- Converts the given object to a timezone aware datetime object.
-
- If a timezone aware datetime object is passed, it is returned unmodified.
- If a native datetime object is passed, it is given the specified timezone.
- If the input is a string, it is parsed as a datetime with the given timezone.
-
- Date strings are accepted in three different forms: date only (Y-m-d), date with time
- (Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro). Additionally you can
- override the time zone by giving a specific offset in the format specified by ISO 8601:
- Z (UTC), +HH:MM or -HH:MM.
-
- :param str|datetime input: the datetime or string to convert to a timezone aware datetime
- :param datetime.tzinfo tz: timezone to interpret ``input`` in
- :param str arg_name: the name of the argument (used in an error message)
- :rtype: datetime
-
- """
- if input is None:
- return
- elif isinstance(input, datetime):
- datetime_ = input
- elif isinstance(input, date):
- datetime_ = datetime.combine(input, time())
- elif isinstance(input, str):
- m = _DATE_REGEX.match(input)
- if not m:
- raise ValueError('Invalid date string')
-
- values = m.groupdict()
- tzname = values.pop('timezone')
- if tzname == 'Z':
- tz = utc
- elif tzname:
- hours, minutes = (int(x) for x in tzname[1:].split(':'))
- sign = 1 if tzname[0] == '+' else -1
- tz = FixedOffset(sign * (hours * 60 + minutes))
-
- values = {k: int(v or 0) for k, v in values.items()}
- datetime_ = datetime(**values)
- else:
- raise TypeError('Unsupported type for %s: %s' % (arg_name, input.__class__.__name__))
-
- if datetime_.tzinfo is not None:
- return datetime_
- if tz is None:
- raise ValueError(
- 'The "tz" argument must be specified if %s has no timezone information' % arg_name)
- if isinstance(tz, str):
- tz = timezone(tz)
-
- try:
- return tz.localize(datetime_, is_dst=None)
- except AttributeError:
- raise TypeError(
- 'Only pytz timezones are supported (need the localize() and normalize() methods)')
-
-
-def datetime_to_utc_timestamp(timeval):
- """
- Converts a datetime instance to a timestamp.
-
- :type timeval: datetime
- :rtype: float
-
- """
- if timeval is not None:
- return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000
-
-
-def utc_timestamp_to_datetime(timestamp):
- """
- Converts the given timestamp to a datetime instance.
-
- :type timestamp: float
- :rtype: datetime
-
- """
- if timestamp is not None:
- return datetime.fromtimestamp(timestamp, utc)
-
-
-def timedelta_seconds(delta):
- """
- Converts the given timedelta to seconds.
-
- :type delta: timedelta
- :rtype: float
-
- """
- return delta.days * 24 * 60 * 60 + delta.seconds + \
- delta.microseconds / 1000000.0
-
-
-def datetime_ceil(dateval):
- """
- Rounds the given datetime object upwards.
-
- :type dateval: datetime
-
- """
+def datetime_ceil(dateval: datetime):
+ """Round the given datetime object upwards."""
if dateval.microsecond > 0:
return dateval + timedelta(seconds=1, microseconds=-dateval.microsecond)
return dateval
-def datetime_repr(dateval):
- return dateval.strftime('%Y-%m-%d %H:%M:%S %Z') if dateval else 'None'
-
-
def get_callable_name(func):
"""
Returns the best available display name for the given function/callable.
@@ -384,3 +220,14 @@ def check_callable_args(func, args, kwargs):
raise ValueError(
'The target callable does not accept the following keyword arguments: %s' %
', '.join(unmatched_kwargs))
+
+
+def marshal_object(obj) -> Tuple[str, Any]:
+ return f'{obj.__class__.__module__}:{obj.__class__.__qualname__}', obj.__getstate__()
+
+
+def unmarshal_object(ref: str, state):
+ cls = ref_to_obj(ref)
+ instance = cls.__new__(cls)
+ instance.__setstate__(state)
+ return instance
diff --git a/apscheduler/validators.py b/apscheduler/validators.py
new file mode 100644
index 0000000..6fe2d41
--- /dev/null
+++ b/apscheduler/validators.py
@@ -0,0 +1,166 @@
+from datetime import datetime, timedelta, tzinfo, date, timezone
+from typing import Dict, Any, Optional, Union
+
+import pytz
+from dateutil.parser import parse
+from tzlocal import get_localzone
+
+from apscheduler.abc import Trigger
+from apscheduler.exceptions import DeserializationError
+
+
+def as_int(value) -> Optional[int]:
+ """Convert the value into an integer."""
+ if value is None:
+ return None
+
+ return int(value)
+
+
+def as_timezone(value: Union[str, tzinfo, None]) -> tzinfo:
+ """
+ Convert the value into a pytz timezone.
+
+ :param value: the value to be converted
+ :return: a timezone object, or if ``None`` was given, the local timezone
+
+ """
+ if value is None or value == 'local':
+ return get_localzone()
+ elif isinstance(value, str):
+ return pytz.timezone(value)
+ elif isinstance(value, tzinfo):
+ if value is timezone.utc:
+ return pytz.utc
+ elif not getattr(value, 'zone', None):
+ raise TypeError('Only named pytz timezones are supported')
+ else:
+ return value
+
+ raise TypeError(f'Expected pytz timezone or timezone.utc, got {value.__class__.__qualname__}'
+ f'instead')
+
+
+def as_date(value: Union[date, str, None]) -> Optional[date]:
+ """
+ Convert the value to a date.
+
+ :param value: the value to convert to a date
+ :return: a date object, or ``None`` if ``None`` was given
+
+ """
+ if value is None:
+ return None
+ elif isinstance(value, int):
+ return date.fromordinal(value)
+ elif isinstance(value, str):
+ return parse(value).date()
+ elif isinstance(value, datetime):
+ return value.date()
+ elif isinstance(value, date):
+ return value
+
+ raise TypeError(f'Expected string or date, got {value.__class__.__qualname__} instead')
+
+
+def as_timestamp(value: Optional[datetime]) -> Optional[float]:
+ if value is None:
+ return None
+
+ return value.timestamp()
+
+
+def as_ordinal_date(value: Optional[date]) -> Optional[int]:
+ if value is None:
+ return None
+
+ return value.toordinal()
+
+
+def as_aware_datetime(value: Union[datetime, str, float, None], tz: tzinfo) -> Optional[datetime]:
+ """
+ Convert the value to a timezone aware datetime.
+
+ :param value: a datetime, an ISO 8601 representation of a datetime, or ``None``
+ :param tz: timezone to use for making the datetime timezone aware
+ :return: a timezone aware datetime, or ``None`` if ``None`` was given
+
+ """
+ if value is None:
+ return None
+
+ if isinstance(value, float):
+ return datetime.fromtimestamp(value, tz)
+
+ if isinstance(value, str):
+ value = parse(value)
+
+ if isinstance(value, datetime):
+ if value.tzinfo:
+ return value.astimezone(tz)
+
+ try:
+ # Works with pytz timezones
+ return tz.localize(value)
+ except AttributeError:
+ # Not a pytz timezone
+ return value.astimezone(tz)
+
+ raise TypeError(f'Expected string or datetime, got {value.__class__.__qualname__} instead')
+
+
+def positive_number(instance, attribute, value) -> None:
+ if value <= 0:
+ raise ValueError(f'Expected positive number, got {value} instead')
+
+
+def non_negative_number(instance, attribute, value) -> None:
+ if value < 0:
+ raise ValueError(f'Expected non-negative number, got {value} instead')
+
+
+def as_positive_integer(value, name: str) -> int:
+ if isinstance(value, int):
+ if value > 0:
+ return value
+ else:
+ raise ValueError(f'{name} must be positive')
+
+ raise TypeError(f'{name} must be an integer, got {value.__class__.__name__} instead')
+
+
+def as_timedelta(value, name: str) -> timedelta:
+ if isinstance(value, (int, float)):
+ value = timedelta(seconds=value)
+
+ if isinstance(value, timedelta):
+ if value.total_seconds() < 0:
+ raise ValueError(f'{name} cannot be negative')
+ else:
+ return value
+
+ raise TypeError(f'{name} must be a timedelta or number of seconds, got '
+ f'{value.__class__.__name__} instead')
+
+
+def as_list(value, element_type: type, name: str) -> list:
+ value = list(value)
+ for i, element in enumerate(value):
+ if not isinstance(element, element_type):
+ raise TypeError(f'Element at index {i} of {name} is not of the expected type '
+ f'({element_type.__name__}')
+
+ return value
+
+
+def require_state_version(trigger: Trigger, state: Dict[str, Any], max_version: int) -> None:
+ try:
+ if state['version'] > max_version:
+ raise DeserializationError(
+ f'{trigger.__class__.__name__} received a serialized state with version '
+ f'{state["version"]}, but it only supports up to version {max_version}. '
+ f'This can happen when an older version of APScheduler is being used with a data '
+ f'store that was previously used with a newer APScheduler version.'
+ )
+ except KeyError as exc:
+ raise DeserializationError('Missing "version" key in the serialized state') from exc
diff --git a/docs/modules/triggers/cron.rst b/docs/modules/triggers/cron.rst
index a410bc2..4758321 100644
--- a/docs/modules/triggers/cron.rst
+++ b/docs/modules/triggers/cron.rst
@@ -19,8 +19,8 @@ This is the most powerful of the built-in triggers in APScheduler. You can speci
on each field, and when determining the next execution time, it finds the earliest possible time that satisfies the
conditions in every field. This behavior resembles the "Cron" utility found in most UNIX-like operating systems.
-You can also specify the starting date and ending dates for the cron-style schedule through the ``start_date`` and
-``end_date`` parameters, respectively. They can be given as a date/datetime object or text (in the
+You can also specify the starting date and ending dates for the cron-style schedule through the ``start_time`` and
+``end_time`` parameters, respectively. They can be given as a date/datetime object or text (in the
`ISO 8601 <https://en.wikipedia.org/wiki/ISO_8601>`_ format).
Unlike with crontab expressions, you can omit fields that you don't need. Fields greater than the least significant
@@ -96,10 +96,10 @@ Examples
sched.start()
-You can use ``start_date`` and ``end_date`` to limit the total time in which the schedule runs::
+You can use ``start_time`` and ``end_time`` to limit the total time in which the schedule runs::
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
- sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
+ sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_time='2014-05-30')
The :meth:`~apscheduler.schedulers.base.BaseScheduler.scheduled_job` decorator works nicely too::
@@ -111,10 +111,3 @@ The :meth:`~apscheduler.schedulers.base.BaseScheduler.scheduled_job` decorator w
To schedule a job using a standard crontab expression::
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
-
-The ``jitter`` option enables you to add a random component to the execution time. This might be useful if you have
-multiple servers and don't want them to run a job at the exact same moment or if you want to prevent jobs from running
-at sharp hours::
-
- # Run the `job_function` every sharp hour with an extra-delay picked randomly in a [-120,+120] seconds window.
- sched.add_job(job_function, 'cron', hour='*', jitter=120)
diff --git a/docs/modules/triggers/interval.rst b/docs/modules/triggers/interval.rst
index f7b8ae4..df7a593 100644
--- a/docs/modules/triggers/interval.rst
+++ b/docs/modules/triggers/interval.rst
@@ -17,11 +17,11 @@ Introduction
This method schedules jobs to be run periodically, on selected intervals.
-You can also specify the starting date and ending dates for the schedule through the ``start_date`` and ``end_date``
+You can also specify the starting date and ending dates for the schedule through the ``start_time`` and ``end_time``
parameters, respectively. They can be given as a date/datetime object or text (in the
`ISO 8601 <https://en.wikipedia.org/wiki/ISO_8601>`_ format).
-If the start date is in the past, the trigger will not fire many times retroactively but instead calculates the next
+If the start time is in the past, the trigger will not fire many times retroactively but instead calculates the next
run time from the current time, based on the past start time.
@@ -46,10 +46,10 @@ Examples
sched.start()
-You can use ``start_date`` and ``end_date`` to limit the total time in which the schedule runs::
+You can use ``start_time`` and ``end_time`` to limit the total time in which the schedule runs::
# The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00
-   sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
+   sched.add_job(job_function, 'interval', hours=2, start_time='2010-10-10 09:30:00', end_time='2014-06-15 11:00:00')
The :meth:`~apscheduler.schedulers.base.BaseScheduler.scheduled_job` decorator works nicely too::
@@ -59,11 +59,3 @@ The :meth:`~apscheduler.schedulers.base.BaseScheduler.scheduled_job` decorator w
@sched.scheduled_job('interval', id='my_job_id', hours=2)
def job_function():
print("Hello World")
-
-
-The ``jitter`` option enables you to add a random component to the execution time. This might be useful if you have
-multiple servers and don't want them to run a job at the exact same moment or if you want to prevent multiple jobs
-with similar options from always running concurrently::
-
- # Run the `job_function` every hour with an extra-delay picked randomly in a [-120,+120] seconds window.
- sched.add_job(job_function, 'interval', hours=1, jitter=120)
diff --git a/setup.cfg b/setup.cfg
index 6293832..19ffc2e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -11,7 +11,6 @@ classifiers =
License :: OSI Approved :: MIT License
Programming Language :: Python
Programming Language :: Python :: 3 :: Only
- Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
@@ -20,53 +19,29 @@ license = MIT
[options]
packages = find:
-python_requires = >= 3.5
+python_requires = >= 3.6
install_requires =
- setuptools >= 0.7
+ dataclasses >= 0.7; python_version < '3.7'
+ dateutil >= 2.8
pytz
tzlocal >= 1.2
[options.extras_require]
-gevent = gevent
-mongodb = pymongo >= 2.8
+mongodb = motor ~= 2.1
+postgresql = asyncpg >= 0.20
redis = redis
-rethinkdb = rethinkdb >= 2.4.0
-sqlalchemy = sqlalchemy >= 0.8
-tornado = tornado >= 4.3
+sqlalchemy = sqlalchemy >= 1.3
twisted = twisted
zookeeper = kazoo
test =
pytest >= 5.0
pytest-cov
+ pytest-mock
pytest-tornado5
doc =
sphinx
sphinx-rtd-theme
-[options.entry_points]
-apscheduler.triggers =
- date = apscheduler.triggers.date:DateTrigger
- interval = apscheduler.triggers.interval:IntervalTrigger
- cron = apscheduler.triggers.cron:CronTrigger
- and = apscheduler.triggers.combining:AndTrigger
- or = apscheduler.triggers.combining:OrTrigger
- calendarinterval = apscheduler.triggers.calendarinterval:CalendarIntervalTrigger
-apscheduler.executors =
- debug = apscheduler.executors.debug:DebugExecutor
- threadpool = apscheduler.executors.pool:ThreadPoolExecutor
- processpool = apscheduler.executors.pool:ProcessPoolExecutor
- asyncio = apscheduler.executors.asyncio:AsyncIOExecutor
- gevent = apscheduler.executors.gevent:GeventExecutor [gevent]
- tornado = apscheduler.executors.tornado:TornadoExecutor [tornado]
- twisted = apscheduler.executors.twisted:TwistedExecutor [twisted]
-apscheduler.jobstores =
- memory = apscheduler.jobstores.memory:MemoryJobStore
- sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore [sqlalchemy]
- mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore [mongodb]
- rethinkdb = apscheduler.jobstores.rethinkdb:RethinkDBJobStore [rethinkdb]
- redis = apscheduler.jobstores.redis:RedisJobStore [redis]
- zookeeper = apscheduler.jobstores.zookeeper:ZooKeeperJobStore [zookeeper]
-
[tool:pytest]
addopts = -rsx --cov --tb=short
testpaths = tests
diff --git a/tests/conftest.py b/tests/conftest.py
index d2a0ea3..bd37e44 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -4,17 +4,14 @@ from unittest.mock import Mock
import pytest
import pytz
-from apscheduler.job import Job
-from apscheduler.schedulers.base import BaseScheduler
-from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.serializers.cbor import CBORSerializer
+from apscheduler.serializers.json import JSONSerializer
+from apscheduler.serializers.pickle import PickleSerializer
-@pytest.fixture
-def timezone(monkeypatch):
- tz = pytz.timezone('Europe/Berlin')
- monkeypatch.setattr('apscheduler.schedulers.base.get_localzone',
- Mock(return_value=tz))
- return tz
+@pytest.fixture(scope='session')
+def timezone():
+ return pytz.timezone('Europe/Berlin')
@pytest.fixture
@@ -41,30 +38,12 @@ def freeze_time(monkeypatch, timezone):
freezer = TimeFreezer(timezone.localize(datetime(2011, 4, 3, 18, 40)))
fake_datetime = Mock(datetime, now=freezer.get)
- monkeypatch.setattr('apscheduler.schedulers.base.datetime', fake_datetime)
- monkeypatch.setattr('apscheduler.executors.base.datetime', fake_datetime)
monkeypatch.setattr('apscheduler.triggers.interval.datetime', fake_datetime)
monkeypatch.setattr('apscheduler.triggers.date.datetime', fake_datetime)
return freezer
-@pytest.fixture
-def job_defaults(timezone):
- run_date = timezone.localize(datetime(2011, 4, 3, 18, 40))
- return {'trigger': 'date', 'trigger_args': {'run_date': run_date, 'timezone': timezone},
- 'executor': 'default', 'args': (), 'kwargs': {},
- 'id': b't\xc3\xa9st\xc3\xafd'.decode('utf-8'), 'misfire_grace_time': 1,
- 'coalesce': False, 'name': b'n\xc3\xa4m\xc3\xa9'.decode('utf-8'), 'max_instances': 1}
-
-
-@pytest.fixture
-def create_job(job_defaults, timezone):
- def create(**kwargs):
- kwargs.setdefault('scheduler', Mock(BaseScheduler, timezone=timezone))
- job_kwargs = job_defaults.copy()
- job_kwargs.update(kwargs)
- job_kwargs['trigger'] = BlockingScheduler()._create_trigger(job_kwargs.pop('trigger'),
- job_kwargs.pop('trigger_args'))
- job_kwargs.setdefault('next_run_time', None)
- return Job(**job_kwargs)
- return create
+@pytest.fixture(params=[None, PickleSerializer, CBORSerializer, JSONSerializer],
+ ids=['none', 'pickle', 'cbor', 'json'])
+def serializer(request):
+ return request.param() if request.param else None
diff --git a/tests/test_expressions.py b/tests/test_expressions.py
deleted file mode 100644
index 1f460f4..0000000
--- a/tests/test_expressions.py
+++ /dev/null
@@ -1,173 +0,0 @@
-from datetime import datetime
-
-import pytest
-
-from apscheduler.triggers.cron.fields import DayOfMonthField, BaseField, DayOfWeekField
-from apscheduler.triggers.cron.expressions import (
- AllExpression, RangeExpression, WeekdayPositionExpression, WeekdayRangeExpression,
- LastDayOfMonthExpression)
-
-
-def test_all_expression():
- field = DayOfMonthField('day', '*')
- assert repr(field) == "DayOfMonthField('day', '*')"
- date = datetime(2009, 7, 1)
- assert field.get_next_value(date) == 1
- date = datetime(2009, 7, 10)
- assert field.get_next_value(date) == 10
- date = datetime(2009, 7, 30)
- assert field.get_next_value(date) == 30
-
-
-def test_all_expression_step():
- field = BaseField('hour', '*/3')
- assert repr(field) == "BaseField('hour', '*/3')"
- date = datetime(2009, 7, 1, 0)
- assert field.get_next_value(date) == 0
- date = datetime(2009, 7, 1, 2)
- assert field.get_next_value(date) == 3
- date = datetime(2009, 7, 1, 7)
- assert field.get_next_value(date) == 9
-
-
-def test_all_expression_invalid():
- pytest.raises(ValueError, BaseField, 'hour', '*/0')
-
-
-def test_all_expression_repr():
- expr = AllExpression()
- assert repr(expr) == 'AllExpression(None)'
-
-
-def test_all_expression_step_repr():
- expr = AllExpression(2)
- assert repr(expr) == "AllExpression(2)"
-
-
-def test_range_expression():
- field = DayOfMonthField('day', '2-9')
- assert repr(field) == "DayOfMonthField('day', '2-9')"
- date = datetime(2009, 7, 1)
- assert field.get_next_value(date) == 2
- date = datetime(2009, 7, 10)
- assert field.get_next_value(date) is None
- date = datetime(2009, 7, 5)
- assert field.get_next_value(date) == 5
-
-
-def test_range_expression_step():
- field = DayOfMonthField('day', '2-9/3')
- assert repr(field) == "DayOfMonthField('day', '2-9/3')"
- date = datetime(2009, 7, 1)
- assert field.get_next_value(date) == 2
- date = datetime(2009, 7, 3)
- assert field.get_next_value(date) == 5
- date = datetime(2009, 7, 9)
- assert field.get_next_value(date) is None
-
-
-def test_range_expression_single():
- field = DayOfMonthField('day', 9)
- assert repr(field) == "DayOfMonthField('day', '9')"
- date = datetime(2009, 7, 1)
- assert field.get_next_value(date) == 9
- date = datetime(2009, 7, 9)
- assert field.get_next_value(date) == 9
- date = datetime(2009, 7, 10)
- assert field.get_next_value(date) is None
-
-
-def test_range_expression_invalid():
- pytest.raises(ValueError, DayOfMonthField, 'day', '5-3')
-
-
-def test_range_expression_repr():
- expr = RangeExpression(3, 7)
- assert repr(expr) == 'RangeExpression(3, 7)'
-
-
-def test_range_expression_single_repr():
- expr = RangeExpression(4)
- assert repr(expr) == 'RangeExpression(4)'
-
-
-def test_range_expression_step_repr():
- expr = RangeExpression(3, 7, 2)
- assert repr(expr) == 'RangeExpression(3, 7, 2)'
-
-
-def test_weekday_single():
- field = DayOfWeekField('day_of_week', 'WED')
- assert repr(field) == "DayOfWeekField('day_of_week', 'wed')"
- date = datetime(2008, 2, 4)
- assert field.get_next_value(date) == 2
-
-
-def test_weekday_range():
- field = DayOfWeekField('day_of_week', 'TUE-SAT')
- assert repr(field) == "DayOfWeekField('day_of_week', 'tue-sat')"
- date = datetime(2008, 2, 7)
- assert field.get_next_value(date) == 3
-
-
-def test_weekday_pos_1():
- expr = WeekdayPositionExpression('1st', 'Fri')
- assert str(expr) == '1st fri'
- date = datetime(2008, 2, 1)
- assert expr.get_next_value(date, 'day') == 1
-
-
-def test_weekday_pos_2():
- expr = WeekdayPositionExpression('2nd', 'wed')
- assert str(expr) == '2nd wed'
- date = datetime(2008, 2, 1)
- assert expr.get_next_value(date, 'day') == 13
-
-
-def test_weekday_pos_3():
- expr = WeekdayPositionExpression('last', 'fri')
- assert str(expr) == 'last fri'
- date = datetime(2008, 2, 1)
- assert expr.get_next_value(date, 'day') == 29
-
-
-def test_day_of_week_invalid_pos():
- pytest.raises(ValueError, WeekdayPositionExpression, '6th', 'fri')
-
-
-def test_day_of_week_invalid_name():
- pytest.raises(ValueError, WeekdayPositionExpression, '1st', 'moh')
-
-
-def test_weekday_position_expression_repr():
- expr = WeekdayPositionExpression('2nd', 'FRI')
- assert repr(expr) == "WeekdayPositionExpression('2nd', 'fri')"
-
-
-def test_day_of_week_invalid_first():
- pytest.raises(ValueError, WeekdayRangeExpression, 'moh', 'fri')
-
-
-def test_day_of_week_invalid_last():
- pytest.raises(ValueError, WeekdayRangeExpression, 'mon', 'fre')
-
-
-def test_weekday_range_expression_repr():
- expr = WeekdayRangeExpression('tue', 'SUN')
- assert repr(expr) == "WeekdayRangeExpression('tue', 'sun')"
-
-
-def test_weekday_range_expression_single_repr():
- expr = WeekdayRangeExpression('thu')
- assert repr(expr) == "WeekdayRangeExpression('thu')"
-
-
-def test_last_day_of_month_expression():
- expr = LastDayOfMonthExpression()
- date = datetime(2012, 2, 1)
- assert expr.get_next_value(date, 'day') == 29
-
-
-def test_last_day_of_month_expression_invalid():
- expr = LastDayOfMonthExpression()
- assert repr(expr) == "LastDayOfMonthExpression()"
diff --git a/tests/test_triggers.py b/tests/test_triggers.py
deleted file mode 100644
index 9375df7..0000000
--- a/tests/test_triggers.py
+++ /dev/null
@@ -1,693 +0,0 @@
-import pickle
-import random
-import sys
-from datetime import datetime, timedelta, date
-from unittest.mock import Mock
-
-import pytest
-import pytz
-
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.triggers.combining import AndTrigger, OrTrigger, BaseCombiningTrigger
-from apscheduler.triggers.cron import CronTrigger
-from apscheduler.triggers.date import DateTrigger
-from apscheduler.triggers.interval import IntervalTrigger
-
-
-class _DummyTriggerWithJitter(BaseTrigger):
- def __init__(self, dt, jitter):
- self.dt = dt
- self.jitter = jitter
-
- def get_next_fire_time(self, previous_fire_time, now):
- return self._apply_jitter(self.dt, self.jitter, now)
-
-
-class TestJitter(object):
- def test_jitter_disabled(self):
- dt = datetime(2017, 5, 25, 14, 49, 50)
- trigger = _DummyTriggerWithJitter(dt, None)
-
- now = datetime(2017, 5, 25, 13, 40, 44)
- assert trigger.get_next_fire_time(None, now) == dt
-
- def test_jitter_with_none_next_fire_time(self):
- trigger = _DummyTriggerWithJitter(None, 5)
- now = datetime(2017, 5, 25, 13, 40, 44)
- assert trigger.get_next_fire_time(None, now) is None
-
- def test_jitter_positive(self, monkeypatch):
- monkeypatch.setattr(random, 'uniform', lambda a, b: 30.)
-
- now = datetime(2017, 5, 25, 13, 40, 44)
- dt = datetime(2017, 5, 25, 14, 49, 50)
- expected_dt = datetime(2017, 5, 25, 14, 50, 20)
-
- trigger = _DummyTriggerWithJitter(dt, 60)
- assert trigger.get_next_fire_time(None, now) == expected_dt
-
- def test_jitter_in_past_but_initial_date_in_future(self, monkeypatch):
- monkeypatch.setattr(random, 'uniform', lambda a, b: -30.)
-
- now = datetime(2017, 5, 25, 13, 40, 44)
- dt = datetime(2017, 5, 25, 13, 40, 47)
- expected_dt = dt
-
- trigger = _DummyTriggerWithJitter(dt, 60)
- assert trigger.get_next_fire_time(None, now) == expected_dt
-
- def test_jitter_in_future_but_initial_date_in_past(self, monkeypatch):
- monkeypatch.setattr(random, 'uniform', lambda a, b: 30.)
-
- now = datetime(2017, 5, 25, 13, 40, 44)
- dt = datetime(2017, 5, 25, 13, 40, 30)
- expected_dt = datetime(2017, 5, 25, 13, 41, 0)
-
- trigger = _DummyTriggerWithJitter(dt, 60)
- assert trigger.get_next_fire_time(None, now) == expected_dt
-
- def test_jitter_misfire(self, monkeypatch):
- monkeypatch.setattr(random, 'uniform', lambda a, b: -30.)
-
- now = datetime(2017, 5, 25, 13, 40, 44)
- dt = datetime(2017, 5, 25, 13, 40, 40)
- expected_dt = dt
-
- trigger = _DummyTriggerWithJitter(dt, 60)
- assert trigger.get_next_fire_time(None, now) == expected_dt
-
- def test_jitter_is_now(self, monkeypatch):
- monkeypatch.setattr(random, 'uniform', lambda a, b: 4.)
-
- now = datetime(2017, 5, 25, 13, 40, 44)
- dt = datetime(2017, 5, 25, 13, 40, 40)
- expected_dt = now
-
- trigger = _DummyTriggerWithJitter(dt, 60)
- assert trigger.get_next_fire_time(None, now) == expected_dt
-
- def test_jitter(self):
- now = datetime(2017, 5, 25, 13, 36, 44)
- dt = datetime(2017, 5, 25, 13, 40, 45)
- min_expected_dt = datetime(2017, 5, 25, 13, 40, 40)
- max_expected_dt = datetime(2017, 5, 25, 13, 40, 50)
-
- trigger = _DummyTriggerWithJitter(dt, 5)
- for _ in range(0, 100):
- assert min_expected_dt <= trigger.get_next_fire_time(None, now) <= max_expected_dt
-
-
-class TestCronTrigger(object):
- def test_cron_trigger_1(self, timezone):
- trigger = CronTrigger(year='2009/2', month='1/3', day='5-13', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009/2', month='1/3', day='5-13', "
- "timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009/2', month='1/3', day='5-13']"
- start_date = timezone.localize(datetime(2008, 12, 1))
- correct_next_date = timezone.localize(datetime(2009, 1, 5))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_trigger_2(self, timezone):
- trigger = CronTrigger(year='2009/2', month='1/3', day='5-13', timezone=timezone)
- start_date = timezone.localize(datetime(2009, 10, 14))
- correct_next_date = timezone.localize(datetime(2011, 1, 5))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_trigger_3(self, timezone):
- trigger = CronTrigger(year='2009', month='feb-dec', hour='8-10', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='feb-dec', hour='8-10', "
- "timezone='Europe/Berlin')>")
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 2, 1, 8))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_trigger_4(self, timezone):
- trigger = CronTrigger(year='2012', month='2', day='last', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2012', month='2', day='last', "
- "timezone='Europe/Berlin')>")
- start_date = timezone.localize(datetime(2012, 2, 1))
- correct_next_date = timezone.localize(datetime(2012, 2, 29))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_start_end_times_string(self, timezone, monkeypatch):
- monkeypatch.setattr('apscheduler.triggers.cron.get_localzone', Mock(return_value=timezone))
- trigger = CronTrigger(start_date='2016-11-05 05:06:53', end_date='2017-11-05 05:11:32')
- assert trigger.start_date == timezone.localize(datetime(2016, 11, 5, 5, 6, 53))
- assert trigger.end_date == timezone.localize(datetime(2017, 11, 5, 5, 11, 32))
-
- def test_cron_zero_value(self, timezone):
- trigger = CronTrigger(year=2009, month=2, hour=0, timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='2', hour='0', "
- "timezone='Europe/Berlin')>")
-
- def test_cron_year_list(self, timezone):
- trigger = CronTrigger(year='2009,2008', timezone=timezone)
- assert repr(trigger) == "<CronTrigger (year='2009,2008', timezone='Europe/Berlin')>"
- assert str(trigger) == "cron[year='2009,2008']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 1, 1))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_start_date(self, timezone):
- trigger = CronTrigger(year='2009', month='2', hour='8-10',
- start_date='2009-02-03 11:00:00', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='2', hour='8-10', "
- "start_date='2009-02-03 11:00:00 CET', "
- "timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', month='2', hour='8-10']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 2, 4, 8))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_previous_fire_time_1(self, timezone):
- """Test for previous_fire_time arg in get_next_fire_time()"""
- trigger = CronTrigger(day="*", timezone=timezone)
- previous_fire_time = timezone.localize(datetime(2015, 11, 23))
- now = timezone.localize(datetime(2015, 11, 26))
- correct_next_date = timezone.localize(datetime(2015, 11, 24))
- assert trigger.get_next_fire_time(previous_fire_time, now) == correct_next_date
-
- def test_previous_fire_time_2(self, timezone):
- trigger = CronTrigger(day="*", timezone=timezone)
- previous_fire_time = timezone.localize(datetime(2015, 11, 23))
- now = timezone.localize(datetime(2015, 11, 22))
- correct_next_date = timezone.localize(datetime(2015, 11, 22))
- assert trigger.get_next_fire_time(previous_fire_time, now) == correct_next_date
-
- def test_previous_fire_time_3(self, timezone):
- trigger = CronTrigger(day="*", timezone=timezone)
- previous_fire_time = timezone.localize(datetime(2016, 4, 25))
- now = timezone.localize(datetime(2016, 4, 25))
- correct_next_date = timezone.localize(datetime(2016, 4, 26))
- assert trigger.get_next_fire_time(previous_fire_time, now) == correct_next_date
-
- def test_cron_weekday_overlap(self, timezone):
- trigger = CronTrigger(year=2009, month=1, day='6-10', day_of_week='2-4', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='1', day='6-10', "
- "day_of_week='2-4', timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', month='1', day='6-10', day_of_week='2-4']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 1, 7))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_weekday_nomatch(self, timezone):
- trigger = CronTrigger(year=2009, month=1, day='6-10', day_of_week='0,6', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='1', day='6-10', "
- "day_of_week='0,6', timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', month='1', day='6-10', day_of_week='0,6']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = None
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_weekday_positional(self, timezone):
- trigger = CronTrigger(year=2009, month=1, day='4th wed', timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='1', day='4th wed', "
- "timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', month='1', day='4th wed']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 1, 28))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_week_1(self, timezone):
- trigger = CronTrigger(year=2009, month=2, week=8, timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', month='2', week='8', "
- "timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', month='2', week='8']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 2, 16))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_week_2(self, timezone):
- trigger = CronTrigger(year=2009, week=15, day_of_week=2, timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', week='15', day_of_week='2', "
- "timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', week='15', day_of_week='2']"
- start_date = timezone.localize(datetime(2009, 1, 1))
- correct_next_date = timezone.localize(datetime(2009, 4, 8))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_extra_coverage(self, timezone):
- # This test has no value other than patching holes in test coverage
- trigger = CronTrigger(day='6,8', timezone=timezone)
- assert repr(trigger) == "<CronTrigger (day='6,8', timezone='Europe/Berlin')>"
- assert str(trigger) == "cron[day='6,8']"
- start_date = timezone.localize(datetime(2009, 12, 31))
- correct_next_date = timezone.localize(datetime(2010, 1, 6))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_faulty_expr(self, timezone):
- pytest.raises(ValueError, CronTrigger, year='2009-fault', timezone=timezone)
-
- def test_cron_increment_weekday(self, timezone):
- """
- Tests that incrementing the weekday field in the process of calculating the next matching
- date won't cause problems.
-
- """
- trigger = CronTrigger(hour='5-6', timezone=timezone)
- assert repr(trigger) == "<CronTrigger (hour='5-6', timezone='Europe/Berlin')>"
- assert str(trigger) == "cron[hour='5-6']"
- start_date = timezone.localize(datetime(2009, 9, 25, 7))
- correct_next_date = timezone.localize(datetime(2009, 9, 26, 5))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_cron_bad_kwarg(self, timezone):
- pytest.raises(TypeError, CronTrigger, second=0, third=1, timezone=timezone)
-
- def test_month_rollover(self, timezone):
- trigger = CronTrigger(timezone=timezone, day=30)
- now = timezone.localize(datetime(2016, 2, 1))
- expected = timezone.localize(datetime(2016, 3, 30))
- assert trigger.get_next_fire_time(None, now) == expected
-
- def test_timezone_from_start_date(self, timezone):
- """
- Tests that the trigger takes the timezone from the start_date parameter if no timezone is
- supplied.
-
- """
- start_date = timezone.localize(datetime(2014, 4, 13, 5, 30))
- trigger = CronTrigger(year=2014, hour=4, start_date=start_date)
- assert trigger.timezone == start_date.tzinfo
-
- def test_end_date(self, timezone):
- end_date = timezone.localize(datetime(2014, 4, 13, 3))
- trigger = CronTrigger(year=2014, hour=4, end_date=end_date)
-
- start_date = timezone.localize(datetime(2014, 4, 13, 2, 30))
- assert trigger.get_next_fire_time(None, start_date - timedelta(1)) == \
- start_date.replace(day=12, hour=4, minute=0)
- assert trigger.get_next_fire_time(None, start_date) is None
-
- def test_different_tz(self, timezone):
- alter_tz = pytz.FixedOffset(-600)
- trigger = CronTrigger(year=2009, week=15, day_of_week=2, timezone=timezone)
- assert repr(trigger) == ("<CronTrigger (year='2009', week='15', day_of_week='2', "
- "timezone='Europe/Berlin')>")
- assert str(trigger) == "cron[year='2009', week='15', day_of_week='2']"
- start_date = alter_tz.localize(datetime(2008, 12, 31, 22))
- correct_next_date = timezone.localize(datetime(2009, 4, 8))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- @pytest.mark.parametrize('trigger_args, start_date, start_date_dst, correct_next_date', [
- ({'hour': 8}, datetime(2013, 3, 9, 12), False, datetime(2013, 3, 10, 8)),
- ({'hour': 8}, datetime(2013, 11, 2, 12), True, datetime(2013, 11, 3, 8)),
- ({'minute': '*/30'}, datetime(2013, 3, 10, 1, 35), False, datetime(2013, 3, 10, 3)),
- ({'minute': '*/30'}, datetime(2013, 11, 3, 1, 35), True, datetime(2013, 11, 3, 1))
- ], ids=['absolute_spring', 'absolute_autumn', 'interval_spring', 'interval_autumn'])
- def test_dst_change(self, trigger_args, start_date, start_date_dst, correct_next_date):
- """
- Making sure that CronTrigger works correctly when crossing the DST switch threshold.
- Note that you should explicitly compare datetimes as strings to avoid the internal datetime
- comparison which would test for equality in the UTC timezone.
-
- """
- timezone = pytz.timezone('US/Eastern')
- trigger = CronTrigger(timezone=timezone, **trigger_args)
- start_date = timezone.localize(start_date, is_dst=start_date_dst)
- correct_next_date = timezone.localize(correct_next_date, is_dst=not start_date_dst)
- assert str(trigger.get_next_fire_time(None, start_date)) == str(correct_next_date)
-
- def test_timezone_change(self, timezone):
- """
- Ensure that get_next_fire_time method returns datetimes in the timezone of the trigger and
- not in the timezone of the passed in start_date.
-
- """
- est = pytz.FixedOffset(-300)
- cst = pytz.FixedOffset(-360)
- trigger = CronTrigger(hour=11, minute='*/5', timezone=est)
- start_date = cst.localize(datetime(2009, 9, 26, 10, 16))
- correct_next_date = est.localize(datetime(2009, 9, 26, 11, 20))
- assert str(trigger.get_next_fire_time(None, start_date)) == str(correct_next_date)
-
- def test_pickle(self, timezone):
- """Test that the trigger is pickleable."""
-
- trigger = CronTrigger(year=2016, month='5-6', day='20-28', hour=7, minute=25, second='*',
- timezone=timezone)
- data = pickle.dumps(trigger, 2)
- trigger2 = pickle.loads(data)
-
- for attr in CronTrigger.__slots__:
- assert getattr(trigger2, attr) == getattr(trigger, attr)
-
- def test_jitter_produces_differrent_valid_results(self, timezone):
- trigger = CronTrigger(minute='*', jitter=5)
- now = timezone.localize(datetime(2017, 11, 12, 6, 55, 30))
-
- results = set()
- for _ in range(0, 100):
- next_fire_time = trigger.get_next_fire_time(None, now)
- results.add(next_fire_time)
- assert timedelta(seconds=25) <= (next_fire_time - now) <= timedelta(seconds=35)
-
- assert 1 < len(results)
-
- def test_jitter_with_timezone(self, timezone):
- est = pytz.FixedOffset(-300)
- cst = pytz.FixedOffset(-360)
- trigger = CronTrigger(hour=11, minute='*/5', timezone=est, jitter=5)
- start_date = cst.localize(datetime(2009, 9, 26, 10, 16))
- correct_next_date = est.localize(datetime(2009, 9, 26, 11, 20))
- for _ in range(0, 100):
- assert abs(trigger.get_next_fire_time(None, start_date) -
- correct_next_date) <= timedelta(seconds=5)
-
- @pytest.mark.parametrize('trigger_args, start_date, start_date_dst, correct_next_date', [
- ({'hour': 8}, datetime(2013, 3, 9, 12), False, datetime(2013, 3, 10, 8)),
- ({'hour': 8}, datetime(2013, 11, 2, 12), True, datetime(2013, 11, 3, 8)),
- ({'minute': '*/30'}, datetime(2013, 3, 10, 1, 35), False, datetime(2013, 3, 10, 3)),
- ({'minute': '*/30'}, datetime(2013, 11, 3, 1, 35), True, datetime(2013, 11, 3, 1))
- ], ids=['absolute_spring', 'absolute_autumn', 'interval_spring', 'interval_autumn'])
- def test_jitter_dst_change(self, trigger_args, start_date, start_date_dst, correct_next_date):
- timezone = pytz.timezone('US/Eastern')
- trigger = CronTrigger(timezone=timezone, jitter=5, **trigger_args)
- start_date = timezone.localize(start_date, is_dst=start_date_dst)
- correct_next_date = timezone.localize(correct_next_date, is_dst=not start_date_dst)
-
- for _ in range(0, 100):
- next_fire_time = trigger.get_next_fire_time(None, start_date)
- assert abs(next_fire_time - correct_next_date) <= timedelta(seconds=5)
-
- def test_jitter_with_end_date(self, timezone):
- now = timezone.localize(datetime(2017, 11, 12, 6, 55, 30))
- end_date = timezone.localize(datetime(2017, 11, 12, 6, 56, 0))
- trigger = CronTrigger(minute='*', jitter=5, end_date=end_date)
-
- for _ in range(0, 100):
- next_fire_time = trigger.get_next_fire_time(None, now)
- assert next_fire_time is None or next_fire_time <= end_date
-
- @pytest.mark.parametrize('values, expected', [
- (dict(day='*/31'), r"Error validating expression '\*/31': the step value \(31\) is higher "
- r"than the total range of the expression \(30\)"),
- (dict(day='4-6/3'), r"Error validating expression '4-6/3': the step value \(3\) is higher "
- r"than the total range of the expression \(2\)"),
- (dict(hour='0-24'), r"Error validating expression '0-24': the last value \(24\) is higher "
- r"than the maximum value \(23\)"),
- (dict(day='0-3'), r"Error validating expression '0-3': the first value \(0\) is lower "
- r"than the minimum value \(1\)")
- ], ids=['too_large_step_all', 'too_large_step_range', 'too_high_last', 'too_low_first'])
- def test_invalid_ranges(self, values, expected):
- pytest.raises(ValueError, CronTrigger, **values).match(expected)
-
- @pytest.mark.parametrize('expr, expected_repr', [
- ('* * * * *',
- "<CronTrigger (month='*', day='*', day_of_week='*', hour='*', minute='*', "
- "timezone='Europe/Berlin')>"),
- ('0-14 * 14-28 jul fri',
- "<CronTrigger (month='jul', day='14-28', day_of_week='fri', hour='*', minute='0-14', "
- "timezone='Europe/Berlin')>"),
- (' 0-14 * 14-28 jul fri',
- "<CronTrigger (month='jul', day='14-28', day_of_week='fri', hour='*', minute='0-14', "
- "timezone='Europe/Berlin')>")
- ], ids=['always', 'assorted', 'multiple_spaces_in_format'])
- def test_from_crontab(self, expr, expected_repr, timezone):
- trigger = CronTrigger.from_crontab(expr, timezone)
- assert repr(trigger) == expected_repr
-
-
-class TestDateTrigger(object):
- @pytest.mark.parametrize('run_date,alter_tz,previous,now,expected', [
- (datetime(2009, 7, 6), None, None, datetime(2008, 5, 4), datetime(2009, 7, 6)),
- (datetime(2009, 7, 6), None, None, datetime(2009, 7, 6), datetime(2009, 7, 6)),
- (datetime(2009, 7, 6), None, None, datetime(2009, 9, 2), datetime(2009, 7, 6)),
- ('2009-7-6', None, None, datetime(2009, 9, 2), datetime(2009, 7, 6)),
- (datetime(2009, 7, 6), None, datetime(2009, 7, 6), datetime(2009, 9, 2), None),
- (datetime(2009, 7, 5, 22), pytz.FixedOffset(-60), datetime(2009, 7, 6),
- datetime(2009, 7, 6), None),
- (None, pytz.FixedOffset(-120), None, datetime(2011, 4, 3, 18, 40),
- datetime(2011, 4, 3, 18, 40))
- ], ids=['earlier', 'exact', 'later', 'as text', 'previously fired', 'alternate timezone',
- 'current_time'])
- def test_get_next_fire_time(self, run_date, alter_tz, previous, now, expected, timezone,
- freeze_time):
- trigger = DateTrigger(run_date, alter_tz or timezone)
- previous = timezone.localize(previous) if previous else None
- now = timezone.localize(now)
- expected = timezone.localize(expected) if expected else None
- assert trigger.get_next_fire_time(previous, now) == expected
-
- @pytest.mark.parametrize('is_dst', [True, False], ids=['daylight saving', 'standard time'])
- def test_dst_change(self, is_dst):
- """
- Test that DateTrigger works during the ambiguous "fall-back" DST period.
-
- Note that you should explicitly compare datetimes as strings to avoid the internal datetime
- comparison which would test for equality in the UTC timezone.
-
- """
- eastern = pytz.timezone('US/Eastern')
- run_date = eastern.localize(datetime(2013, 10, 3, 1, 5), is_dst=is_dst)
-
- fire_date = eastern.normalize(run_date + timedelta(minutes=55))
- trigger = DateTrigger(run_date=fire_date, timezone=eastern)
- assert str(trigger.get_next_fire_time(None, fire_date)) == str(fire_date)
-
- def test_repr(self, timezone):
- trigger = DateTrigger(datetime(2009, 7, 6), timezone)
- assert repr(trigger) == "<DateTrigger (run_date='2009-07-06 00:00:00 CEST')>"
-
- def test_str(self, timezone):
- trigger = DateTrigger(datetime(2009, 7, 6), timezone)
- assert str(trigger) == "date[2009-07-06 00:00:00 CEST]"
-
- def test_pickle(self, timezone):
- """Test that the trigger is pickleable."""
- trigger = DateTrigger(date(2016, 4, 3), timezone=timezone)
- data = pickle.dumps(trigger, 2)
- trigger2 = pickle.loads(data)
- assert trigger2.run_date == trigger.run_date
-
-
-class TestIntervalTrigger(object):
- @pytest.fixture()
- def trigger(self, timezone):
- return IntervalTrigger(seconds=1, start_date=datetime(2009, 8, 4, second=2),
- timezone=timezone)
-
- def test_invalid_interval(self, timezone):
- pytest.raises(TypeError, IntervalTrigger, '1-6', timezone=timezone)
-
- def test_start_end_times_string(self, timezone, monkeypatch):
- monkeypatch.setattr('apscheduler.triggers.interval.get_localzone',
- Mock(return_value=timezone))
- trigger = IntervalTrigger(start_date='2016-11-05 05:06:53', end_date='2017-11-05 05:11:32')
- assert trigger.start_date == timezone.localize(datetime(2016, 11, 5, 5, 6, 53))
- assert trigger.end_date == timezone.localize(datetime(2017, 11, 5, 5, 11, 32))
-
- def test_before(self, trigger, timezone):
- """Tests that if "start_date" is later than "now", it will return start_date."""
- now = trigger.start_date - timedelta(seconds=2)
- assert trigger.get_next_fire_time(None, now) == trigger.start_date
-
- def test_within(self, trigger, timezone):
- """
- Tests that if "now" is between "start_date" and the next interval, it will return the next
- interval.
-
- """
- now = trigger.start_date + timedelta(microseconds=1000)
- assert trigger.get_next_fire_time(None, now) == trigger.start_date + trigger.interval
-
- def test_no_start_date(self, timezone):
- trigger = IntervalTrigger(seconds=2, timezone=timezone)
- now = datetime.now(timezone)
- assert (trigger.get_next_fire_time(None, now) - now) <= timedelta(seconds=2)
-
- def test_different_tz(self, trigger, timezone):
- alter_tz = pytz.FixedOffset(-60)
- start_date = alter_tz.localize(datetime(2009, 8, 3, 22, second=2, microsecond=1000))
- correct_next_date = timezone.localize(datetime(2009, 8, 4, 1, second=3))
- assert trigger.get_next_fire_time(None, start_date) == correct_next_date
-
- def test_end_date(self, timezone):
- """Tests that the interval trigger won't return any datetimes past the set end time."""
- start_date = timezone.localize(datetime(2014, 5, 26))
- trigger = IntervalTrigger(minutes=5, start_date=start_date,
- end_date=datetime(2014, 5, 26, 0, 7), timezone=timezone)
- assert trigger.get_next_fire_time(None, start_date + timedelta(minutes=2)) == \
- start_date.replace(minute=5)
- assert trigger.get_next_fire_time(None, start_date + timedelta(minutes=6)) is None
-
- def test_dst_change(self):
- """
- Making sure that IntervalTrigger works during the ambiguous "fall-back" DST period.
- Note that you should explicitly compare datetimes as strings to avoid the internal datetime
- comparison which would test for equality in the UTC timezone.
-
- """
- eastern = pytz.timezone('US/Eastern')
- start_date = datetime(2013, 3, 1) # Start within EDT
- trigger = IntervalTrigger(hours=1, start_date=start_date, timezone=eastern)
-
- datetime_edt = eastern.localize(datetime(2013, 3, 10, 1, 5), is_dst=False)
- correct_next_date = eastern.localize(datetime(2013, 3, 10, 3), is_dst=True)
- assert str(trigger.get_next_fire_time(None, datetime_edt)) == str(correct_next_date)
-
- datetime_est = eastern.localize(datetime(2013, 11, 3, 1, 5), is_dst=True)
- correct_next_date = eastern.localize(datetime(2013, 11, 3, 1), is_dst=False)
- assert str(trigger.get_next_fire_time(None, datetime_est)) == str(correct_next_date)
-
- def test_space_in_expr(self, timezone):
- trigger = CronTrigger(day='1-2, 4-7', timezone=timezone)
- assert repr(trigger) == "<CronTrigger (day='1-2,4-7', timezone='Europe/Berlin')>"
-
- def test_repr(self, trigger):
- if sys.version_info[:2] < (3, 7):
- timedelta_args = '0, 1'
- else:
- timedelta_args = 'seconds=1'
-
- assert repr(trigger) == ("<IntervalTrigger (interval=datetime.timedelta({}), "
- "start_date='2009-08-04 00:00:02 CEST', "
- "timezone='Europe/Berlin')>".format(timedelta_args))
-
- def test_str(self, trigger):
- assert str(trigger) == "interval[0:00:01]"
-
- def test_pickle(self, timezone):
- """Test that the trigger is pickleable."""
-
- trigger = IntervalTrigger(weeks=2, days=6, minutes=13, seconds=2,
- start_date=date(2016, 4, 3), timezone=timezone,
- jitter=12)
- data = pickle.dumps(trigger, 2)
- trigger2 = pickle.loads(data)
-
- for attr in IntervalTrigger.__slots__:
- assert getattr(trigger2, attr) == getattr(trigger, attr)
-
- def test_jitter_produces_different_valid_results(self, timezone):
- trigger = IntervalTrigger(seconds=5, timezone=timezone, jitter=3)
- now = datetime.now(timezone)
-
- results = set()
- for _ in range(0, 100):
- next_fire_time = trigger.get_next_fire_time(None, now)
- results.add(next_fire_time)
- assert timedelta(seconds=2) <= (next_fire_time - now) <= timedelta(seconds=8)
- assert 1 < len(results)
-
- @pytest.mark.parametrize('trigger_args, start_date, start_date_dst, correct_next_date', [
- ({'hours': 1}, datetime(2013, 3, 10, 1, 35), False, datetime(2013, 3, 10, 3, 35)),
- ({'hours': 1}, datetime(2013, 11, 3, 1, 35), True, datetime(2013, 11, 3, 1, 35))
- ], ids=['interval_spring', 'interval_autumn'])
- def test_jitter_dst_change(self, trigger_args, start_date, start_date_dst, correct_next_date):
- timezone = pytz.timezone('US/Eastern')
- epsilon = timedelta(seconds=1)
- start_date = timezone.localize(start_date, is_dst=start_date_dst)
- trigger = IntervalTrigger(timezone=timezone, start_date=start_date, jitter=5,
- **trigger_args)
- correct_next_date = timezone.localize(correct_next_date, is_dst=not start_date_dst)
-
- for _ in range(0, 100):
- next_fire_time = trigger.get_next_fire_time(None, start_date + epsilon)
- assert abs(next_fire_time - correct_next_date) <= timedelta(seconds=5)
-
- def test_jitter_with_end_date(self, timezone):
- now = timezone.localize(datetime(2017, 11, 12, 6, 55, 58))
- end_date = timezone.localize(datetime(2017, 11, 12, 6, 56, 0))
- trigger = IntervalTrigger(seconds=5, jitter=5, end_date=end_date)
-
- for _ in range(0, 100):
- next_fire_time = trigger.get_next_fire_time(None, now)
- assert next_fire_time is None or next_fire_time <= end_date
-
-
-class TestAndTrigger(object):
- @pytest.fixture
- def trigger(self, timezone):
- return AndTrigger([
- CronTrigger(month='5-8', day='6-15',
- end_date=timezone.localize(datetime(2017, 8, 10))),
- CronTrigger(month='6-9', day='*/3', end_date=timezone.localize(datetime(2017, 9, 7)))
- ])
-
- @pytest.mark.parametrize('start_time, expected', [
- (datetime(2017, 8, 6), datetime(2017, 8, 7)),
- (datetime(2017, 8, 10, 1), None)
- ], ids=['firstmatch', 'end'])
- def test_next_fire_time(self, trigger, timezone, start_time, expected):
- expected = timezone.localize(expected) if expected else None
- assert trigger.get_next_fire_time(None, timezone.localize(start_time)) == expected
-
- def test_jitter(self, trigger, timezone):
- trigger.jitter = 5
- start_time = timezone.localize(datetime(2017, 8, 6))
- expected = timezone.localize(datetime(2017, 8, 7))
- for _ in range(100):
- next_fire_time = trigger.get_next_fire_time(None, start_time)
- assert abs(expected - next_fire_time) <= timedelta(seconds=5)
-
- @pytest.mark.parametrize('jitter', [None, 5], ids=['nojitter', 'jitter'])
- def test_repr(self, trigger, jitter):
- trigger.jitter = jitter
- jitter_part = ', jitter={}'.format(jitter) if jitter else ''
- assert repr(trigger) == (
- "<AndTrigger([<CronTrigger (month='5-8', day='6-15', "
- "end_date='2017-08-10 00:00:00 CEST', timezone='Europe/Berlin')>, <CronTrigger "
- "(month='6-9', day='*/3', end_date='2017-09-07 00:00:00 CEST', "
- "timezone='Europe/Berlin')>]{})>".format(jitter_part))
-
- def test_str(self, trigger):
- assert str(trigger) == "and[cron[month='5-8', day='6-15'], cron[month='6-9', day='*/3']]"
-
- @pytest.mark.parametrize('jitter', [None, 5], ids=['nojitter', 'jitter'])
- def test_pickle(self, trigger, jitter):
- """Test that the trigger is pickleable."""
- trigger.jitter = jitter
- data = pickle.dumps(trigger, 2)
- trigger2 = pickle.loads(data)
-
- for attr in BaseCombiningTrigger.__slots__:
- assert repr(getattr(trigger2, attr)) == repr(getattr(trigger, attr))
-
-
-class TestOrTrigger(object):
- @pytest.fixture
- def trigger(self, timezone):
- return OrTrigger([
- CronTrigger(month='5-8', day='6-15',
- end_date=timezone.localize(datetime(2017, 8, 10))),
- CronTrigger(month='6-9', day='*/3', end_date=timezone.localize(datetime(2017, 9, 7)))
- ])
-
- @pytest.mark.parametrize('start_time, expected', [
- (datetime(2017, 8, 6), datetime(2017, 8, 6)),
- (datetime(2017, 9, 7, 1), None)
- ], ids=['earliest', 'end'])
- def test_next_fire_time(self, trigger, timezone, start_time, expected):
- expected = timezone.localize(expected) if expected else None
- assert trigger.get_next_fire_time(None, timezone.localize(start_time)) == expected
-
- def test_jitter(self, trigger, timezone):
- trigger.jitter = 5
- start_time = expected = timezone.localize(datetime(2017, 8, 6))
- for _ in range(100):
- next_fire_time = trigger.get_next_fire_time(None, start_time)
- assert abs(expected - next_fire_time) <= timedelta(seconds=5)
-
- @pytest.mark.parametrize('jitter', [None, 5], ids=['nojitter', 'jitter'])
- def test_repr(self, trigger, jitter):
- trigger.jitter = jitter
- jitter_part = ', jitter={}'.format(jitter) if jitter else ''
- assert repr(trigger) == (
- "<OrTrigger([<CronTrigger (month='5-8', day='6-15', "
- "end_date='2017-08-10 00:00:00 CEST', timezone='Europe/Berlin')>, <CronTrigger "
- "(month='6-9', day='*/3', end_date='2017-09-07 00:00:00 CEST', "
- "timezone='Europe/Berlin')>]{})>".format(jitter_part))
-
- def test_str(self, trigger):
- assert str(trigger) == "or[cron[month='5-8', day='6-15'], cron[month='6-9', day='*/3']]"
-
- @pytest.mark.parametrize('jitter', [None, 5], ids=['nojitter', 'jitter'])
- def test_pickle(self, trigger, jitter):
- """Test that the trigger is pickleable."""
- trigger.jitter = jitter
- data = pickle.dumps(trigger, 2)
- trigger2 = pickle.loads(data)
-
- for attr in BaseCombiningTrigger.__slots__:
- assert repr(getattr(trigger2, attr)) == repr(getattr(trigger, attr))
diff --git a/tests/test_util.py b/tests/test_util.py
index 777e0ce..f80e17b 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -1,18 +1,14 @@
import platform
import sys
-from datetime import date, datetime, timedelta, tzinfo
+from datetime import datetime, timedelta
from functools import partial
from types import ModuleType
-from unittest.mock import Mock
import pytest
import pytz
-from apscheduler.job import Job
from apscheduler.util import (
- asint, asbool, astimezone, convert_to_datetime, datetime_to_utc_timestamp,
- utc_timestamp_to_datetime, timedelta_seconds, datetime_ceil, get_callable_name, obj_to_ref,
- ref_to_obj, maybe_ref, check_callable_args, datetime_repr)
+ datetime_ceil, get_callable_name, obj_to_ref, ref_to_obj, maybe_ref, check_callable_args)
class DummyClass(object):
@@ -36,126 +32,10 @@ class DummyClass(object):
pass
-class InheritedDummyClass(Job):
- pass
-
-
-class TestAsint(object):
- @pytest.mark.parametrize('value', ['5s', 'shplse'], ids=['digit first', 'text'])
- def test_invalid_value(self, value):
- pytest.raises(ValueError, asint, value)
-
- def test_number(self):
- assert asint('539') == 539
-
- def test_none(self):
- assert asint(None) is None
-
-
-class TestAsbool(object):
- @pytest.mark.parametrize(
- 'value',
- [' True', 'true ', 'Yes', ' yes ', '1 ', True],
- ids=['capital true', 'lowercase true', 'capital yes', 'lowercase yes', 'one', 'True'])
- def test_true(self, value):
- assert asbool(value) is True
-
- @pytest.mark.parametrize(
- 'value',
- [' False', 'false ', 'No', ' no ', '0 ', False],
- ids=['capital', 'lowercase false', 'capital no', 'lowercase no', 'zero', 'False'])
- def test_false(self, value):
- assert asbool(value) is False
-
- def test_bad_value(self):
- pytest.raises(ValueError, asbool, 'yep')
-
-
-class TestAstimezone(object):
- def test_str(self):
- value = astimezone('Europe/Helsinki')
- assert isinstance(value, tzinfo)
-
- def test_tz(self):
- tz = pytz.timezone('Europe/Helsinki')
- value = astimezone(tz)
- assert tz is value
-
- def test_none(self):
- assert astimezone(None) is None
-
- def test_bad_timezone_type(self):
- exc = pytest.raises(TypeError, astimezone, tzinfo())
- assert 'Only timezones from the pytz library are supported' in str(exc.value)
-
- def test_bad_local_timezone(self):
- zone = Mock(tzinfo, localize=None, normalize=None, zone='local')
- exc = pytest.raises(ValueError, astimezone, zone)
- assert 'Unable to determine the name of the local timezone' in str(exc.value)
-
- def test_bad_value(self):
- exc = pytest.raises(TypeError, astimezone, 4)
- assert 'Expected tzinfo, got int instead' in str(exc.value)
-
-
-class TestConvertToDatetime(object):
- @pytest.mark.parametrize('input,expected', [
- (None, None),
- (date(2009, 8, 1), datetime(2009, 8, 1)),
- (datetime(2009, 8, 1, 5, 6, 12), datetime(2009, 8, 1, 5, 6, 12)),
- ('2009-8-1', datetime(2009, 8, 1)),
- ('2009-8-1 5:16:12', datetime(2009, 8, 1, 5, 16, 12)),
- ('2009-8-1T5:16:12Z', datetime(2009, 8, 1, 5, 16, 12, tzinfo=pytz.utc)),
- ('2009-8-1T5:16:12+02:30',
- pytz.FixedOffset(150).localize(datetime(2009, 8, 1, 5, 16, 12))),
- ('2009-8-1T5:16:12-05:30',
- pytz.FixedOffset(-330).localize(datetime(2009, 8, 1, 5, 16, 12))),
- (pytz.FixedOffset(-60).localize(datetime(2009, 8, 1)),
- pytz.FixedOffset(-60).localize(datetime(2009, 8, 1)))
- ], ids=['None', 'date', 'datetime', 'date as text', 'datetime as text', 'utc', 'tzoffset',
- 'negtzoffset', 'existing tzinfo'])
- def test_date(self, timezone, input, expected):
- returned = convert_to_datetime(input, timezone, None)
- if expected is not None:
- assert isinstance(returned, datetime)
- expected = timezone.localize(expected) if not expected.tzinfo else expected
-
- assert returned == expected
-
- def test_invalid_input_type(self, timezone):
- exc = pytest.raises(TypeError, convert_to_datetime, 92123, timezone, 'foo')
- assert str(exc.value) == 'Unsupported type for foo: int'
-
- def test_invalid_input_value(self, timezone):
- exc = pytest.raises(ValueError, convert_to_datetime, '19700-12-1', timezone, None)
- assert str(exc.value) == 'Invalid date string'
-
- def test_missing_timezone(self):
- exc = pytest.raises(ValueError, convert_to_datetime, '2009-8-1', None, 'argname')
- assert str(exc.value) == ('The "tz" argument must be specified if argname has no timezone '
- 'information')
-
- def test_text_timezone(self):
- returned = convert_to_datetime('2009-8-1', 'UTC', None)
- assert returned == datetime(2009, 8, 1, tzinfo=pytz.utc)
-
- def test_bad_timezone(self):
- exc = pytest.raises(TypeError, convert_to_datetime, '2009-8-1', tzinfo(), None)
- assert str(exc.value) == ('Only pytz timezones are supported (need the localize() and '
- 'normalize() methods)')
-
-
-def test_datetime_to_utc_timestamp(timezone):
- dt = timezone.localize(datetime(2014, 3, 12, 5, 40, 13, 254012))
- timestamp = datetime_to_utc_timestamp(dt)
- dt2 = utc_timestamp_to_datetime(timestamp)
- assert dt2 == dt
-
-
-def test_timedelta_seconds():
- delta = timedelta(minutes=2, seconds=30)
- seconds = timedelta_seconds(delta)
- assert seconds == 150
+class InheritedDummyClass(DummyClass):
+ @classmethod
+ def classmeth(cls):
+ pass
@pytest.mark.parametrize('input,expected', [
@@ -166,22 +46,13 @@ def test_datetime_ceil(input, expected):
assert datetime_ceil(input) == expected
-@pytest.mark.parametrize('input,expected', [
- (None, 'None'),
- (pytz.timezone('Europe/Helsinki').localize(datetime(2014, 5, 30, 7, 12, 20)),
- '2014-05-30 07:12:20 EEST')
-], ids=['None', 'datetime+tzinfo'])
-def test_datetime_repr(input, expected):
- assert datetime_repr(input) == expected
-
-
class TestGetCallableName(object):
@pytest.mark.parametrize('input,expected', [
- (asint, 'asint'),
+ (open, 'open'),
(DummyClass.staticmeth, 'DummyClass.staticmeth' if
hasattr(DummyClass, '__qualname__') else 'staticmeth'),
(DummyClass.classmeth, 'DummyClass.classmeth'),
- (DummyClass.meth, 'meth' if sys.version_info[:2] == (3, 2) else 'DummyClass.meth'),
+ (DummyClass.meth, 'DummyClass.meth'),
(DummyClass().meth, 'DummyClass.meth'),
(DummyClass, 'DummyClass'),
(DummyClass(), 'DummyClass')
@@ -216,7 +87,7 @@ class TestObjToRef(object):
(DummyClass.InnerDummyClass.innerclassmeth,
'test_util:DummyClass.InnerDummyClass.innerclassmeth'),
(DummyClass.staticmeth, 'test_util:DummyClass.staticmeth'),
- (InheritedDummyClass.pause, 'tests.test_util:InheritedDummyClass.pause'),
+ (InheritedDummyClass.classmeth, 'test_util:InheritedDummyClass.classmeth'),
(timedelta, 'datetime:timedelta'),
], ids=['unbound method', 'class method', 'inner class method', 'static method',
'inherited class method', 'timedelta'])
diff --git a/tests/triggers/test_calendarinterval.py b/tests/triggers/test_calendarinterval.py
index 154b989..ac0d023 100644
--- a/tests/triggers/test_calendarinterval.py
+++ b/tests/triggers/test_calendarinterval.py
@@ -1,95 +1,78 @@
-import pickle
from datetime import datetime, date
import pytest
-from pytz.tzinfo import DstTzInfo
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
-@pytest.fixture
-def trigger(timezone: DstTzInfo):
- return CalendarIntervalTrigger(
- years=1, months=5, weeks=6, days=8, hour=3, second=8, start_date=date(2016, 3, 5),
- end_date=date(2020, 12, 25), timezone=timezone)
-
-
-def test_bad_interval(timezone: DstTzInfo):
+def test_bad_interval(timezone):
exc = pytest.raises(ValueError, CalendarIntervalTrigger, timezone=timezone)
exc.match('interval must be at least 1 day long')
-def test_bad_start_end_dates(timezone: DstTzInfo):
+def test_bad_start_end_dates(timezone):
exc = pytest.raises(ValueError, CalendarIntervalTrigger, days=1,
start_date=date(2016, 3, 4), end_date=date(2016, 3, 3), timezone=timezone)
exc.match('end_date cannot be earlier than start_date')
-def test_pickle(trigger: CalendarIntervalTrigger):
- payload = pickle.dumps(trigger)
- deserialized = pickle.loads(payload)
-
- for attr in CalendarIntervalTrigger.__slots__:
- assert getattr(deserialized, attr) == getattr(trigger, attr)
-
-
-def test_setstate_unhandled_version(trigger: CalendarIntervalTrigger):
- exc = pytest.raises(ValueError, trigger.__setstate__, {'version': 2})
- exc.match('Got serialized data for version 2 of CalendarIntervalTrigger, '
- 'but only version 1 can be handled')
-
-
-def test_start_date(trigger: CalendarIntervalTrigger, timezone: DstTzInfo):
- """Test that start_date is respected."""
- now = timezone.localize(datetime(2016, 1, 15))
- expected = timezone.localize(datetime(2016, 3, 5, 3, 0, 8))
- assert trigger.get_next_fire_time(None, now) == expected
-
-
-def test_end_date(trigger: CalendarIntervalTrigger, timezone: DstTzInfo):
+def test_end_date(timezone, serializer):
"""Test that end_date is respected."""
- now = timezone.localize(datetime(2020, 12, 31))
- previous = timezone.localize(datetime(2020, 12, 25))
- assert trigger.get_next_fire_time(now, previous) is None
+ start_end_date = date(2020, 12, 31)
+ trigger = CalendarIntervalTrigger(days=1, start_date=start_end_date, end_date=start_end_date,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next().date() == start_end_date
+ assert trigger.next() is None
-def test_missing_time(timezone: DstTzInfo):
+def test_missing_time(timezone, serializer):
"""
Test that if the designated time does not exist on a day due to a forward DST shift, the day is
skipped entirely.
"""
- trigger = CalendarIntervalTrigger(days=1, hour=2, minute=30)
- now = timezone.localize(datetime(2016, 3, 27))
- expected = timezone.localize(datetime(2016, 3, 28, 2, 30))
- assert trigger.get_next_fire_time(None, now) == expected
+ trigger = CalendarIntervalTrigger(days=1, hour=2, minute=30, start_date=date(2016, 3, 27),
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+ assert trigger.next() == timezone.localize(datetime(2016, 3, 28, 2, 30))
-def test_repeated_time(timezone: DstTzInfo):
+
+def test_repeated_time(timezone, serializer):
"""
Test that if the designated time is repeated during a day due to a backward DST shift, the task
is executed on the earlier occurrence of that time.
"""
- trigger = CalendarIntervalTrigger(days=2, hour=2, minute=30)
- now = timezone.localize(datetime(2016, 10, 30))
- expected = timezone.localize(datetime(2016, 10, 30, 2, 30), is_dst=True)
- assert trigger.get_next_fire_time(None, now) == expected
+ trigger = CalendarIntervalTrigger(days=2, hour=2, minute=30, start_date=date(2016, 10, 30),
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2016, 10, 30, 2, 30), is_dst=True)
-def test_nonexistent_days(timezone: DstTzInfo):
+def test_nonexistent_days(timezone, serializer):
"""Test that invalid dates are skipped."""
- trigger = CalendarIntervalTrigger(months=1)
- now = timezone.localize(datetime(2016, 4, 30))
- previous = timezone.localize(datetime(2016, 3, 31))
- expected = timezone.localize(datetime(2016, 5, 31))
- assert trigger.get_next_fire_time(previous, now) == expected
+ trigger = CalendarIntervalTrigger(months=1, start_date=date(2016, 3, 31), timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+ assert trigger.next() == timezone.localize(datetime(2016, 3, 31))
+ assert trigger.next() == timezone.localize(datetime(2016, 5, 31))
-def test_str(trigger):
- assert str(trigger) == 'calendarinterval[1y, 5m, 6w, 8d at 03:00:08]'
+def test_repr(timezone, serializer):
+ trigger = CalendarIntervalTrigger(
+ years=1, months=5, weeks=6, days=8, hour=3, second=8, start_date=date(2016, 3, 5),
+ end_date=date(2020, 12, 25), timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
-def test_repr(trigger):
assert repr(trigger) == ("CalendarIntervalTrigger(years=1, months=5, weeks=6, days=8, "
- "time=03:00:08)")
+ "time='03:00:08', start_date='2016-03-05', end_date='2020-12-25', "
+ "timezone='Europe/Berlin')")
diff --git a/tests/triggers/test_combining.py b/tests/triggers/test_combining.py
new file mode 100644
index 0000000..b804102
--- /dev/null
+++ b/tests/triggers/test_combining.py
@@ -0,0 +1,94 @@
+from datetime import datetime, timedelta
+
+import pytest
+
+from apscheduler.exceptions import MaxIterationsReached
+from apscheduler.triggers.combining import AndTrigger, OrTrigger
+from apscheduler.triggers.date import DateTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+
+
+class TestAndTrigger:
+ @pytest.mark.parametrize('threshold', [1, 0])
+ def test_two_datetriggers(self, timezone, serializer, threshold):
+ date1 = timezone.localize(datetime(2020, 5, 16, 14, 17, 30, 254212))
+ date2 = timezone.localize(datetime(2020, 5, 16, 14, 17, 31, 254212))
+ trigger = AndTrigger([DateTrigger(date1), DateTrigger(date2)], threshold=threshold)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ if threshold:
+ # date2 was within the threshold so it will not be used
+ assert trigger.next() == date1
+
+ assert trigger.next() is None
+
+ def test_max_iterations(self, timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 5, 16, 14, 17, 30, 254212))
+ trigger = AndTrigger([
+ IntervalTrigger(seconds=4, start_time=start_time, timezone=timezone),
+ IntervalTrigger(seconds=4, start_time=start_time + timedelta(seconds=2),
+ timezone=timezone)
+ ])
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ pytest.raises(MaxIterationsReached, trigger.next)
+
+ def test_repr(self, timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 5, 16, 14, 17, 30, 254212))
+ trigger = AndTrigger([
+ IntervalTrigger(seconds=4, start_time=start_time, timezone=timezone),
+ IntervalTrigger(seconds=4, start_time=start_time + timedelta(seconds=2),
+ timezone=timezone)
+ ])
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert repr(trigger) == (
+ "AndTrigger([IntervalTrigger(seconds=4, "
+ "start_time='2020-05-16T14:17:30.254212+02:00'), IntervalTrigger(seconds=4, "
+ "start_time='2020-05-16T14:17:32.254212+02:00')], threshold=1.0, max_iterations=10000)"
+ )
+
+
+class TestOrTrigger:
+ def test_two_datetriggers(self, timezone, serializer):
+ date1 = timezone.localize(datetime(2020, 5, 16, 14, 17, 30, 254212))
+ date2 = timezone.localize(datetime(2020, 5, 18, 15, 1, 53, 940564))
+ trigger = OrTrigger([DateTrigger(date1), DateTrigger(date2)])
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == date1
+ assert trigger.next() == date2
+ assert trigger.next() is None
+
+ def test_two_interval_triggers(self, timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 5, 16, 14, 17, 30, 254212))
+ end_time1 = start_time + timedelta(seconds=16)
+ end_time2 = start_time + timedelta(seconds=18)
+ trigger = OrTrigger([
+ IntervalTrigger(seconds=4, start_time=start_time, end_time=end_time1),
+ IntervalTrigger(seconds=6, start_time=start_time, end_time=end_time2)
+ ])
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == start_time
+ assert trigger.next() == start_time + timedelta(seconds=4)
+ assert trigger.next() == start_time + timedelta(seconds=6)
+ assert trigger.next() == start_time + timedelta(seconds=8)
+ assert trigger.next() == start_time + timedelta(seconds=12)
+ assert trigger.next() == start_time + timedelta(seconds=16)
+ # The end time of the 4 second interval has been reached
+ assert trigger.next() == start_time + timedelta(seconds=18)
+ # The end time of the 6 second interval has been reached
+ assert trigger.next() is None
+
+ def test_repr(self, timezone):
+ date1 = timezone.localize(datetime(2020, 5, 16, 14, 17, 30, 254212))
+ date2 = timezone.localize(datetime(2020, 5, 18, 15, 1, 53, 940564))
+ trigger = OrTrigger([DateTrigger(date1), DateTrigger(date2)])
+ assert repr(trigger) == ("OrTrigger([DateTrigger('2020-05-16T14:17:30.254212+02:00'), "
+ "DateTrigger('2020-05-18T15:01:53.940564+02:00')])")
diff --git a/tests/triggers/test_cron.py b/tests/triggers/test_cron.py
new file mode 100644
index 0000000..1cfd523
--- /dev/null
+++ b/tests/triggers/test_cron.py
@@ -0,0 +1,321 @@
+from datetime import datetime
+
+import pytest
+import pytz
+
+from apscheduler.triggers.cron import CronTrigger
+
+
+def test_invalid_expression():
+ exc = pytest.raises(ValueError, CronTrigger, year='2009-fault')
+ exc.match("Unrecognized expression '2009-fault' for field 'year'")
+
+
+def test_invalid_step():
+ exc = pytest.raises(ValueError, CronTrigger, year='2009/0')
+ exc.match("Step must be higher than 0")
+
+
+def test_invalid_range():
+ exc = pytest.raises(ValueError, CronTrigger, year='2009-2008')
+ exc.match("The minimum value in a range must not be higher than the maximum")
+
+
+@pytest.mark.parametrize('expr', ['fab', 'jan-fab'], ids=['start', 'end'])
+def test_invalid_month_name(expr):
+ exc = pytest.raises(ValueError, CronTrigger, month=expr)
+ exc.match("Invalid month name 'fab'")
+
+
+@pytest.mark.parametrize('expr', ['web', 'mon-web'], ids=['start', 'end'])
+def test_invalid_weekday_name(expr):
+ exc = pytest.raises(ValueError, CronTrigger, day_of_week=expr)
+ exc.match("Invalid weekday name 'web'")
+
+
+def test_invalid_weekday_position_name():
+ exc = pytest.raises(ValueError, CronTrigger, day='1st web')
+ exc.match("Invalid weekday name 'web'")
+
+
+@pytest.mark.parametrize('values, expected', [
+ (dict(day='*/31'), r"Error validating expression '\*/31': the step value \(31\) is higher "
+ r"than the total range of the expression \(30\)"),
+ (dict(day='4-6/3'), r"Error validating expression '4-6/3': the step value \(3\) is higher "
+ r"than the total range of the expression \(2\)"),
+ (dict(hour='0-24'), r"Error validating expression '0-24': the last value \(24\) is higher "
+ r"than the maximum value \(23\)"),
+ (dict(day='0-3'), r"Error validating expression '0-3': the first value \(0\) is lower "
+ r"than the minimum value \(1\)")
+], ids=['too_large_step_all', 'too_large_step_range', 'too_high_last', 'too_low_first'])
+def test_invalid_ranges(values, expected):
+ pytest.raises(ValueError, CronTrigger, **values).match(expected)
+
+
+def test_cron_trigger_1(timezone, serializer):
+ start_time = timezone.localize(datetime(2008, 12, 1))
+ trigger = CronTrigger(year='2009/2', month='1-4/3', day='5-6', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2009, 1, 5))
+ assert trigger.next() == timezone.localize(datetime(2009, 1, 6))
+ assert trigger.next() == timezone.localize(datetime(2009, 4, 5))
+ assert trigger.next() == timezone.localize(datetime(2009, 4, 6))
+ assert trigger.next() == timezone.localize(datetime(2011, 1, 5))
+ assert repr(trigger) == ("CronTrigger(year='2009/2', month='1-4/3', day='5-6', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2008-12-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_cron_trigger_2(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 10, 14))
+ trigger = CronTrigger(year='2009/2', month='1-3', day='5', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2011, 1, 5))
+ assert trigger.next() == timezone.localize(datetime(2011, 2, 5))
+ assert trigger.next() == timezone.localize(datetime(2011, 3, 5))
+ assert trigger.next() == timezone.localize(datetime(2013, 1, 5))
+ assert repr(trigger) == ("CronTrigger(year='2009/2', month='1-3', day='5', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-10-14T00:00:00+02:00', timezone='Europe/Berlin')")
+
+
+def test_cron_trigger_3(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year='2009', month='feb-dec', hour='8-9', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2009, 2, 1, 8))
+ assert trigger.next() == timezone.localize(datetime(2009, 2, 1, 9))
+ assert trigger.next() == timezone.localize(datetime(2009, 2, 2, 8))
+ assert repr(trigger) == ("CronTrigger(year='2009', month='feb-dec', day='*', week='*', "
+ "day_of_week='*', hour='8-9', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_cron_trigger_4(timezone, serializer):
+ start_time = timezone.localize(datetime(2012, 2, 1))
+ trigger = CronTrigger(year='2012', month='2', day='last', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2012, 2, 29))
+ assert repr(trigger) == ("CronTrigger(year='2012', month='2', day='last', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2012-02-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_weekday_overlap(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year=2009, month=1, day='6-10', day_of_week='2-4', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2009, 1, 7))
+ assert repr(trigger) == ("CronTrigger(year='2009', month='1', day='6-10', week='*', "
+ "day_of_week='2-4', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_weekday_range(timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 1, 1))
+ trigger = CronTrigger(year=2020, month=1, week=1, day_of_week='fri-sun', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2020, 1, 3))
+ assert trigger.next() == timezone.localize(datetime(2020, 1, 4))
+ assert trigger.next() == timezone.localize(datetime(2020, 1, 5))
+ assert trigger.next() is None
+ assert repr(trigger) == ("CronTrigger(year='2020', month='1', day='*', week='1', "
+ "day_of_week='fri-sun', hour='0', minute='0', second='0', "
+ "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_last_weekday(timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 1, 1))
+ trigger = CronTrigger(year=2020, day='last sun', start_time=start_time, timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2020, 1, 26))
+ assert trigger.next() == timezone.localize(datetime(2020, 2, 23))
+ assert trigger.next() == timezone.localize(datetime(2020, 3, 29))
+ assert repr(trigger) == ("CronTrigger(year='2020', month='*', day='last sun', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_increment_weekday(timezone, serializer):
+ """
+ Tests that incrementing the weekday field in the process of calculating the next matching
+ date won't cause problems.
+
+ """
+ start_time = timezone.localize(datetime(2009, 9, 25, 7))
+ trigger = CronTrigger(hour='5-6', start_time=start_time, timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2009, 9, 26, 5))
+ assert repr(trigger) == ("CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='*', hour='5-6', minute='0', second='0', "
+ "start_time='2009-09-25T07:00:00+02:00', timezone='Europe/Berlin')")
+
+
+def test_month_rollover(timezone, serializer):
+ start_time = timezone.localize(datetime(2016, 2, 1))
+ trigger = CronTrigger(day=30, start_time=start_time, timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2016, 3, 30))
+ assert trigger.next() == timezone.localize(datetime(2016, 4, 30))
+
+
+def test_weekday_nomatch(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year=2009, month=1, day='6-10', day_of_week='0,6', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() is None
+ assert repr(trigger) == ("CronTrigger(year='2009', month='1', day='6-10', week='*', "
+ "day_of_week='0,6', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_weekday_positional(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year=2009, month=1, day='4th wed', start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2009, 1, 28))
+ assert repr(trigger) == ("CronTrigger(year='2009', month='1', day='4th wed', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_end_time(timezone, serializer):
+ """Test that next() won't produce"""
+ start_time = timezone.localize(datetime(2014, 4, 13, 2))
+ end_time = timezone.localize(datetime(2014, 4, 13, 4))
+ trigger = CronTrigger(hour=4, start_time=start_time, end_time=end_time, timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2014, 4, 13, 4))
+ assert trigger.next() is None
+ assert repr(trigger) == ("CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='*', hour='4', minute='0', second='0', "
+ "start_time='2014-04-13T02:00:00+02:00', "
+ "end_time='2014-04-13T04:00:00+02:00', timezone='Europe/Berlin')")
+
+
+def test_week_1(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year=2009, month=2, week=8, start_time=start_time, timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ for day in range(16, 23):
+ assert trigger.next() == timezone.localize(datetime(2009, 2, day))
+
+ assert trigger.next() is None
+ assert repr(trigger) == ("CronTrigger(year='2009', month='2', day='*', week='8', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_week_2(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year=2009, week=15, day_of_week=2, start_time=start_time,
+ timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(datetime(2009, 4, 8))
+ assert trigger.next() is None
+ assert repr(trigger) == ("CronTrigger(year='2009', month='*', day='*', week='15', "
+ "day_of_week='2', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+@pytest.mark.parametrize('trigger_args, start_time, start_time_dst, correct_next_date', [
+ ({'hour': 8}, datetime(2013, 3, 9, 12), False, datetime(2013, 3, 10, 8)),
+ ({'hour': 8}, datetime(2013, 11, 2, 12), True, datetime(2013, 11, 3, 8)),
+ ({'minute': '*/30'}, datetime(2013, 3, 10, 1, 35), False, datetime(2013, 3, 10, 3)),
+ ({'minute': '*/30'}, datetime(2013, 11, 3, 1, 35), True, datetime(2013, 11, 3, 1))
+], ids=['absolute_spring', 'absolute_autumn', 'interval_spring', 'interval_autumn'])
+def test_dst_change(trigger_args, start_time, start_time_dst, correct_next_date, serializer):
+ """
+ Making sure that CronTrigger works correctly when crossing the DST switch threshold.
+ Note that you should explicitly compare datetimes as strings to avoid the internal datetime
+ comparison which would test for equality in the UTC timezone.
+
+ """
+ timezone = pytz.timezone('US/Eastern')
+ start_time = timezone.localize(start_time, is_dst=start_time_dst)
+ trigger = CronTrigger(timezone=timezone, start_time=start_time, **trigger_args)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == timezone.localize(correct_next_date, is_dst=not start_time_dst)
+
+
+def test_zero_value(timezone):
+ start_time = timezone.localize(datetime(2020, 1, 1))
+ trigger = CronTrigger(year=2009, month=2, hour=0, start_time=start_time, timezone=timezone)
+ assert repr(trigger) == ("CronTrigger(year='2009', month='2', day='*', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+
+
+def test_year_list(timezone, serializer):
+ start_time = timezone.localize(datetime(2009, 1, 1))
+ trigger = CronTrigger(year='2009,2008', start_time=start_time, timezone=timezone)
+ assert repr(trigger) == "CronTrigger(year='2009,2008', month='1', day='1', week='*', " \
+ "day_of_week='*', hour='0', minute='0', second='0', " \
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ assert trigger.next() == timezone.localize(datetime(2009, 1, 1))
+ assert trigger.next() is None
+
+
+@pytest.mark.parametrize('expr, expected_repr', [
+ ('* * * * *',
+ "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='*', hour='*', minute='*', "
+ "second='0', start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')"),
+ ('0-14 * 14-28 jul fri',
+ "CronTrigger(year='*', month='jul', day='14-28', week='*', day_of_week='fri', hour='*', "
+ "minute='0-14', second='0', start_time='2020-05-19T19:53:22+02:00', "
+ "timezone='Europe/Berlin')"),
+ (' 0-14 * 14-28 jul fri',
+ "CronTrigger(year='*', month='jul', day='14-28', week='*', day_of_week='fri', hour='*', "
+ "minute='0-14', second='0', start_time='2020-05-19T19:53:22+02:00', "
+ "timezone='Europe/Berlin')")
+], ids=['always', 'assorted', 'multiple_spaces_in_format'])
+def test_from_crontab(expr, expected_repr, timezone, serializer):
+ trigger = CronTrigger.from_crontab(expr, timezone)
+ trigger.start_time = timezone.localize(datetime(2020, 5, 19, 19, 53, 22))
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert repr(trigger) == expected_repr
+
+
+def test_from_crontab_wrong_number_of_fields():
+ exc = pytest.raises(ValueError, CronTrigger.from_crontab, '*')
+ exc.match('Wrong number of fields; got 1, expected 5')
diff --git a/tests/triggers/test_date.py b/tests/triggers/test_date.py
new file mode 100644
index 0000000..5f4c607
--- /dev/null
+++ b/tests/triggers/test_date.py
@@ -0,0 +1,15 @@
+from datetime import datetime
+
+from apscheduler.triggers.date import DateTrigger
+
+
+def test_run_time(timezone, serializer):
+ run_time = timezone.localize(datetime(2020, 5, 14, 11, 56, 12))
+ trigger = DateTrigger(run_time)
+ if serializer:
+ payload = serializer.serialize(trigger)
+ trigger = serializer.deserialize(payload)
+
+ assert trigger.next() == run_time
+ assert trigger.next() is None
+ assert repr(trigger) == "DateTrigger('2020-05-14T11:56:12+02:00')"
diff --git a/tests/triggers/test_interval.py b/tests/triggers/test_interval.py
new file mode 100644
index 0000000..07ca35d
--- /dev/null
+++ b/tests/triggers/test_interval.py
@@ -0,0 +1,45 @@
+from datetime import datetime, timedelta
+
+import pytest
+
+from apscheduler.triggers.interval import IntervalTrigger
+
+
+def test_bad_interval():
+ exc = pytest.raises(ValueError, IntervalTrigger)
+ exc.match('The time interval must be positive')
+
+
+def test_bad_end_time(timezone):
+ start_time = timezone.localize(datetime(2020, 5, 16))
+ end_time = timezone.localize(datetime(2020, 5, 15))
+ exc = pytest.raises(ValueError, IntervalTrigger, seconds=1, start_time=start_time,
+ end_time=end_time)
+ exc.match('end_time cannot be earlier than start_time')
+
+
+def test_end_time(timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 5, 16, 19, 32, 44, 649521))
+ end_time = timezone.localize(datetime(2020, 5, 16, 22, 33, 1))
+ interval = timedelta(hours=1, seconds=6)
+ trigger = IntervalTrigger(start_time=start_time, end_time=end_time, hours=1, seconds=6)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert trigger.next() == start_time
+ assert trigger.next() == start_time + interval
+ assert trigger.next() == start_time + interval * 2
+ assert trigger.next() is None
+
+
+def test_repr(timezone, serializer):
+ start_time = timezone.localize(datetime(2020, 5, 15, 12, 55, 32, 954032))
+ end_time = timezone.localize(datetime(2020, 6, 4, 16, 18, 49, 306942))
+ trigger = IntervalTrigger(weeks=1, days=2, hours=3, minutes=4, seconds=5, microseconds=123525,
+ start_time=start_time, end_time=end_time, timezone=timezone)
+ if serializer:
+ trigger = serializer.deserialize(serializer.serialize(trigger))
+
+ assert repr(trigger) == ("IntervalTrigger(weeks=1, days=2, hours=3, minutes=4, seconds=5, "
+ "microseconds=123525, start_time='2020-05-15T12:55:32.954032+02:00', "
+ "end_time='2020-06-04T16:18:49.306942+02:00')")