summaryrefslogtreecommitdiff
path: root/main.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2017-06-04 14:27:02 +0200
committerAnthon van der Neut <anthon@mnt.org>2017-06-04 14:27:02 +0200
commita0ff58af5242fddc39236168b18aafb7c8b324da (patch)
treeb7ba0da53b720dde7be6e3b32ad927456c6f5dcf /main.py
parent6ba66a44af41d072f5ceddfcdf2c21611c2a7cd0 (diff)
downloadruamel.yaml-a0ff58af5242fddc39236168b18aafb7c8b324da.tar.gz
initial version with YAML class0.15.0
Some new tests are still required
Diffstat (limited to 'main.py')
-rw-r--r--main.py380
1 files changed, 380 insertions, 0 deletions
diff --git a/main.py b/main.py
index dc6ed92..65f933a 100644
--- a/main.py
+++ b/main.py
@@ -2,6 +2,7 @@
from __future__ import absolute_import, unicode_literals
+import sys
import warnings
import ruamel.yaml
@@ -25,8 +26,387 @@ if False: # MYPY
from typing import List, Set, Dict, Union, Any # NOQA
from ruamel.yaml.compat import StreamType, StreamTextType, VersionType # NOQA
+try:
+ from _ruamel_yaml import CParser, CEmitter # type: ignore
+except:
+ CParser = CEmitter = None
+
# import io
+enforce = object()
+
+
+# YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a
+# subset of abbreviations, which should all caps according to PEP8
+
+class YAML(object):
+ def __init__(self, _kw=enforce, typ=None, pure=False):
+ # type: (Any, Any, Any) -> None
+ """
+ _kw: not used, forces keyword arguments in 2.7 (in 3 you can do (*, safe_load=..)
+ typ: 'rt'/None -> RoundTripLoader/RoundTripDumper, (default)
+ 'safe' -> SafeLoader/SafeDumper,
+ 'unsafe' -> normal/unsafe Loader/Dumper
+ 'base' -> baseloader
+ pure: if True only use Python modules
+ """
+ if _kw is not enforce:
+ raise TypeError("{}.__init__() takes no positional argument but at least "
+ "one was given ({!r})".format(self.__class__.__name__, _kw))
+
+ self.typ = 'rt' if typ is None else typ
+ self.Resolver = ruamel.yaml.resolver.VersionedResolver
+ self.allow_unicode = True
+ self.Reader = None # type: Any
+ self.Scanner = None # type: Any
+ self.Serializer = None # type: Any
+ if self.typ == 'rt':
+ # no optimized rt-dumper yet
+ self.Emitter = ruamel.yaml.emitter.Emitter # type: Any
+ self.Serializer = ruamel.yaml.serializer.Serializer # type: Any
+ self.Representer = ruamel.yaml.representer.RoundTripRepresenter # type: Any
+ self.Scanner = ruamel.yaml.scanner.RoundTripScanner # type: Any
+ # no optimized rt-parser yet
+ self.Parser = ruamel.yaml.parser.RoundTripParser # type: Any
+ self.Composer = ruamel.yaml.composer.Composer # type: Any
+ self.Constructor = ruamel.yaml.constructor.RoundTripConstructor # type: Any
+ elif self.typ == 'safe':
+ self.Emitter = ruamel.yaml.emitter.Emitter if pure or CEmitter is None \
+ else CEmitter
+ self.Representer = ruamel.yaml.representer.SafeRepresenter
+ self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
+ self.Composer = ruamel.yaml.composer.Composer
+ self.Constructor = ruamel.yaml.constructor.SafeConstructor
+ elif self.typ == 'base':
+ self.Emitter = ruamel.yaml.emitter.Emitter
+ self.Representer = ruamel.yaml.representer.BaseRepresenter
+ self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
+ self.Composer = ruamel.yaml.composer.Composer
+ self.Constructor = ruamel.yaml.constructor.BaseConstructor
+ elif self.typ == 'unsafe':
+ self.Emitter = ruamel.yaml.emitter.Emitter
+ self.Representer = ruamel.yaml.representer.Representer
+ self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
+ self.Composer = ruamel.yaml.composer.Composer
+ self.Constructor = ruamel.yaml.constructor.Constructor
+ self.stream = None
+ self.canonical = None
+ self.indent = None
+ self.width = None
+ self.line_break = None
+ self.block_seq_indent = None
+ self.top_level_colon_align = None
+ self.prefix_colon = None
+ self.version = None
+ self.preserve_quotes = None
+ self.encoding = None
+ self.explicit_start = None
+ self.explicit_end = None
+ self.tags = None
+ self.default_style = None
+ self.default_flow_style = None
+
+ @property
+ def reader(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Reader(None, loader=self))
+ return getattr(self, attr)
+
+ @property
+ def scanner(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Scanner(loader=self))
+ return getattr(self, attr)
+
+ @property
+ def parser(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ if self.Parser is not CParser:
+ setattr(self, attr, self.Parser(loader=self))
+ else:
+ if getattr(self, '_stream', None) is None:
+ # wait for the stream
+ return None
+ else:
+ # if not hasattr(self._stream, 'read') and hasattr(self._stream, 'open'):
+ # # pathlib.Path() instance
+ # setattr(self, attr, CParser(self._stream))
+ # else:
+ setattr(self, attr, CParser(self._stream)) # type: ignore
+ # self._parser = self._composer = self
+ # print('scanner', self.loader.scanner)
+
+ return getattr(self, attr)
+
+ @property
+ def composer(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Composer(loader=self))
+ return getattr(self, attr)
+
+ @property
+ def constructor(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Constructor(
+ preserve_quotes=self.preserve_quotes, loader=self))
+ return getattr(self, attr)
+
+ @property
+ def resolver(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Resolver(
+ version=self.version, loader=self))
+ return getattr(self, attr)
+
+ @property
+ def emitter(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ if self.Emitter is not CEmitter:
+ setattr(self, attr, self.Emitter(
+ None, canonical=self.canonical,
+ indent=self.indent, width=self.width,
+ allow_unicode=self.allow_unicode, line_break=self.line_break,
+ block_seq_indent=self.block_seq_indent,
+ dumper=self))
+ else:
+ if getattr(self, '_stream', None) is None:
+ # wait for the stream
+ return None
+ return None
+ return getattr(self, attr)
+
+ @property
+ def serializer(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Serializer(
+ encoding=self.encoding,
+ explicit_start=self.explicit_start, explicit_end=self.explicit_end,
+ version=self.version, tags=self.tags, dumper=self))
+ return getattr(self, attr)
+
+ @property
+ def representer(self):
+ # type: () -> Any
+ attr = '_' + sys._getframe().f_code.co_name
+ if not hasattr(self, attr):
+ setattr(self, attr, self.Representer(
+ default_style=self.default_style,
+ default_flow_style=self.default_flow_style,
+ dumper=self))
+ return getattr(self, attr)
+
+ # separate output resolver?
+
+ def load(self, stream):
+ # type: (StreamTextType) -> Any
+ """
+ at this point you either have the non-pure Parser (which has its own reader and
+ scanner) or you have the pure Parser.
+ If the pure Parser is set, then set the Reader and Scanner, if not already set.
+ If either the Scanner or Reader are set, you cannot use the non-pure Parser,
+ so reset it to the pure parser and set the Reader resp. Scanner if necessary
+ """
+ if not hasattr(stream, 'read') and hasattr(stream, 'open'):
+ # pathlib.Path() instance
+ with stream.open('r') as fp: # type: ignore
+ return self.load(fp)
+ constructor, parser = self.get_constructor_parser(stream)
+ try:
+ return constructor.get_single_data()
+ finally:
+ parser.dispose()
+
+ def load_all(self, stream, _kw=enforce): # , skip=None):
+ # type: (StreamTextType, Any) -> Any
+ if _kw is not enforce:
+ raise TypeError("{}.__init__() takes no positional argument but at least "
+ "one was given ({!r})".format(self.__class__.__name__, _kw))
+ if not hasattr(stream, 'read') and hasattr(stream, 'open'):
+ # pathlib.Path() instance
+ with stream.open('r') as fp: # type: ignore
+ yield self.load_all(fp, _kw=enforce)
+ # if skip is None:
+ # skip = []
+ # elif isinstance(skip, int):
+ # skip = [skip]
+ constructor, parser = self.get_constructor_parser(stream)
+ try:
+ while constructor.check_data():
+ yield constructor.get_data()
+ finally:
+ parser.dispose()
+
+ def get_constructor_parser(self, stream):
+ # type: (StreamTextType) -> Any
+ """
+ the old cyaml needs special setup, and therefore the stream
+ """
+ if self.Parser is not CParser:
+ if self.Reader is None:
+ self.Reader = ruamel.yaml.reader.Reader
+ if self.Scanner is None:
+ self.Scanner = ruamel.yaml.scanner.Scanner
+ self.reader.stream = stream
+ else:
+ if self.Reader is not None:
+ if self.Scanner is None:
+ self.Scanner = ruamel.yaml.scanner.Scanner
+ self.Parser = ruamel.yaml.parser.Parser
+ self.reader.stream = stream
+ elif self.Scanner is not None:
+ if self.Reader is None:
+ self.Reader = ruamel.yaml.reader.Reader
+ self.Parser = ruamel.yaml.parser.Parser
+ self.reader.stream = stream
+ else:
+ # combined C level reader>scanner>parser
+ # does some calls to the resolver, e.g. BaseResolver.descend_resolver
+ # if you just initialise the CParser, to much of resolver.py
+ # is actually used
+ rslvr = self.Resolver
+ if rslvr is ruamel.yaml.resolver.VersionedResolver:
+ rslvr = ruamel.yaml.resolver.Resolver
+
+ class XLoader(self.Parser, self.Constructor, rslvr): # type: ignore
+ def __init__(selfx, stream, version=None, preserve_quotes=None):
+ # type: (StreamTextType, VersionType, bool) -> None
+ CParser.__init__(selfx, stream)
+ selfx._parser = selfx._composer = selfx
+ self.Constructor.__init__(selfx, loader=selfx)
+ rslvr.__init__(selfx, loadumper=selfx)
+ self._stream = stream
+ loader = XLoader(stream)
+ return loader, loader
+ return self.constructor, self.parser
+
+ def dump(self, data, stream=None):
+ # type: (Any, StreamType) -> Any
+ return self.dump_all([data], stream)
+
+ def dump_all(self, documents, stream=None):
+ # type: (Any, StreamType) -> Any
+ """
+ Serialize a sequence of Python objects into a YAML stream.
+ If stream is None, return the produced string instead.
+ """
+ # The stream should have the methods `write` and possibly `flush`.
+ if not hasattr(stream, 'write') and hasattr(stream, 'open'):
+ # pathlib.Path() instance
+ with stream.open('w') as fp: # type: ignore
+ return self.dump_all(documents, fp)
+ getvalue = None
+ if self.top_level_colon_align is True:
+ tlca = max([len(str(x)) for x in documents[0]]) # type: Any
+ else:
+ tlca = self.top_level_colon_align
+ if stream is None:
+ if self.encoding is None:
+ stream = StringIO()
+ else:
+ stream = BytesIO()
+ getvalue = stream.getvalue
+ serializer, representer, emitter = \
+ self.get_serializer_representer_emitter(stream, tlca)
+ try:
+ self.serializer.open()
+ for data in documents:
+ try:
+ self.representer.represent(data)
+ except AttributeError:
+ # print(dir(dumper._representer))
+ raise
+ self.serializer.close()
+ finally:
+ try:
+ self.emitter.dispose()
+ except AttributeError:
+ raise
+ # self.dumper.dispose() # cyaml
+ delattr(self, "_serializer")
+ delattr(self, "_emitter")
+ if getvalue is not None:
+ return getvalue()
+ return None
+
+ def get_serializer_representer_emitter(self, stream, tlca):
+ # type: (StreamType, Any) -> Any
+ # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler
+ if self.Emitter is not CEmitter:
+ if self.Serializer is None:
+ self.Serializer = ruamel.yaml.serializer.Serializer
+ self.emitter.stream = stream
+ self.emitter.top_level_colon_align = tlca
+ return self.serializer, self.representer, self.emitter
+ if self.Serializer is not None:
+ # cannot set serializer with CEmitter
+ self.Emitter = ruamel.yaml.emitter.Emitter
+ self.emitter.stream = stream
+ self.emitter.top_level_colon_align = tlca
+ return self.serializer, self.representer, self.emitter
+ # C routines
+
+ rslvr = ruamel.yaml.resolver.BaseResolver if self.typ == 'base' \
+ else ruamel.yaml.resolver.Resolver
+
+ class XDumper(CEmitter, self.Representer, rslvr): # type: ignore
+ def __init__(selfx, stream,
+ default_style=None, default_flow_style=None,
+ canonical=None, indent=None, width=None,
+ allow_unicode=None, line_break=None,
+ encoding=None, explicit_start=None, explicit_end=None,
+ version=None, tags=None, block_seq_indent=None,
+ top_level_colon_align=None, prefix_colon=None):
+ # type: (StreamType, Any, Any, Any, bool, Union[None, int], Union[None, int], bool, Any, Any, Union[None, bool], Union[None, bool], Any, Any, Any, Any, Any) -> None # NOQA
+ CEmitter.__init__(selfx, stream, canonical=canonical,
+ indent=indent, width=width, encoding=encoding,
+ allow_unicode=allow_unicode, line_break=line_break,
+ explicit_start=explicit_start,
+ explicit_end=explicit_end,
+ version=version, tags=tags)
+ selfx._emitter = selfx._serializer = selfx._representer = selfx
+ Representer.__init__(selfx, default_style=default_style,
+ default_flow_style=default_flow_style)
+ Resolver.__init__(selfx)
+ self._stream = stream
+ dumper = XDumper(stream)
+ self._emitter = self._serializer = dumper
+ return dumper, dumper, dumper
+
+ # basic types
+ def map(self, **kw):
+ # type: (Any) -> Any
+ if self.typ == 'rt':
+ from ruamel.yaml.comments import CommentedMap
+ return CommentedMap(**kw)
+ else:
+ return dict(**kw)
+
+ def seq(self, *args):
+ # type: (Any) -> Any
+ if self.typ == 'rt':
+ from ruamel.yaml.comments import CommentedSeq
+ return CommentedSeq(*args)
+ else:
+ return list(*args)
+
+
+########################################################################################
def scan(stream, Loader=Loader):
# type: (StreamTextType, Any) -> Any