""" flexparser.flexparser ~~~~~~~~~~~~~~~~~~~~~ Classes and functions to create parsers. The idea is quite simple. You write a class for every type of content (called here ``ParsedStatement``) you need to parse. Each class should have a ``from_string`` constructor. We used extensively the ``typing`` module to make the output structure easy to use and less error prone. For more information, take a look at https://github.com/hgrecco/flexparser :copyright: 2022 by flexparser Authors, see AUTHORS for more details. :license: BSD, see LICENSE for more details. """ from __future__ import annotations import collections import dataclasses import enum import functools import hashlib import hmac import inspect import logging import pathlib import re import sys import typing as ty from collections.abc import Iterator from dataclasses import dataclass from functools import cached_property from importlib import resources from typing import Optional, Tuple, Type _LOGGER = logging.getLogger("flexparser") _SENTINEL = object() ################ # Exceptions ################ @dataclass(frozen=True) class Statement: """Base class for parsed elements within a source file.""" start_line: int = dataclasses.field(init=False, default=None) start_col: int = dataclasses.field(init=False, default=None) end_line: int = dataclasses.field(init=False, default=None) end_col: int = dataclasses.field(init=False, default=None) raw: str = dataclasses.field(init=False, default=None) @classmethod def from_statement(cls, statement: Statement): out = cls() out.set_position(*statement.get_position()) out.set_raw(statement.raw) return out @classmethod def from_statement_iterator_element(cls, values: ty.Tuple[int, int, int, int, str]): out = cls() out.set_position(*values[:-1]) out.set_raw(values[-1]) return out @property def format_position(self): if self.start_line is None: return "N/A" return "%d,%d-%d,%d" % self.get_position() @property def raw_strip(self): return self.raw.strip() def get_position(self): return self.start_line, self.start_col, self.end_line, self.end_col def set_position(self, start_line, start_col, end_line, end_col): object.__setattr__(self, "start_line", start_line) object.__setattr__(self, "start_col", start_col) object.__setattr__(self, "end_line", end_line) object.__setattr__(self, "end_col", end_col) return self def set_raw(self, raw): object.__setattr__(self, "raw", raw) return self def set_simple_position(self, line, col, width): return self.set_position(line, col, line, col + width) @dataclass(frozen=True) class ParsingError(Statement, Exception): """Base class for all parsing exceptions in this package.""" def __str__(self): return Statement.__str__(self) @dataclass(frozen=True) class UnknownStatement(ParsingError): """A string statement could not bee parsed.""" def __str__(self): return f"Could not parse '{self.raw}' ({self.format_position})" @dataclass(frozen=True) class UnhandledParsingError(ParsingError): """Base class for all parsing exceptions in this package.""" ex: Exception def __str__(self): return f"Unhandled exception while parsing '{self.raw}' ({self.format_position}): {self.ex}" @dataclass(frozen=True) class UnexpectedEOF(ParsingError): """End of file was found within an open block.""" ############################# # Useful methods and classes ############################# @dataclass(frozen=True) class Hash: algorithm_name: str hexdigest: str def __eq__(self, other: Hash): return ( isinstance(other, Hash) and self.algorithm_name != "" and self.algorithm_name == other.algorithm_name and hmac.compare_digest(self.hexdigest, other.hexdigest) ) @classmethod def from_bytes(cls, algorithm, b: bytes): hasher = algorithm(b) return cls(hasher.name, hasher.hexdigest()) @classmethod def from_file_pointer(cls, algorithm, fp: ty.BinaryIO): return cls.from_bytes(algorithm, fp.read()) @classmethod def nullhash(cls): return cls("", "") def _yield_types( obj, valid_subclasses=(object,), recurse_origin=(tuple, list, ty.Union) ): """Recursively transverse type annotation if the origin is any of the types in `recurse_origin` and yield those type which are subclasses of `valid_subclasses`. """ if ty.get_origin(obj) in recurse_origin: for el in ty.get_args(obj): yield from _yield_types(el, valid_subclasses, recurse_origin) else: if inspect.isclass(obj) and issubclass(obj, valid_subclasses): yield obj class classproperty: # noqa N801 """Decorator for a class property In Python 3.9+ can be replaced by @classmethod @property def myprop(self): return 42 """ def __init__(self, fget): self.fget = fget def __get__(self, owner_self, owner_cls): return self.fget(owner_cls) def is_relative_to(self, *other): """Return True if the path is relative to another path or False. In Python 3.9+ can be replaced by path.is_relative_to(other) """ try: self.relative_to(*other) return True except ValueError: return False class DelimiterInclude(enum.IntEnum): """Specifies how to deal with delimiters while parsing.""" #: Split at delimiter, not including in any string SPLIT = enum.auto() #: Split after, keeping the delimiter with previous string. SPLIT_AFTER = enum.auto() #: Split before, keeping the delimiter with next string. SPLIT_BEFORE = enum.auto() #: Do not split at delimiter. DO_NOT_SPLIT = enum.auto() class DelimiterAction(enum.IntEnum): """Specifies how to deal with delimiters while parsing.""" #: Continue parsing normally. CONTINUE = enum.auto() #: Capture everything til end of line as a whole. CAPTURE_NEXT_TIL_EOL = enum.auto() #: Stop parsing line and move to next. STOP_PARSING_LINE = enum.auto() #: Stop parsing content. STOP_PARSING = enum.auto() DO_NOT_SPLIT_EOL = { "\r\n": (DelimiterInclude.DO_NOT_SPLIT, DelimiterAction.CONTINUE), "\n": (DelimiterInclude.DO_NOT_SPLIT, DelimiterAction.CONTINUE), "\r": (DelimiterInclude.DO_NOT_SPLIT, DelimiterAction.CONTINUE), } SPLIT_EOL = { "\r\n": (DelimiterInclude.SPLIT, DelimiterAction.CONTINUE), "\n": (DelimiterInclude.SPLIT, DelimiterAction.CONTINUE), "\r": (DelimiterInclude.SPLIT, DelimiterAction.CONTINUE), } _EOLs_set = set(DO_NOT_SPLIT_EOL.keys()) @functools.lru_cache def _build_delimiter_pattern(delimiters: ty.Tuple[str, ...]) -> re.Pattern: """Compile a tuple of delimiters into a regex expression with a capture group around the delimiter. """ return re.compile("|".join(f"({re.escape(el)})" for el in delimiters)) ############ # Iterators ############ DelimiterDictT = ty.Dict[str, ty.Tuple[DelimiterInclude, DelimiterAction]] class Spliter: """Content iterator splitting according to given delimiters. The pattern can be changed dynamically sending a new pattern to the generator, see DelimiterInclude and DelimiterAction for more information. The current scanning position can be changed at any time. Parameters ---------- content : str delimiters : ty.Dict[str, ty.Tuple[DelimiterInclude, DelimiterAction]] Yields ------ start_line : int line number of the start of the content (zero-based numbering). start_col : int column number of the start of the content (zero-based numbering). end_line : int line number of the end of the content (zero-based numbering). end_col : int column number of the end of the content (zero-based numbering). part : str part of the text between delimiters. """ _pattern: ty.Optional[re.Pattern] _delimiters: DelimiterDictT __stop_searching_in_line = False __pending = "" __first_line_col = None __lines = () __lineno = 0 __colno = 0 def __init__(self, content: str, delimiters: DelimiterDictT): self.set_delimiters(delimiters) self.__lines = content.splitlines(keepends=True) def set_position(self, lineno: int, colno: int): self.__lineno, self.__colno = lineno, colno def set_delimiters(self, delimiters: DelimiterDictT): for k, v in delimiters.items(): if v == (DelimiterInclude.DO_NOT_SPLIT, DelimiterAction.STOP_PARSING): raise ValueError( f"The delimiter action for {k} is not a valid combination ({v})" ) # Build a pattern but removing eols _pat_dlm = tuple(set(delimiters.keys()) - _EOLs_set) if _pat_dlm: self._pattern = _build_delimiter_pattern(_pat_dlm) else: self._pattern = None # We add the end of line as delimiters if not present. self._delimiters = {**DO_NOT_SPLIT_EOL, **delimiters} def __iter__(self): return self def __next__(self): if self.__lineno >= len(self.__lines): raise StopIteration while True: if self.__stop_searching_in_line: # There must be part of a line pending to parse # due to stop line = self.__lines[self.__lineno] mo = None self.__stop_searching_in_line = False else: # We get the current line and the find the first delimiter. line = self.__lines[self.__lineno] if self._pattern is None: mo = None else: mo = self._pattern.search(line, self.__colno) if mo is None: # No delimiter was found, # which should happen at end of the content or end of line for k in DO_NOT_SPLIT_EOL.keys(): if line.endswith(k): dlm = line[-len(k) :] end_col, next_col = len(line) - len(k), 0 break else: # No EOL found, this is end of content dlm = None end_col, next_col = len(line), 0 next_line = self.__lineno + 1 else: next_line = self.__lineno end_col, next_col = mo.span() dlm = mo.group() part = line[self.__colno : end_col] include, action = self._delimiters.get( dlm, (DelimiterInclude.SPLIT, DelimiterAction.STOP_PARSING) ) if include == DelimiterInclude.SPLIT: next_pending = "" elif include == DelimiterInclude.SPLIT_AFTER: end_col += len(dlm) part = part + dlm next_pending = "" elif include == DelimiterInclude.SPLIT_BEFORE: next_pending = dlm elif include == DelimiterInclude.DO_NOT_SPLIT: self.__pending += line[self.__colno : end_col] + dlm next_pending = "" else: raise ValueError(f"Unknown action {include}.") if action == DelimiterAction.STOP_PARSING: # this will raise a StopIteration in the next call. next_line = len(self.__lines) elif action == DelimiterAction.STOP_PARSING_LINE: next_line = self.__lineno + 1 next_col = 0 start_line = self.__lineno start_col = self.__colno end_line = self.__lineno self.__lineno = next_line self.__colno = next_col if action == DelimiterAction.CAPTURE_NEXT_TIL_EOL: self.__stop_searching_in_line = True if include == DelimiterInclude.DO_NOT_SPLIT: self.__first_line_col = start_line, start_col else: if self.__first_line_col is None: out = ( start_line, start_col - len(self.__pending), end_line, end_col, self.__pending + part, ) else: out = ( *self.__first_line_col, end_line, end_col, self.__pending + part, ) self.__first_line_col = None self.__pending = next_pending return out class StatementIterator: """Content peekable iterator splitting according to given delimiters. The pattern can be changed dynamically sending a new pattern to the generator, see DelimiterInclude and DelimiterAction for more information. Parameters ---------- content : str delimiters : dict[str, ty.Tuple[DelimiterInclude, DelimiterAction]] Yields ------ Statement """ _cache: ty.Deque[Statement] def __init__( self, content: str, delimiters: DelimiterDictT, strip_spaces: bool = True ): self._cache = collections.deque() self._spliter = Spliter(content, delimiters) self._strip_spaces = strip_spaces def __iter__(self): return self def set_delimiters(self, delimiters: DelimiterDictT): self._spliter.set_delimiters(delimiters) if self._cache: value = self.peek() # Elements are 1 based indexing, while splitter is 0 based. self._spliter.set_position(value.start_line - 1, value.start_col) self._cache.clear() def _get_next_strip(self) -> Statement: part = "" while not part: start_line, start_col, end_line, end_col, part = next(self._spliter) lo = len(part) part = part.lstrip() start_col += lo - len(part) lo = len(part) part = part.rstrip() end_col -= lo - len(part) return Statement.from_statement_iterator_element( (start_line + 1, start_col, end_line + 1, end_col, part) ) def _get_next(self) -> Statement: if self._strip_spaces: return self._get_next_strip() part = "" while not part: start_line, start_col, end_line, end_col, part = next(self._spliter) return Statement.from_statement_iterator_element( (start_line + 1, start_col, end_line + 1, end_col, part) ) def peek(self, default=_SENTINEL) -> Statement: """Return the item that will be next returned from ``next()``. Return ``default`` if there are no items left. If ``default`` is not provided, raise ``StopIteration``. """ if not self._cache: try: self._cache.append(self._get_next()) except StopIteration: if default is _SENTINEL: raise return default return self._cache[0] def __next__(self) -> Statement: if self._cache: return self._cache.popleft() else: return self._get_next() ########### # Parsing ########### # Configuration type CT = ty.TypeVar("CT") PST = ty.TypeVar("PST", bound="ParsedStatement") LineColStr = Tuple[int, int, str] FromString = ty.Union[None, PST, ParsingError] Consume = ty.Union[PST, ParsingError] NullableConsume = ty.Union[None, PST, ParsingError] Single = ty.Union[PST, ParsingError] Multi = ty.Tuple[ty.Union[PST, ParsingError], ...] @dataclass(frozen=True) class ParsedStatement(ty.Generic[CT], Statement): """A single parsed statement. In order to write your own, you need to subclass it as a frozen dataclass and implement the parsing logic by overriding `from_string` classmethod. Takes two arguments: the string to parse and an object given by the parser which can be used to store configuration information. It should return an instance of this class if parsing was successful or None otherwise """ @classmethod def from_string(cls: Type[PST], s: str) -> FromString[PST]: """Parse a string into a ParsedStatement. Return files and their meaning: 1. None: the string cannot be parsed with this class. 2. A subclass of ParsedStatement: the string was parsed successfully 3. A subclass of ParsingError the string could be parsed with this class but there is an error. """ raise NotImplementedError( "ParsedStatement subclasses must implement " "'from_string' or 'from_string_and_config'" ) @classmethod def from_string_and_config(cls: Type[PST], s: str, config: CT) -> FromString[PST]: """Parse a string into a ParsedStatement. Return files and their meaning: 1. None: the string cannot be parsed with this class. 2. A subclass of ParsedStatement: the string was parsed successfully 3. A subclass of ParsingError the string could be parsed with this class but there is an error. """ return cls.from_string(s) @classmethod def from_statement_and_config( cls: Type[PST], statement: Statement, config: CT ) -> FromString[PST]: try: out = cls.from_string_and_config(statement.raw, config) except Exception as ex: out = UnhandledParsingError(ex) if out is None: return None out.set_position(*statement.get_position()) out.set_raw(statement.raw) return out @classmethod def consume( cls: Type[PST], statement_iterator: StatementIterator, config: CT ) -> NullableConsume[PST]: """Peek into the iterator and try to parse. Return files and their meaning: 1. None: the string cannot be parsed with this class, the iterator is kept an the current place. 2. a subclass of ParsedStatement: the string was parsed successfully, advance the iterator. 3. a subclass of ParsingError: the string could be parsed with this class but there is an error, advance the iterator. """ statement = statement_iterator.peek() parsed_statement = cls.from_statement_and_config(statement, config) if parsed_statement is None: return None next(statement_iterator) return parsed_statement OPST = ty.TypeVar("OPST", bound="ParsedStatement") IPST = ty.TypeVar("IPST", bound="ParsedStatement") CPST = ty.TypeVar("CPST", bound="ParsedStatement") BT = ty.TypeVar("BT", bound="Block") RBT = ty.TypeVar("RBT", bound="RootBlock") @dataclass(frozen=True) class Block(ty.Generic[OPST, IPST, CPST, CT]): """A sequence of statements with an opening, body and closing.""" opening: Consume[OPST] body: Tuple[Consume[IPST], ...] closing: Consume[CPST] delimiters = {} @property def start_line(self): return self.opening.start_line @property def start_col(self): return self.opening.start_col @property def end_line(self): return self.closing.end_line @property def end_col(self): return self.closing.end_col def get_position(self): return self.start_line, self.start_col, self.end_line, self.end_col @property def format_position(self): if self.start_line is None: return "N/A" return "%d,%d-%d,%d" % self.get_position() @classmethod def subclass_with(cls, *, opening=None, body=None, closing=None): @dataclass(frozen=True) class CustomBlock(Block): pass if opening: CustomBlock.__annotations__["opening"] = Single[ty.Union[opening]] if body: CustomBlock.__annotations__["body"] = Multi[ty.Union[body]] if closing: CustomBlock.__annotations__["closing"] = Single[ty.Union[closing]] return CustomBlock def __iter__(self) -> Iterator[Statement]: yield self.opening for el in self.body: if isinstance(el, Block): yield from el else: yield el yield self.closing def iter_blocks(self) -> Iterator[ty.Union[Block, Statement]]: yield self.opening yield from self.body yield self.closing ################################################### # Convenience methods to iterate parsed statements ################################################### _ElementT = ty.TypeVar("_ElementT", bound=Statement) def filter_by(self, *klass: Type[_ElementT]) -> Iterator[_ElementT]: """Yield elements of a given class or classes.""" yield from (el for el in self if isinstance(el, klass)) # noqa Bug in pycharm. @cached_property def errors(self) -> ty.Tuple[ParsingError, ...]: """Tuple of errors found.""" return tuple(self.filter_by(ParsingError)) @property def has_errors(self) -> bool: """True if errors were found during parsing.""" return bool(self.errors) #################### # Statement classes #################### @classproperty def opening_classes(cls) -> Iterator[Type[OPST]]: """Classes representing any of the parsed statement that can open this block.""" opening = ty.get_type_hints(cls)["opening"] yield from _yield_types(opening, ParsedStatement) @classproperty def body_classes(cls) -> Iterator[Type[IPST]]: """Classes representing any of the parsed statement that can be in the body.""" body = ty.get_type_hints(cls)["body"] yield from _yield_types(body, (ParsedStatement, Block)) @classproperty def closing_classes(cls) -> Iterator[Type[CPST]]: """Classes representing any of the parsed statement that can close this block.""" closing = ty.get_type_hints(cls)["closing"] yield from _yield_types(closing, ParsedStatement) ########## # Consume ########## @classmethod def consume_opening( cls: Type[BT], statement_iterator: StatementIterator, config: CT ) -> NullableConsume[OPST]: """Peek into the iterator and try to parse with any of the opening classes. See `ParsedStatement.consume` for more details. """ for c in cls.opening_classes: el = c.consume(statement_iterator, config) if el is not None: return el return None @classmethod def consume_body( cls, statement_iterator: StatementIterator, config: CT ) -> Consume[IPST]: """Peek into the iterator and try to parse with any of the body classes. If the statement cannot be parsed, a UnknownStatement is returned. """ for c in cls.body_classes: el = c.consume(statement_iterator, config) if el is not None: return el el = next(statement_iterator) return UnknownStatement.from_statement(el) @classmethod def consume_closing( cls: Type[BT], statement_iterator: StatementIterator, config: CT ) -> NullableConsume[CPST]: """Peek into the iterator and try to parse with any of the opening classes. See `ParsedStatement.consume` for more details. """ for c in cls.closing_classes: el = c.consume(statement_iterator, config) if el is not None: return el return None @classmethod def consume_body_closing( cls: Type[BT], opening: OPST, statement_iterator: StatementIterator, config: CT ) -> BT: body = [] closing = None last_line = opening.end_line while closing is None: try: closing = cls.consume_closing(statement_iterator, config) if closing is not None: continue el = cls.consume_body(statement_iterator, config) body.append(el) last_line = el.end_line except StopIteration: closing = cls.on_stop_iteration(config) closing.set_position(last_line + 1, 0, last_line + 1, 0) return cls(opening, tuple(body), closing) @classmethod def consume( cls: Type[BT], statement_iterator: StatementIterator, config: CT ) -> Optional[BT]: """Try consume the block. Possible outcomes: 1. The opening was not matched, return None. 2. A subclass of Block, where body and closing migh contain errors. """ opening = cls.consume_opening(statement_iterator, config) if opening is None: return None return cls.consume_body_closing(opening, statement_iterator, config) @classmethod def on_stop_iteration(cls, config): return UnexpectedEOF() @dataclass(frozen=True) class BOS(ParsedStatement[CT]): """Beginning of source.""" # Hasher algorithm name and hexdigest content_hash: Hash @classmethod def from_string_and_config(cls: Type[PST], s: str, config: CT) -> FromString[PST]: raise RuntimeError("BOS cannot be constructed from_string_and_config") @property def location(self) -> SourceLocationT: return "" @dataclass(frozen=True) class BOF(BOS): """Beginning of file.""" path: pathlib.Path # Modification time of the file. mtime: float @property def location(self) -> SourceLocationT: return self.path @dataclass(frozen=True) class BOR(BOS): """Beginning of resource.""" package: str resource_name: str @property def location(self) -> SourceLocationT: return self.package, self.resource_name @dataclass(frozen=True) class EOS(ParsedStatement[CT]): """End of sequence.""" @classmethod def from_string_and_config(cls: Type[PST], s: str, config: CT) -> FromString[PST]: return cls() class RootBlock(ty.Generic[IPST, CT], Block[BOS, IPST, EOS, CT]): """A sequence of statement flanked by the beginning and ending of stream.""" opening: Single[BOS] closing: Single[EOS] @classmethod def subclass_with(cls, *, body=None): @dataclass(frozen=True) class CustomRootBlock(RootBlock): pass if body: CustomRootBlock.__annotations__["body"] = Multi[ty.Union[body]] return CustomRootBlock @classmethod def consume_opening( cls: Type[RBT], statement_iterator: StatementIterator, config: CT ) -> NullableConsume[BOS]: raise RuntimeError( "Implementation error, 'RootBlock.consume_opening' should never be called" ) @classmethod def consume( cls: Type[RBT], statement_iterator: StatementIterator, config: CT ) -> RBT: block = super().consume(statement_iterator, config) if block is None: raise RuntimeError( "Implementation error, 'RootBlock.consume' should never return None" ) return block @classmethod def consume_closing( cls: Type[RBT], statement_iterator: StatementIterator, config: CT ) -> NullableConsume[EOS]: return None @classmethod def on_stop_iteration(cls, config): return EOS() ################# # Source parsing ################# ResourceT = ty.Tuple[str, str] # package name, resource name StrictLocationT = ty.Union[pathlib.Path, ResourceT] SourceLocationT = ty.Union[str, StrictLocationT] @dataclass(frozen=True) class ParsedSource(ty.Generic[RBT, CT]): parsed_source: RBT # Parser configuration. config: CT @property def location(self) -> StrictLocationT: return self.parsed_source.opening.location @cached_property def has_errors(self) -> bool: return self.parsed_source.has_errors def errors(self): yield from self.parsed_source.errors @dataclass(frozen=True) class CannotParseResourceAsFile(Exception): """The requested python package resource cannot be located as a file in the file system. """ package: str resource_name: str class Parser(ty.Generic[RBT, CT]): """Parser class.""" #: class to iterate through statements in a source unit. _statement_iterator_class: Type[StatementIterator] = StatementIterator #: Delimiters. _delimiters: DelimiterDictT = SPLIT_EOL _strip_spaces: bool = True #: root block class containing statements and blocks can be parsed. _root_block_class: Type[RBT] #: source file text encoding. _encoding = "utf-8" #: configuration passed to from_string functions. _config: CT #: try to open resources as files. _prefer_resource_as_file: bool #: parser algorithm to us. Must be a callable member of hashlib _hasher = hashlib.blake2b def __init__(self, config: CT, prefer_resource_as_file=True): self._config = config self._prefer_resource_as_file = prefer_resource_as_file def parse(self, source_location: SourceLocationT) -> ParsedSource[RBT, CT]: """Parse a file into a ParsedSourceFile or ParsedResource. Parameters ---------- source_location: if str or pathlib.Path is interpreted as a file. if (str, str) is interpreted as (package, resource) using the resource python api. """ if isinstance(source_location, tuple) and len(source_location) == 2: if self._prefer_resource_as_file: try: return self.parse_resource_from_file(*source_location) except CannotParseResourceAsFile: pass return self.parse_resource(*source_location) if isinstance(source_location, str): return self.parse_file(pathlib.Path(source_location)) if isinstance(source_location, pathlib.Path): return self.parse_file(source_location) raise TypeError( f"Unknown type {type(source_location)}, " "use str or pathlib.Path for files or " "(package: str, resource_name: str) tuple " "for a resource." ) def parse_bytes(self, b: bytes, bos: BOS = None) -> ParsedSource[RBT, CT]: if bos is None: bos = BOS(Hash.from_bytes(self._hasher, b)).set_simple_position(0, 0, 0) sic = self._statement_iterator_class( b.decode(self._encoding), self._delimiters, self._strip_spaces ) parsed = self._root_block_class.consume_body_closing(bos, sic, self._config) return ParsedSource( parsed, self._config, ) def parse_file(self, path: pathlib.Path) -> ParsedSource[RBT, CT]: """Parse a file into a ParsedSourceFile. Parameters ---------- path path of the file. """ with path.open(mode="rb") as fi: content = fi.read() bos = BOF( Hash.from_bytes(self._hasher, content), path, path.stat().st_mtime ).set_simple_position(0, 0, 0) return self.parse_bytes(content, bos) def parse_resource_from_file( self, package: str, resource_name: str ) -> ParsedSource[RBT, CT]: """Parse a resource into a ParsedSourceFile, opening as a file. Parameters ---------- package package name where the resource is located. resource_name name of the resource """ if sys.version_info < (3, 9): # Remove when Python 3.8 is dropped with resources.path(package, resource_name) as p: path = p.resolve() else: with resources.as_file( resources.files(package).joinpath(resource_name) ) as p: path = p.resolve() if path.exists(): return self.parse_file(path) raise CannotParseResourceAsFile(package, resource_name) def parse_resource(self, package: str, resource_name: str) -> ParsedSource[RBT, CT]: """Parse a resource into a ParsedResource. Parameters ---------- package package name where the resource is located. resource_name name of the resource """ if sys.version_info < (3, 9): # Remove when Python 3.8 is dropped with resources.open_binary(package, resource_name) as fi: content = fi.read() else: with resources.files(package).joinpath(resource_name).open("rb") as fi: content = fi.read() bos = BOR( Hash.from_bytes(self._hasher, content), package, resource_name ).set_simple_position(0, 0, 0) return self.parse_bytes(content, bos) ########## # Project ########## class IncludeStatement(ParsedStatement): """ "Include statements allow to merge files.""" @property def target(self) -> str: raise NotImplementedError( "IncludeStatement subclasses must implement target property." ) class ParsedProject( ty.Dict[ ty.Optional[ty.Tuple[StrictLocationT, str]], ParsedSource, ] ): """Collection of files, independent or connected via IncludeStatement. Keys are either an absolute pathname or a tuple package name, resource name. None is the name of the root. """ @cached_property def has_errors(self) -> bool: return any(el.has_errors for el in self.values()) def errors(self): for el in self.values(): yield from el.errors() def _iter_statements(self, items, seen, include_only_once): """Iter all definitions in the order they appear, going into the included files. """ for source_location, parsed in items: seen.add(source_location) for parsed_statement in parsed.parsed_source: if isinstance(parsed_statement, IncludeStatement): location = parsed.location, parsed_statement.target if location in seen and include_only_once: raise ValueError(f"{location} was already included.") yield from self._iter_statements( ((location, self[location]),), seen, include_only_once ) else: yield parsed_statement def iter_statements(self, include_only_once=True): """Iter all definitions in the order they appear, going into the included files. Parameters ---------- include_only_once if true, each file cannot be included more than once. """ yield from self._iter_statements([(None, self[None])], set(), include_only_once) def _iter_blocks(self, items, seen, include_only_once): """Iter all definitions in the order they appear, going into the included files. """ for source_location, parsed in items: seen.add(source_location) for parsed_statement in parsed.parsed_source.iter_blocks(): if isinstance(parsed_statement, IncludeStatement): location = parsed.location, parsed_statement.target if location in seen and include_only_once: raise ValueError(f"{location} was already included.") yield from self._iter_blocks( ((location, self[location]),), seen, include_only_once ) else: yield parsed_statement def iter_blocks(self, include_only_once=True): """Iter all definitions in the order they appear, going into the included files. Parameters ---------- include_only_once if true, each file cannot be included more than once. """ yield from self._iter_blocks([(None, self[None])], set(), include_only_once) def default_locator(source_location: StrictLocationT, target: str) -> StrictLocationT: """Return a new location from current_location and target.""" if isinstance(source_location, pathlib.Path): current_location = pathlib.Path(source_location).resolve() if current_location.is_file(): current_path = current_location.parent else: current_path = current_location target_path = pathlib.Path(target) if target_path.is_absolute(): raise ValueError( f"Cannot refer to absolute paths in import statements ({source_location}, {target})." ) tmp = (current_path / target_path).resolve() if not is_relative_to(tmp, current_path): raise ValueError( f"Cannot refer to locations above the current location ({source_location}, {target})" ) return tmp.absolute() elif isinstance(source_location, tuple) and len(source_location) == 2: return source_location[0], target raise TypeError( f"Cannot handle type {type(source_location)}, " "use str or pathlib.Path for files or " "(package: str, resource_name: str) tuple " "for a resource." ) DefinitionT = ty.Union[ty.Type[Block], ty.Type[ParsedStatement]] SpecT = ty.Union[ ty.Type[Parser], DefinitionT, ty.Iterable[DefinitionT], ty.Type[RootBlock], ] def build_parser_class(spec: SpecT, *, strip_spaces: bool = True, delimiters=None): """Build a custom parser class. Parameters ---------- spec specification of the content to parse. Can be one of the following things: - Parser class. - Block or ParsedStatement derived class. - Iterable of Block or ParsedStatement derived class. - RootBlock derived class. strip_spaces : bool if True, spaces will be stripped for each statement before calling ``from_string_and_config``. delimiters : dict Specify how the source file is split into statements (See below). Delimiters dictionary --------------------- The delimiters are specified with the keys of the delimiters dict. The dict files can be used to further customize the iterator. Each consist of a tuple of two elements: 1. A value of the DelimiterMode to indicate what to do with the delimiter string: skip it, attach keep it with previous or next string 2. A boolean indicating if parsing should stop after fiSBT encountering this delimiter. """ if delimiters is None: delimiters = SPLIT_EOL if isinstance(spec, type) and issubclass(spec, Parser): CustomParser = spec else: if isinstance(spec, (tuple, list)): for el in spec: if not issubclass(el, (Block, ParsedStatement)): raise TypeError( "Elements in root_block_class must be of type Block or ParsedStatement, " f"not {el}" ) @dataclass(frozen=True) class CustomRootBlock(RootBlock): pass CustomRootBlock.__annotations__["body"] = Multi[ty.Union[spec]] elif isinstance(spec, type) and issubclass(spec, RootBlock): CustomRootBlock = spec elif isinstance(spec, type) and issubclass(spec, (Block, ParsedStatement)): @dataclass(frozen=True) class CustomRootBlock(RootBlock): pass CustomRootBlock.__annotations__["body"] = Multi[spec] else: raise TypeError( "`spec` must be of type RootBlock or tuple of type Block or ParsedStatement, " f"not {type(spec)}" ) class CustomParser(Parser): _delimiters = delimiters _root_block_class = CustomRootBlock _strip_spaces = strip_spaces return CustomParser def parse( entry_point: SourceLocationT, spec: SpecT, config=None, *, strip_spaces: bool = True, delimiters=None, locator: ty.Callable[[StrictLocationT, str], StrictLocationT] = default_locator, prefer_resource_as_file: bool = True, **extra_parser_kwargs, ) -> ParsedProject: """Parse sources into a ParsedProject dictionary. Parameters ---------- entry_point file or resource, given as (package_name, resource_name). spec specification of the content to parse. Can be one of the following things: - Parser class. - Block or ParsedStatement derived class. - Iterable of Block or ParsedStatement derived class. - RootBlock derived class. config a configuration object that will be passed to `from_string_and_config` classmethod. strip_spaces : bool if True, spaces will be stripped for each statement before calling ``from_string_and_config``. delimiters : dict Specify how the source file is split into statements (See below). locator : Callable function that takes the current location and a target of an IncludeStatement and returns a new location. prefer_resource_as_file : bool if True, resources will try to be located in the filesystem if available. extra_parser_kwargs extra keyword arguments to be given to the parser. Delimiters dictionary --------------------- The delimiters are specified with the keys of the delimiters dict. The dict files can be used to further customize the iterator. Each consist of a tuple of two elements: 1. A value of the DelimiterMode to indicate what to do with the delimiter string: skip it, attach keep it with previous or next string 2. A boolean indicating if parsing should stop after fiSBT encountering this delimiter. """ CustomParser = build_parser_class( spec, strip_spaces=strip_spaces, delimiters=delimiters ) parser = CustomParser( config, prefer_resource_as_file=prefer_resource_as_file, **extra_parser_kwargs ) pp = ParsedProject() # : ty.List[Optional[ty.Union[LocatorT, str]], ...] pending: ty.List[ty.Tuple[StrictLocationT, str]] = [] if isinstance(entry_point, (str, pathlib.Path)): entry_point = pathlib.Path(entry_point) if not entry_point.is_absolute(): entry_point = pathlib.Path.cwd() / entry_point elif not (isinstance(entry_point, tuple) and len(entry_point) == 2): raise TypeError( f"Cannot handle type {type(entry_point)}, " "use str or pathlib.Path for files or " "(package: str, resource_name: str) tuple " "for a resource." ) pp[None] = parsed = parser.parse(entry_point) pending.extend( (parsed.location, el.target) for el in parsed.parsed_source.filter_by(IncludeStatement) ) while pending: source_location, target = pending.pop(0) pp[(source_location, target)] = parsed = parser.parse( locator(source_location, target) ) pending.extend( (parsed.location, el.target) for el in parsed.parsed_source.filter_by(IncludeStatement) ) return pp def parse_bytes( content: bytes, spec: SpecT, config=None, *, strip_spaces: bool = True, delimiters=None, **extra_parser_kwargs, ) -> ParsedProject: """Parse sources into a ParsedProject dictionary. Parameters ---------- content bytes. spec specification of the content to parse. Can be one of the following things: - Parser class. - Block or ParsedStatement derived class. - Iterable of Block or ParsedStatement derived class. - RootBlock derived class. config a configuration object that will be passed to `from_string_and_config` classmethod. strip_spaces : bool if True, spaces will be stripped for each statement before calling ``from_string_and_config``. delimiters : dict Specify how the source file is split into statements (See below). """ CustomParser = build_parser_class( spec, strip_spaces=strip_spaces, delimiters=delimiters ) parser = CustomParser(config, prefer_resource_as_file=False, **extra_parser_kwargs) pp = ParsedProject() pp[None] = parsed = parser.parse_bytes(content) if any(parsed.parsed_source.filter_by(IncludeStatement)): raise ValueError("parse_bytes does not support using an IncludeStatement") return pp