diff options
author | Anthon van der Neut <anthon@mnt.org> | 2017-06-04 14:27:02 +0200 |
---|---|---|
committer | Anthon van der Neut <anthon@mnt.org> | 2017-06-04 14:27:02 +0200 |
commit | a0ff58af5242fddc39236168b18aafb7c8b324da (patch) | |
tree | b7ba0da53b720dde7be6e3b32ad927456c6f5dcf /main.py | |
parent | 6ba66a44af41d072f5ceddfcdf2c21611c2a7cd0 (diff) | |
download | ruamel.yaml-a0ff58af5242fddc39236168b18aafb7c8b324da.tar.gz |
initial version with YAML class0.15.0
Some new tests are still required
Diffstat (limited to 'main.py')
-rw-r--r-- | main.py | 380 |
1 files changed, 380 insertions, 0 deletions
@@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals +import sys import warnings import ruamel.yaml @@ -25,8 +26,387 @@ if False: # MYPY from typing import List, Set, Dict, Union, Any # NOQA from ruamel.yaml.compat import StreamType, StreamTextType, VersionType # NOQA +try: + from _ruamel_yaml import CParser, CEmitter # type: ignore +except: + CParser = CEmitter = None + # import io +enforce = object() + + +# YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a +# subset of abbreviations, which should all caps according to PEP8 + +class YAML(object): + def __init__(self, _kw=enforce, typ=None, pure=False): + # type: (Any, Any, Any) -> None + """ + _kw: not used, forces keyword arguments in 2.7 (in 3 you can do (*, safe_load=..) + typ: 'rt'/None -> RoundTripLoader/RoundTripDumper, (default) + 'safe' -> SafeLoader/SafeDumper, + 'unsafe' -> normal/unsafe Loader/Dumper + 'base' -> baseloader + pure: if True only use Python modules + """ + if _kw is not enforce: + raise TypeError("{}.__init__() takes no positional argument but at least " + "one was given ({!r})".format(self.__class__.__name__, _kw)) + + self.typ = 'rt' if typ is None else typ + self.Resolver = ruamel.yaml.resolver.VersionedResolver + self.allow_unicode = True + self.Reader = None # type: Any + self.Scanner = None # type: Any + self.Serializer = None # type: Any + if self.typ == 'rt': + # no optimized rt-dumper yet + self.Emitter = ruamel.yaml.emitter.Emitter # type: Any + self.Serializer = ruamel.yaml.serializer.Serializer # type: Any + self.Representer = ruamel.yaml.representer.RoundTripRepresenter # type: Any + self.Scanner = ruamel.yaml.scanner.RoundTripScanner # type: Any + # no optimized rt-parser yet + self.Parser = ruamel.yaml.parser.RoundTripParser # type: Any + self.Composer = ruamel.yaml.composer.Composer # type: Any + self.Constructor = ruamel.yaml.constructor.RoundTripConstructor # type: Any + elif self.typ == 'safe': + self.Emitter = ruamel.yaml.emitter.Emitter if pure or CEmitter is None \ + else CEmitter + self.Representer = ruamel.yaml.representer.SafeRepresenter + self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser + self.Composer = ruamel.yaml.composer.Composer + self.Constructor = ruamel.yaml.constructor.SafeConstructor + elif self.typ == 'base': + self.Emitter = ruamel.yaml.emitter.Emitter + self.Representer = ruamel.yaml.representer.BaseRepresenter + self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser + self.Composer = ruamel.yaml.composer.Composer + self.Constructor = ruamel.yaml.constructor.BaseConstructor + elif self.typ == 'unsafe': + self.Emitter = ruamel.yaml.emitter.Emitter + self.Representer = ruamel.yaml.representer.Representer + self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser + self.Composer = ruamel.yaml.composer.Composer + self.Constructor = ruamel.yaml.constructor.Constructor + self.stream = None + self.canonical = None + self.indent = None + self.width = None + self.line_break = None + self.block_seq_indent = None + self.top_level_colon_align = None + self.prefix_colon = None + self.version = None + self.preserve_quotes = None + self.encoding = None + self.explicit_start = None + self.explicit_end = None + self.tags = None + self.default_style = None + self.default_flow_style = None + + @property + def reader(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Reader(None, loader=self)) + return getattr(self, attr) + + @property + def scanner(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Scanner(loader=self)) + return getattr(self, attr) + + @property + def parser(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + if self.Parser is not CParser: + setattr(self, attr, self.Parser(loader=self)) + else: + if getattr(self, '_stream', None) is None: + # wait for the stream + return None + else: + # if not hasattr(self._stream, 'read') and hasattr(self._stream, 'open'): + # # pathlib.Path() instance + # setattr(self, attr, CParser(self._stream)) + # else: + setattr(self, attr, CParser(self._stream)) # type: ignore + # self._parser = self._composer = self + # print('scanner', self.loader.scanner) + + return getattr(self, attr) + + @property + def composer(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Composer(loader=self)) + return getattr(self, attr) + + @property + def constructor(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Constructor( + preserve_quotes=self.preserve_quotes, loader=self)) + return getattr(self, attr) + + @property + def resolver(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Resolver( + version=self.version, loader=self)) + return getattr(self, attr) + + @property + def emitter(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + if self.Emitter is not CEmitter: + setattr(self, attr, self.Emitter( + None, canonical=self.canonical, + indent=self.indent, width=self.width, + allow_unicode=self.allow_unicode, line_break=self.line_break, + block_seq_indent=self.block_seq_indent, + dumper=self)) + else: + if getattr(self, '_stream', None) is None: + # wait for the stream + return None + return None + return getattr(self, attr) + + @property + def serializer(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Serializer( + encoding=self.encoding, + explicit_start=self.explicit_start, explicit_end=self.explicit_end, + version=self.version, tags=self.tags, dumper=self)) + return getattr(self, attr) + + @property + def representer(self): + # type: () -> Any + attr = '_' + sys._getframe().f_code.co_name + if not hasattr(self, attr): + setattr(self, attr, self.Representer( + default_style=self.default_style, + default_flow_style=self.default_flow_style, + dumper=self)) + return getattr(self, attr) + + # separate output resolver? + + def load(self, stream): + # type: (StreamTextType) -> Any + """ + at this point you either have the non-pure Parser (which has its own reader and + scanner) or you have the pure Parser. + If the pure Parser is set, then set the Reader and Scanner, if not already set. + If either the Scanner or Reader are set, you cannot use the non-pure Parser, + so reset it to the pure parser and set the Reader resp. Scanner if necessary + """ + if not hasattr(stream, 'read') and hasattr(stream, 'open'): + # pathlib.Path() instance + with stream.open('r') as fp: # type: ignore + return self.load(fp) + constructor, parser = self.get_constructor_parser(stream) + try: + return constructor.get_single_data() + finally: + parser.dispose() + + def load_all(self, stream, _kw=enforce): # , skip=None): + # type: (StreamTextType, Any) -> Any + if _kw is not enforce: + raise TypeError("{}.__init__() takes no positional argument but at least " + "one was given ({!r})".format(self.__class__.__name__, _kw)) + if not hasattr(stream, 'read') and hasattr(stream, 'open'): + # pathlib.Path() instance + with stream.open('r') as fp: # type: ignore + yield self.load_all(fp, _kw=enforce) + # if skip is None: + # skip = [] + # elif isinstance(skip, int): + # skip = [skip] + constructor, parser = self.get_constructor_parser(stream) + try: + while constructor.check_data(): + yield constructor.get_data() + finally: + parser.dispose() + + def get_constructor_parser(self, stream): + # type: (StreamTextType) -> Any + """ + the old cyaml needs special setup, and therefore the stream + """ + if self.Parser is not CParser: + if self.Reader is None: + self.Reader = ruamel.yaml.reader.Reader + if self.Scanner is None: + self.Scanner = ruamel.yaml.scanner.Scanner + self.reader.stream = stream + else: + if self.Reader is not None: + if self.Scanner is None: + self.Scanner = ruamel.yaml.scanner.Scanner + self.Parser = ruamel.yaml.parser.Parser + self.reader.stream = stream + elif self.Scanner is not None: + if self.Reader is None: + self.Reader = ruamel.yaml.reader.Reader + self.Parser = ruamel.yaml.parser.Parser + self.reader.stream = stream + else: + # combined C level reader>scanner>parser + # does some calls to the resolver, e.g. BaseResolver.descend_resolver + # if you just initialise the CParser, to much of resolver.py + # is actually used + rslvr = self.Resolver + if rslvr is ruamel.yaml.resolver.VersionedResolver: + rslvr = ruamel.yaml.resolver.Resolver + + class XLoader(self.Parser, self.Constructor, rslvr): # type: ignore + def __init__(selfx, stream, version=None, preserve_quotes=None): + # type: (StreamTextType, VersionType, bool) -> None + CParser.__init__(selfx, stream) + selfx._parser = selfx._composer = selfx + self.Constructor.__init__(selfx, loader=selfx) + rslvr.__init__(selfx, loadumper=selfx) + self._stream = stream + loader = XLoader(stream) + return loader, loader + return self.constructor, self.parser + + def dump(self, data, stream=None): + # type: (Any, StreamType) -> Any + return self.dump_all([data], stream) + + def dump_all(self, documents, stream=None): + # type: (Any, StreamType) -> Any + """ + Serialize a sequence of Python objects into a YAML stream. + If stream is None, return the produced string instead. + """ + # The stream should have the methods `write` and possibly `flush`. + if not hasattr(stream, 'write') and hasattr(stream, 'open'): + # pathlib.Path() instance + with stream.open('w') as fp: # type: ignore + return self.dump_all(documents, fp) + getvalue = None + if self.top_level_colon_align is True: + tlca = max([len(str(x)) for x in documents[0]]) # type: Any + else: + tlca = self.top_level_colon_align + if stream is None: + if self.encoding is None: + stream = StringIO() + else: + stream = BytesIO() + getvalue = stream.getvalue + serializer, representer, emitter = \ + self.get_serializer_representer_emitter(stream, tlca) + try: + self.serializer.open() + for data in documents: + try: + self.representer.represent(data) + except AttributeError: + # print(dir(dumper._representer)) + raise + self.serializer.close() + finally: + try: + self.emitter.dispose() + except AttributeError: + raise + # self.dumper.dispose() # cyaml + delattr(self, "_serializer") + delattr(self, "_emitter") + if getvalue is not None: + return getvalue() + return None + + def get_serializer_representer_emitter(self, stream, tlca): + # type: (StreamType, Any) -> Any + # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler + if self.Emitter is not CEmitter: + if self.Serializer is None: + self.Serializer = ruamel.yaml.serializer.Serializer + self.emitter.stream = stream + self.emitter.top_level_colon_align = tlca + return self.serializer, self.representer, self.emitter + if self.Serializer is not None: + # cannot set serializer with CEmitter + self.Emitter = ruamel.yaml.emitter.Emitter + self.emitter.stream = stream + self.emitter.top_level_colon_align = tlca + return self.serializer, self.representer, self.emitter + # C routines + + rslvr = ruamel.yaml.resolver.BaseResolver if self.typ == 'base' \ + else ruamel.yaml.resolver.Resolver + + class XDumper(CEmitter, self.Representer, rslvr): # type: ignore + def __init__(selfx, stream, + default_style=None, default_flow_style=None, + canonical=None, indent=None, width=None, + allow_unicode=None, line_break=None, + encoding=None, explicit_start=None, explicit_end=None, + version=None, tags=None, block_seq_indent=None, + top_level_colon_align=None, prefix_colon=None): + # type: (StreamType, Any, Any, Any, bool, Union[None, int], Union[None, int], bool, Any, Any, Union[None, bool], Union[None, bool], Any, Any, Any, Any, Any) -> None # NOQA + CEmitter.__init__(selfx, stream, canonical=canonical, + indent=indent, width=width, encoding=encoding, + allow_unicode=allow_unicode, line_break=line_break, + explicit_start=explicit_start, + explicit_end=explicit_end, + version=version, tags=tags) + selfx._emitter = selfx._serializer = selfx._representer = selfx + Representer.__init__(selfx, default_style=default_style, + default_flow_style=default_flow_style) + Resolver.__init__(selfx) + self._stream = stream + dumper = XDumper(stream) + self._emitter = self._serializer = dumper + return dumper, dumper, dumper + + # basic types + def map(self, **kw): + # type: (Any) -> Any + if self.typ == 'rt': + from ruamel.yaml.comments import CommentedMap + return CommentedMap(**kw) + else: + return dict(**kw) + + def seq(self, *args): + # type: (Any) -> Any + if self.typ == 'rt': + from ruamel.yaml.comments import CommentedSeq + return CommentedSeq(*args) + else: + return list(*args) + + +######################################################################################## def scan(stream, Loader=Loader): # type: (StreamTextType, Any) -> Any |