summaryrefslogtreecommitdiff
path: root/test/lib/ansible_test/_internal/timeout.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/timeout.py')
-rw-r--r--test/lib/ansible_test/_internal/timeout.py73
1 files changed, 55 insertions, 18 deletions
diff --git a/test/lib/ansible_test/_internal/timeout.py b/test/lib/ansible_test/_internal/timeout.py
index 90ba583545..96c0d73d21 100644
--- a/test/lib/ansible_test/_internal/timeout.py
+++ b/test/lib/ansible_test/_internal/timeout.py
@@ -1,6 +1,7 @@
"""Timeout management for tests."""
from __future__ import annotations
+import dataclasses
import datetime
import functools
import os
@@ -19,7 +20,7 @@ from .config import (
from .util import (
display,
- ApplicationError,
+ TimeoutExpiredError,
)
from .thread import (
@@ -35,15 +36,56 @@ from .test import (
)
-def get_timeout() -> t.Optional[dict[str, t.Any]]:
- """Return details about the currently set timeout, if any, otherwise return None."""
- if not os.path.exists(TIMEOUT_PATH):
- return None
+@dataclasses.dataclass(frozen=True)
+class TimeoutDetail:
+ """Details required to enforce a timeout on test execution."""
+
+ _DEADLINE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' # format used to maintain backwards compatibility with previous versions of ansible-test
+
+ deadline: datetime.datetime
+ duration: int | float # minutes
+
+ @property
+ def remaining(self) -> datetime.timedelta:
+ """The amount of time remaining before the timeout occurs. If the timeout has passed, this will be a negative duration."""
+ return self.deadline - datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
+
+ def to_dict(self) -> dict[str, t.Any]:
+ """Return timeout details as a dictionary suitable for JSON serialization."""
+ return dict(
+ deadline=self.deadline.strftime(self._DEADLINE_FORMAT),
+ duration=self.duration,
+ )
- data = read_json_file(TIMEOUT_PATH)
- data['deadline'] = datetime.datetime.strptime(data['deadline'], '%Y-%m-%dT%H:%M:%SZ')
+ @staticmethod
+ def from_dict(value: dict[str, t.Any]) -> TimeoutDetail:
+ """Return a TimeoutDetail instance using the value previously returned by to_dict."""
+ return TimeoutDetail(
+ deadline=datetime.datetime.strptime(value['deadline'], TimeoutDetail._DEADLINE_FORMAT).replace(tzinfo=datetime.timezone.utc),
+ duration=value['duration'],
+ )
- return data
+ @staticmethod
+ def create(duration: int | float) -> TimeoutDetail | None:
+ """Return a new TimeoutDetail instance for the specified duration (in minutes), or None if the duration is zero."""
+ if not duration:
+ return None
+
+ if duration == int(duration):
+ duration = int(duration)
+
+ return TimeoutDetail(
+ deadline=datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0) + datetime.timedelta(seconds=int(duration * 60)),
+ duration=duration,
+ )
+
+
+def get_timeout() -> TimeoutDetail | None:
+ """Return details about the currently set timeout, if any, otherwise return None."""
+ try:
+ return TimeoutDetail.from_dict(read_json_file(TIMEOUT_PATH))
+ except FileNotFoundError:
+ return None
def configure_timeout(args: CommonConfig) -> None:
@@ -59,27 +101,22 @@ def configure_test_timeout(args: TestConfig) -> None:
if not timeout:
return
- timeout_start = datetime.datetime.utcnow()
- timeout_duration = timeout['duration']
- timeout_deadline = timeout['deadline']
- timeout_remaining = timeout_deadline - timeout_start
+ timeout_remaining = timeout.remaining
- test_timeout = TestTimeout(timeout_duration)
+ test_timeout = TestTimeout(timeout.duration)
if timeout_remaining <= datetime.timedelta():
test_timeout.write(args)
- raise ApplicationError('The %d minute test timeout expired %s ago at %s.' % (
- timeout_duration, timeout_remaining * -1, timeout_deadline))
+ raise TimeoutExpiredError(f'The {timeout.duration} minute test timeout expired {timeout_remaining * -1} ago at {timeout.deadline}.')
- display.info('The %d minute test timeout expires in %s at %s.' % (
- timeout_duration, timeout_remaining, timeout_deadline), verbosity=1)
+ display.info(f'The {timeout.duration} minute test timeout expires in {timeout_remaining} at {timeout.deadline}.', verbosity=1)
def timeout_handler(_dummy1: t.Any, _dummy2: t.Any) -> None:
"""Runs when SIGUSR1 is received."""
test_timeout.write(args)
- raise ApplicationError('Tests aborted after exceeding the %d minute time limit.' % timeout_duration)
+ raise TimeoutExpiredError(f'Tests aborted after exceeding the {timeout.duration} minute time limit.')
def timeout_waiter(timeout_seconds: int) -> None:
"""Background thread which will kill the current process if the timeout elapses."""