diff options
Diffstat (limited to 'py/yaml.py')
-rw-r--r-- | py/yaml.py | 527 |
1 files changed, 0 insertions, 527 deletions
diff --git a/py/yaml.py b/py/yaml.py deleted file mode 100644 index 6eda0ab..0000000 --- a/py/yaml.py +++ /dev/null @@ -1,527 +0,0 @@ -# coding: utf-8 - -""" -this is the source for the yaml utility -""" - -from __future__ import print_function -from __future__ import absolute_import - - -import sys -import os -import io -from textwrap import dedent - -from ruamel.std.argparse import ProgramBase, option, sub_parser, version, \ - CountAction, SmartFormatter -# from ruamel.appconfig import AppConfig -from . import __version__ - -import ruamel.yaml -from ruamel.yaml.compat import ordereddict, DBG_TOKEN, DBG_EVENT, DBG_NODE -from ruamel.yaml.configobjwalker import configobj_walker - - -def yaml_to_html2(code): - buf = io.StringIO() - buf.write(u'<HTML>\n') - buf.write(u'<HEAD>\n') - buf.write(u'</HEAD>\n') - buf.write(u'<BODY>\n') - buf.write(u'<TABLE>\n') - if isinstance(code, dict): - for k in code: - buf.write(u' <TR>\n') - for x in [k] + code[k]: - buf.write(u' <TD>{0}</TD>\n'.format(x)) - buf.write(u' </TR>\n') - buf.write(u'<TABLE>\n') - buf.write(u'</BODY>\n') - buf.write(u'</HTML>\n') - return buf.getvalue() - - -def yaml_to_html(code, level): - if level == 2: - return yaml_to_html2(code) - elif level == 3: - return yaml_to_html3(code) - raise NotImplementedError - - -class YAML: - def __init__(self, args, config): - self._args = args - self._config = config - - def from_ini(self): - try: - from configobj import ConfigObj - except ImportError: - print("to convert from .ini you need to install configobj:") - print(" pip install configobj:") - sys.exit(1) - errors = 0 - doc = [] - cfg = ConfigObj(open(self._args.file)) - if self._args.test: - print(ruamel.yaml.dump(cfg)) - return - for line in configobj_walker(cfg): - doc.append(line) - joined = '\n'.join(doc) - rto = self.round_trip_single(joined) - if self._args.basename: - out_fn = os.path.splitext(self._args.file)[0] + '.yaml' - if self._args.verbose > 0: - print('writing', out_fn) - with open(out_fn, 'w') as fp: - print(rto, end='', file=fp) # already has eol at eof - else: - print(rto, end='') # already has eol at eof - # print() - # if rto != joined: - # self.diff(joined, rto, "test.ini") - return 1 if errors else 0 - - def test(self): - self._args.event = self._args.node = True - dbg = 0 - if self._args.event: - dbg |= DBG_EVENT - if self._args.node: - dbg |= DBG_NODE - os.environ['YAMLDEBUG'] = str(dbg) - if False: - x = ruamel.yaml.comment.Comment() - print(sys.getsizeof(x)) - return - - def print_input(input): - print(input, end='') - print('-' * 15) - - def print_tokens(input): - print('Tokens (from scanner) ' + '#' * 50) - tokens = ruamel.yaml.scan(input, ruamel.yaml.RoundTripLoader) - for idx, token in enumerate(tokens): - # print(token.start_mark) - # print(token.end_mark) - print("{0:2} {1}".format(idx, token)) - - def rt_events(input): - dumper = ruamel.yaml.RoundTripDumper - events = ruamel.yaml.parse(input, ruamel.yaml.RoundTripLoader) - print(ruamel.yaml.emit(events, indent=False, Dumper=dumper)) - - def rt_nodes(input): - dumper = ruamel.yaml.RoundTripDumper - nodes = ruamel.yaml.compose(input, ruamel.yaml.RoundTripLoader) - print(ruamel.yaml.serialize(nodes, indent=False, Dumper=dumper)) - - def print_events(input): - print('Events (from parser) ' + '#' * 50) - events = ruamel.yaml.parse(input, ruamel.yaml.RoundTripLoader) - for idx, event in enumerate(events): - print("{0:2} {1}".format(idx, event)) - - def print_nodes(input): - print('Nodes (from composer) ' + '#' * 50) - x = ruamel.yaml.compose(input, ruamel.yaml.RoundTripLoader) - x.dump() # dump the node - - def scan_file(file_name): - inp = open(file_name).read() - print('---------\n', file_name) - print('---', repr(self.first_non_empty_line(inp))) - print('<<<', repr(self.last_non_empty_line(inp))) - - if False: - for x in self._args.file: - scan_file(x) - return - - if True: - import pickle - lines = 0 - for x in self._args.file: - print(x, end=' ') - if x.endswith('.yaml'): - data = ruamel.yaml.load(open(x)) - print(len(data), end=' ') - lines += len(data) - out_name = x.replace('.yaml', '.pickle') - with open(out_name, 'w') as fp: - pickle.dump(data, fp) - elif x.endswith('.pickle'): - with open(x) as fp: - data = pickle.load(fp) - print(len(data), end=' ') - lines += len(data) - print() - print('lines', lines) - return - - input = dedent(""" - application: web2py - version: 1 - runtime: python27 - api_version: 1 - threadsafe: false - - default_expiration: "24h" - - handlers: - - url: /(?P<a>.+?)/static/(?P<b>.+) - static_files: 'applications/\\1/static/\\2' - upload: applications/(.+?)/static/(.+) - secure: optional - """) - - input = dedent("""\ - a: - b: foo - c: bar - """) - - print_input(input) - print_tokens(input) - print_events(input) - # rt_events(input) - print_nodes(input) - # rt_nodes(input) - - data = ruamel.yaml.load(input, ruamel.yaml.RoundTripLoader) - print('data', data) - if False: - data['american'][0] = 'Fijenoord' - l = data['american'] - l = data - if True: - # print type(l), '\n', dir(l) - comment = getattr(l, '_yaml_comment', None) - print('comment_1', comment) - dumper = ruamel.yaml.RoundTripDumper - print('>>>>>>>>>>') - # print(ruamel.yaml.dump(data, default_flow_style=False, - # Dumper=dumper), '===========') - print("{0}=========".format(ruamel.yaml.dump( - data, - indent=4, - Dumper=dumper))) - comment = getattr(l, '_yaml_comment', None) - print('comment_2', comment) - - # test end - - def from_json(self): - # use roundtrip to preserve order - errors = 0 - docs = [] - for file_name in self._args.file: - if file_name == '-': - inp = sys.stdin.read() - else: - inp = open(file_name).read() - loader = ruamel.yaml.Loader # RoundTripLoader - docs.append(ruamel.yaml.load(inp, loader)) - #if self._args.literal: - # from ruamel.yaml.convert.literal import walk_tree - # for doc in docs: - # walk_tree(doc) - dumper = ruamel.yaml.RoundTripDumper - print(ruamel.yaml.dump_all( - docs, Dumper=dumper, - default_flow_style=self._args.flow)) - return 1 if errors else 0 - - def to_htmltable(self): - def vals(x): - if isinstance(x, list): - return x - if isinstance(x, (dict, ordereddict)): - return x.values() - return [] - - def seek_levels(x, count=0): - my_level = count - sub_level = 0 - for v in vals(x): - if v is None: - continue - sub_level = max(sub_level, seek_levels(v, my_level+1)) - return max(my_level, sub_level) - - inp = open(self._args.file).read() - loader = ruamel.yaml.RoundTripLoader - code = ruamel.yaml.load(inp, loader) - # assert isinstance(code, [ruamel.yaml.comments.CommentedMap]) - assert isinstance(code, (dict, list)) - levels = seek_levels(code) - if self._args.level: - print("levels:", levels) - return - print(yaml_to_html(code, levels)) - - def from_html(self): - from .convert.html import HTML2YAML - h2y = HTML2YAML(self._args) - with open(self._args.file) as fp: - print(h2y(fp.read())) - - def from_csv(self): - from .convert.from_csv import CSV2YAML - c2y = CSV2YAML(self._args) - c2y(self._args.file) - - def round_trip(self): - errors = 0 - warnings = 0 - for file_name in self._args.file: - inp = open(file_name).read() - e, w, stabilize, outp = self.round_trip_input(inp) - if w == 0: - if self._args.verbose > 0: - print(u"{0}: ok".format(file_name)) - continue - if self._args.save: - backup_file_name = file_name + '.orig' - if not os.path.exists(backup_file_name): - os.rename(file_name, backup_file_name) - with open(file_name, 'w') as ofp: - ofp.write(outp) - if not self._args.save or self._args.verbose > 0: - print("{0}:\n {1}".format(file_name, u', '.join(stabilize))) - self.diff(inp, outp, file_name) - errors += e - warnings += w - if errors > 0: - return 2 - if warnings > 0: - return 1 - return 0 - - def round_trip_input(self, inp): - errors = 0 - warnings = 0 - stabilize = [] - outp = self.round_trip_single(inp) - if inp == outp: - return errors, warnings, stabilize, outp - warnings += 1 - if inp.split() != outp.split(): - errors += 1 - stabilize.append(u"drops info on round trip") - else: - if self.round_trip_single(outp) == outp: - stabilize.append(u"stabilizes on second round trip") - else: - errors += 1 - ncoutp = self.round_trip_single(inp, drop_comment=True) - if self.round_trip_single(ncoutp, drop_comment=True) == ncoutp: - stabilize.append(u"ok without comments") - return errors, warnings, stabilize, outp - - def round_trip_single(self, inp, drop_comment=False): - explicit_start=self.first_non_empty_line(inp) == '---' - explicit_end=self.last_non_empty_line(inp) == '...' - indent = self._args.indent - loader = ruamel.yaml.SafeLoader if drop_comment else \ - ruamel.yaml.RoundTripLoader - code = ruamel.yaml.load(inp, loader) - dumper = ruamel.yaml.SafeDumper if drop_comment else \ - ruamel.yaml.RoundTripDumper - return ruamel.yaml.dump( - code, - Dumper=dumper, - indent=indent, - explicit_start=explicit_start, - explicit_end=explicit_end, - ) - - def first_non_empty_line(self, txt): - """return the first non-empty line of a block of text (stripped) - do not split or strip the complete txt - """ - pos = txt.find('\n') - prev_pos = 0 - while pos >= 0: - segment = txt[prev_pos:pos].strip() - if segment: - break - # print (pos, repr(segment)) - prev_pos = pos - pos = txt.find('\n', pos+1) - return segment - - def last_non_empty_line(self, txt): - """return the last non-empty line of a block of text (stripped) - do not split or strip the complete txt - """ - pos = txt.rfind('\n') - prev_pos = len(txt) - maxloop = 10 - while pos >= 0: - segment = txt[pos:prev_pos].strip() - if segment: - break - # print (pos, repr(segment)) - prev_pos = pos - pos = txt.rfind('\n', 0, pos-1) - maxloop -= 1 - if maxloop < 0: - break - return segment - - def diff(self, inp, outp, file_name): - import difflib - inl = inp.splitlines(True) # True for keepends - outl = outp.splitlines(True) - diff = difflib.unified_diff(inl, outl, file_name, 'round trip YAML') - # 2.6 difflib has trailing space on filename lines %-) - strip_trailing_space = sys.version_info < (2, 7) - for line in diff: - if strip_trailing_space and line[:4] in ['--- ', '+++ ']: - line = line.rstrip() + '\n' - sys.stdout.write(line) - - -def to_stdout(*args): - sys.stdout.write(' '.join(args)) - - -class YAML_Cmd(ProgramBase): - def __init__(self): - super(YAML_Cmd, self).__init__( - formatter_class=SmartFormatter, - aliases=True, - ) - self._config = None - - # you can put these on __init__, but subclassing YAML_Cmd - # will cause that to break - @option('--verbose', '-v', - help='increase verbosity level', action=CountAction, - const=1, nargs=0, default=0, global_option=True) - @option('--indent', type=int, global_option=True) - @version('version: ' + __version__) - def _pb_init(self): - # special name for which attribs are included in help - pass - - def run(self): - self._yaml = YAML(self._args, self._config) - if self._args.func: - return self._args.func() - - def parse_args(self): - # self._config = AppConfig( - # 'yaml', - # filename=AppConfig.check, - # parser=self._parser, # sets --config option - # warning=to_stdout, - # add_save=False, # add a --save-defaults (to config) option - # ) - # self._config._file_name can be handed to objects that need - # to get other information from the configuration directory - # self._config.set_defaults() - self._parse_args() - - @sub_parser( - aliases=['round-trip'], - help='test round trip on YAML data', - description='test round trip on YAML data', - ) - @option('--save', action='store_true', help="""save the rewritten data back - to the input file (if it doesn't exist a '.orig' backup will be made) - """) - @option('file', nargs='+') - def rt(self): - return self._yaml.round_trip() - - @sub_parser( - aliases=['from-json'], - help='convert json to block YAML', - description='convert json to block YAML', - ) - @option('--flow', action='store_true', - help='use flow instead of block style') - #@option('--literal', action='store_true', - # help='convert scalars with newlines to literal block style') - @option('file', nargs='+') - def json(self): - return self._yaml.from_json() - - @sub_parser( - aliases=['from-ini'], - help='convert .ini/config to block YAML', - description='convert .ini/config to block YAML', - ) - @option('--basename', '-b', action='store_true', - help='re-use basename of file for .yaml file, instead of writing' - ' to stdout') - @option('--test', action='store_true') - @option('file') - def ini(self): - return self._yaml.from_ini() - - @sub_parser( - #aliases=['to-html'], - help='convert YAML to html tables', - description="""convert YAML to html tables. If hierarchy is two deep ( - sequence/mapping over sequence/mapping) this is mapped to one table - If the hierarchy is three deep, a list of 2 deep tables is assumed, but - any non-list/mapp second level items are considered text. - Row level keys are inserted in first column (unless --no-row-key), - item level keys are used as classes for the TD. - """, - ) - @option("--level", action='store_true', help="print # levels and exit") - @option('file') - def htmltable(self): - return self._yaml.to_htmltable() - - @sub_parser('from-html', - help='convert HTML to YAML', - description="""convert HTML to YAML. Tags become keys with as - value a list. The first item in the list is a key value pair with - key ".attribute" if attributes are available followed by tag and string - segment items. Lists with one item are by default flattened. - """, - ) - @option("--no-body", action='store_true', - help="drop top level html and body from HTML code segments") - @option("--strip", action='store_true', - help="strip whitespace surrounding strings") - @option('file') - def from_html(self): - return self._yaml.from_html() - - @sub_parser('from-csv', - aliases=['csv'], - help='convert CSV to YAML', - description="""convert CSV to YAML. - By default generates a list of rows, with the items in a 2nd level - list. - """, - ) - @option('--delimiter') - #@option('--quotechar') - @option('file') - def from_csv(self): - return self._yaml.from_csv() - - if 'test' in sys.argv: - @sub_parser( - description='internal test function', - ) - @option('file', nargs='*') - def test(self): - return self._yaml.test() - - -def main(): - n = YAML_Cmd() - n.parse_args() - sys.exit(n.run()) |