#!/usr/bin/python3
# Copyright (C) 2015-2016  Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.


'''baserock_definitions_cgit_filter.py

This script takes a .morph file on stdin, and outputs the same text with
HTML links to the commits and repos that it references.

Some of this functionality should live elsewhere: the YAML annotation code
might be cleaned up and submitted to PyYAML, and much of the rest could go
in a library for dealing with Baserock definitions.

When testing this in cgit, see the 'source-filter' option in cgitrc(5).
(The link to further instructions that originally followed this sentence
is missing from this copy of the file.)

'''

# We configure this as a source filter across all repos, so it runs for every
# file that cgit ever renders.

import html
import io
import os
import sys

# Force UTF-8 on both streams regardless of the locale of the process that
# runs the filter.
sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

# The name of the file being rendered is expected as the first argument.
# Without it we cannot tell that the input is a morphology, so an absent
# argument is treated like any other non-.morph file instead of crashing
# with IndexError.
filename = sys.argv[1] if len(sys.argv) > 1 else ''

# Anything that is not a .morph file is passed through as escaped text and
# the script stops here.
if not filename.endswith('.morph'):
    sys.stdout.write(html.escape(sys.stdin.read()))
    sys.exit(0)

# These imports are only needed for .morph files, so they come after the
# early exit above (presumably to keep the common path cheap).
import urllib.parse
import yaml


# These classes allow you to parse a YAML file with PyYAML and receive a tree
# of YAMLAnnotatedObject() instances as the result. These should function the
# same as the underlying Python objects, for the most part, but you can also
# use the .start_mark and .end_mark attributes to find the exact place in the
# input YAML file that they were defined.
#
# Some things get broken by the wrapper classes. In particular yaml.dump()
# might not work.

class YAMLAnnotatedMixin():
    '''Mixin that records where in the YAML source an object was defined.

    The start_mark and end_mark attributes are filled in by
    YAMLAnnotatedConstructor with the marks of the node that the object
    was built from.

    '''

    # Class-level defaults. dict and list define their own __init__, which
    # comes before this mixin in the MRO of the subclasses below, so the
    # __init__ here is never reached for them.
    start_mark = None
    end_mark = None

    def __init__(self, *args):
        self.start_mark = None
        self.end_mark = None


class YAMLAnnotatedDict(dict, YAMLAnnotatedMixin):
    '''A dict that remembers its position in the YAML source.'''
    pass


class YAMLAnnotatedFloat(float, YAMLAnnotatedMixin):
    '''A float that remembers its position in the YAML source.'''

    def __new__(cls, value):
        return float.__new__(cls, value)


class YAMLAnnotatedInt(int, YAMLAnnotatedMixin):
    '''An int that remembers its position in the YAML source.'''

    def __new__(cls, value):
        return int.__new__(cls, value)


class YAMLAnnotatedList(list, YAMLAnnotatedMixin):
    '''A list that remembers its position in the YAML source.'''
    pass


class YAMLAnnotatedStr(str, YAMLAnnotatedMixin):
    '''A str that remembers its position in the YAML source.'''

    def __new__(cls, value):
        return str.__new__(cls, value)


class YAMLAnnotatedConstructor(yaml.constructor.SafeConstructor):
    '''SafeConstructor variant that produces YAMLAnnotated* objects.

    Only int, float, str, seq and map nodes are annotated (see the
    add_constructor() calls below); other node types are built by the
    inherited SafeConstructor code.

    '''

    @staticmethod
    def _annotate(annotated_data, node):
        '''Copy the source marks of 'node' onto 'annotated_data'.'''
        annotated_data.start_mark = node.start_mark
        annotated_data.end_mark = node.end_mark
        return annotated_data

    def construct_yaml_int(self, node):
        data = yaml.constructor.SafeConstructor.construct_yaml_int(self, node)
        return self._annotate(YAMLAnnotatedInt(data), node)

    def construct_yaml_float(self, node):
        data = yaml.constructor.SafeConstructor.construct_yaml_float(
            self, node)
        return self._annotate(YAMLAnnotatedFloat(data), node)

    def construct_yaml_str(self, node):
        data = self.construct_scalar(node)
        return self._annotate(YAMLAnnotatedStr(data), node)

    def construct_yaml_seq(self, node):
        # Generator-style constructor: the empty container is yielded first
        # and its contents are filled in afterwards.
        annotated_data = self._annotate(YAMLAnnotatedList(), node)
        yield annotated_data
        annotated_data.extend(self.construct_sequence(node))

    def construct_yaml_map(self, node):
        # Same two-step construction as construct_yaml_seq().
        annotated_data = self._annotate(YAMLAnnotatedDict(), node)
        yield annotated_data
        annotated_data.update(self.construct_mapping(node))


YAMLAnnotatedConstructor.add_constructor(
    'tag:yaml.org,2002:int',
    YAMLAnnotatedConstructor.construct_yaml_int)

YAMLAnnotatedConstructor.add_constructor(
    'tag:yaml.org,2002:float',
    YAMLAnnotatedConstructor.construct_yaml_float)

YAMLAnnotatedConstructor.add_constructor(
    'tag:yaml.org,2002:str',
    YAMLAnnotatedConstructor.construct_yaml_str)

YAMLAnnotatedConstructor.add_constructor(
    'tag:yaml.org,2002:seq',
    YAMLAnnotatedConstructor.construct_yaml_seq)

YAMLAnnotatedConstructor.add_constructor(
    'tag:yaml.org,2002:map',
    YAMLAnnotatedConstructor.construct_yaml_map)


class YAMLAnnotatedLoader(yaml.reader.Reader, yaml.scanner.Scanner,
                          yaml.parser.Parser, yaml.composer.Composer,
                          YAMLAnnotatedConstructor, yaml.resolver.Resolver):
    '''Loader class for use with yaml.load(), to provide annotated results.'''

    def __init__(self, stream):
        yaml.reader.Reader.__init__(self, stream)
        yaml.scanner.Scanner.__init__(self)
        yaml.parser.Parser.__init__(self)
        yaml.composer.Composer.__init__(self)
        YAMLAnnotatedConstructor.__init__(self)
        yaml.resolver.Resolver.__init__(self)


class TagAnnotation():
    '''Generic class for adding markup to plain text.

    'start_text' is inserted before the character at 'start_index' and
    'end_text' before the character at 'end_index'.

    '''

    def __init__(self, start_index, end_index, start_text, end_text):
        self.start_index = start_index
        self.end_index = end_index
        self.start_text = start_text
        self.end_text = end_text


def apply_tag_annotations(input_text, annotations, index_offset=0,
                          escape=None):
    '''Return 'input_text' with the markup from 'annotations' inserted.

    Annotations that lie entirely inside another annotation are nested
    within it. Annotations that only partially overlap are not supported.

    input_text: the text to mark up.
    annotations: iterable of TagAnnotation objects. Their indexes are
        positions in the full text, of which 'input_text' is the part that
        starts at 'index_offset'.
    index_offset: position of input_text[0] in the full text. Used by the
        recursive calls; top-level callers leave it at 0.
    escape: optional callable applied to every piece of 'input_text' that is
        copied to the output (for example html.escape). The start_text and
        end_text of the annotations are never passed through it. By default
        the text is copied unchanged.

    '''
    if escape is None:
        def escape(text):
            return text

    # For equal start positions the longer annotation goes first, so that it
    # becomes the parent of the shorter one.
    remaining = sorted(annotations,
                       key=lambda a: (a.start_index, -a.end_index))

    previous_pos = 0
    segments = []

    while remaining:
        top_annotation = remaining.pop(0)

        # Split what is left into annotations nested inside this one, which
        # the recursive call handles, and everything else.
        sub_annotations = []
        others = []
        for a in remaining:
            if (a.start_index >= top_annotation.start_index and
                    a.end_index <= top_annotation.end_index):
                sub_annotations.append(a)
            else:
                others.append(a)
        remaining = others

        start = top_annotation.start_index - index_offset
        end = top_annotation.end_index - index_offset

        sub_output = apply_tag_annotations(
            input_text[start:end], sub_annotations,
            index_offset=top_annotation.start_index, escape=escape)

        segments.extend([escape(input_text[previous_pos:start]),
                         top_annotation.start_text,
                         sub_output,
                         top_annotation.end_text])
        previous_pos = end

    segments.append(escape(input_text[previous_pos:]))
    return ''.join(segments)


CGIT_ALIASES = {
    'baserock': '/cgit/baserock/%(repo)s.git',
    'upstream': '/cgit/delta/%(repo)s.git',
}


def repo_cgit_url(reponame):
    '''Returns a cgit URL for the current host for a given repo.

    Aliases in the repo name are parsed, but only the default fields are
    currently understood (baserock: and upstream:). A name that uses
    neither alias is returned as it is.

    FIXME: For users who have their own troves, we need to understand all
    the prefixes they want to define. This is a good argument for putting
    the repo aliases into DEFAULTS or somewhere, I guess!

    '''
    reponame = str(reponame)
    for prefix, url_pattern in CGIT_ALIASES.items():
        # Match the ':' separator as well, so that a name that merely begins
        # with the same letters as an alias is not treated as aliased.
        alias = prefix + ':'
        if reponame.startswith(alias):
            repo = urllib.parse.quote(reponame[len(alias):])
            # The URL patterns add '.git' themselves.
            if repo.endswith('.git'):
                repo = repo[:-4]
            return url_pattern % dict(repo=repo)
    return reponame


CGIT_REPO_URL = os.environ.get('CGIT_REPO_URL', 'missing-cgit-repo-url')


def filtered_query_string(query_string):
    '''Returns 'query_string' re-encoded without any 'url' parameters.'''
    query_list = urllib.parse.parse_qsl(query_string)
    filtered_query_list = [
        (key, value) for (key, value) in query_list if key != 'url']
    return urllib.parse.urlencode(filtered_query_list)


# Parse the query string to preserve the parameters (commit ID etc.) when
# we generate URLs within this repo. We need to remove the 'url' parameter
# if present, though.
QUERY_STRING = filtered_query_string(os.environ.get('QUERY_STRING', ''))


def repo_file_cgit_url(relative_path):
    '''Returns a cgit URL for the given Morph file.

    Morph file paths are relative to the top of the current repo.

    '''
    url = '/cgit/' + CGIT_REPO_URL + '/tree/'
    url += urllib.parse.quote(relative_path)
    if QUERY_STRING:
        url += '?' + QUERY_STRING
    return url


class HyperlinkAddition():
    '''Turns the text of a morphology into HTML with links added.'''

    def load_annotated_morphology_text(self, text):
        '''Parse 'text' as YAML into a tree of YAMLAnnotated* objects.'''
        # YAMLAnnotatedLoader builds on SafeConstructor, so only the
        # standard YAML tags are constructed.
        return yaml.load(text, Loader=YAMLAnnotatedLoader)

    def process_morphology_text(self, input_text):
        '''Return 'input_text' as HTML, with links for its references.

        The text itself is HTML-escaped; the only markup in the result is
        the links added here.

        Raises RuntimeError if the top level of the document is not a
        mapping, and KeyError if it has no 'kind' field. Errors from the
        YAML parser are not caught either.

        '''
        morph = self.load_annotated_morphology_text(input_text)

        if not isinstance(morph, dict):
            raise RuntimeError("Morphology is not a dict.")

        html_annotations = []

        # 'or []' below copes with fields that are present but empty, which
        # YAML loads as None.
        if morph['kind'] == 'stratum':
            for build_dep in morph.get('build-depends') or []:
                html_annotations.extend(self.format_stratumref(build_dep))
            for chunkref in morph.get('chunks') or []:
                html_annotations.extend(self.format_chunkref(chunkref))
        elif morph['kind'] == 'system':
            for stratum in morph.get('strata') or []:
                html_annotations.extend(self.format_stratumref(stratum))
            for extension in morph.get('configuration-extensions') or []:
                file_url = repo_file_cgit_url(extension + '.configure')
                html_annotations.append(
                    self.annotate_link(extension, file_url))
        elif morph['kind'] == 'cluster':
            for system in morph.get('systems') or []:
                html_annotations.extend(self.format_systemref(system))

        # The result is embedded in an HTML page, so the text between the
        # links has to be escaped. That has to happen piece by piece while
        # the links are inserted, because the annotation indexes refer to
        # positions in the unescaped text.
        return apply_tag_annotations(input_text, html_annotations,
                                     escape=html.escape)

    def annotate_link(self, yaml_annotated_object, link_target):
        '''Return a TagAnnotation that wraps the object's text in a link.

        'yaml_annotated_object' must carry start_mark and end_mark, as the
        objects produced by YAMLAnnotatedLoader do.

        '''
        return TagAnnotation(yaml_annotated_object.start_mark.index,
                             yaml_annotated_object.end_mark.index,
                             '<a href="%s">' % html.escape(link_target),
                             '</a>')

    def format_chunkref(self, chunkref):
        '''Format links in a ChunkReference.

        The ChunkReference entity is how strata refer to the individual
        chunks to be included in that stratum.

        '''
        html_annotations = []

        if 'repo' in chunkref:
            repo_url = repo_cgit_url(chunkref['repo'])
            html_annotations.append(
                self.annotate_link(chunkref['repo'], repo_url))

            # The ref links are built from the repo URL, so they can only be
            # generated when the chunk names its repo.
            if 'ref' in chunkref:
                # str() because YAML may load a ref made up only of digits
                # as a number.
                ref_url = repo_url + '/commit/?id=' + \
                    urllib.parse.quote(str(chunkref['ref']))
                html_annotations.append(
                    self.annotate_link(chunkref['ref'], ref_url))

            if 'unpetrify-ref' in chunkref:
                unpetrify_ref_url = repo_url + '/log/?h=' + \
                    urllib.parse.quote(str(chunkref['unpetrify-ref']))
                html_annotations.append(
                    self.annotate_link(chunkref['unpetrify-ref'],
                                       unpetrify_ref_url))

        if 'morph' in chunkref:
            file_url = repo_file_cgit_url(chunkref['morph'])
            html_annotations.append(
                self.annotate_link(chunkref['morph'], file_url))

        return html_annotations

    def format_stratumref(self, stratumref):
        '''Format the link to the morphology file in a stratum reference.'''
        html_annotations = []

        if 'morph' in stratumref:
            file_url = repo_file_cgit_url(stratumref['morph'])
            html_annotations.append(
                self.annotate_link(stratumref['morph'], file_url))

        return html_annotations

    def format_systemref(self, systemref):
        '''Format links in a system reference, including its subsystems.'''
        # Like a stratum reference, a system reference names its morphology
        # in the 'morph' field.
        html_annotations = self.format_stratumref(systemref)

        # 'deploy' is a mapping; a reference without one has no deployments
        # to link.
        for deployment in (systemref.get('deploy') or {}).values():
            if 'type' in deployment:
                file_url = repo_file_cgit_url(deployment['type'] + '.write')
                html_annotations.append(
                    self.annotate_link(deployment['type'], file_url))

        for subsystem in systemref.get('subsystems') or []:
            html_annotations.extend(self.format_systemref(subsystem))

        return html_annotations


if __name__ == "__main__":
    input_text = sys.stdin.read()
    try:
        output_text = HyperlinkAddition().process_morphology_text(input_text)
    except Exception as e:
        # Deliberately broad: whatever goes wrong, the file should still be
        # shown, just without links. It is escaped here just as it is for
        # files that are not morphologies.
        sys.stderr.write("Exception parsing %s: %s\n" % (filename, e))
        output_text = html.escape(input_text)
    sys.stdout.write(output_text)