#!/usr/bin/python3 # Copyright (C) 2015-2016 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program. If not, see . '''baserock_definitions_cgit_filter.py This script takes a .morph file on stdin, and outputs the same text with HTML links to the commits and repos that it references. Some of this functionality should live elsewhere: the YAML annotation code might be cleaned up and submitted to PyYAML, and much of the rest could go in a library for dealing with Baserock definitions. When testing this in cgit, you might find these instructions useful: ''' # We configure this as a source filter across all repos, so it runs for every # file that cgit ever renders. import html import io import os import sys sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') filename = sys.argv[1] if not filename.endswith('.morph'): sys.stdout.write(html.escape(sys.stdin.read())) sys.exit(0) import urllib.parse import yaml # These classes allow you to parse a YAML file with PyYAML and receive a tree # of YAMLAnnotatedObject() instances as the result. These should function the # same as the underlying Python objects, for the most part, but you can also # use the .start_mark and .end_mark attributes to find the exact place in the # input YAML file that they were defined. # # Some things get broken by the wrapper classes. In particular yaml.dump() # might not work. class YAMLAnnotatedMixin(): def __init__(self, *args): self.start_mark = None self.end_mark = None class YAMLAnnotatedDict(dict, YAMLAnnotatedMixin): pass class YAMLAnnotatedFloat(float, YAMLAnnotatedMixin): def __new__(cls, value): return float.__new__(cls, value) class YAMLAnnotatedInt(int, YAMLAnnotatedMixin): def __new__(cls, value): return int.__new__(cls, value) class YAMLAnnotatedList(list, YAMLAnnotatedMixin): pass class YAMLAnnotatedStr(str, YAMLAnnotatedMixin): def __new__(cls, value): return str.__new__(cls, value) class YAMLAnnotatedConstructor(yaml.constructor.SafeConstructor): def construct_yaml_int(self, node): data = yaml.constructor.SafeConstructor.construct_yaml_int(self, node) annotated_data = YAMLAnnotatedInt(data) annotated_data.start_mark = node.start_mark annotated_data.end_mark = node.end_mark return annotated_data def construct_yaml_float(self, node): data = yaml.constructor.SafeConstructor.construct_yaml_float(self, node) annotated_data = YAMLAnnotatedFloat(data) annotated_data.start_mark = node.start_mark annotated_data.end_mark = node.end_mark return annotated_data def construct_yaml_str(self, node): data = self.construct_scalar(node) annotated_data = YAMLAnnotatedStr(data) annotated_data.start_mark = node.start_mark annotated_data.end_mark = node.end_mark return annotated_data def construct_yaml_seq(self, node): annotated_data = YAMLAnnotatedList() annotated_data.start_mark = node.start_mark annotated_data.end_mark = node.end_mark yield annotated_data data = self.construct_sequence(node) annotated_data.extend(data) def construct_yaml_map(self, node): annotated_data = YAMLAnnotatedDict() annotated_data.start_mark = node.start_mark annotated_data.end_mark = node.end_mark yield annotated_data data = self.construct_mapping(node) annotated_data.update(data) YAMLAnnotatedConstructor.add_constructor( 'tag:yaml.org,2002:int', YAMLAnnotatedConstructor.construct_yaml_int) YAMLAnnotatedConstructor.add_constructor( 'tag:yaml.org,2002:float', YAMLAnnotatedConstructor.construct_yaml_float) YAMLAnnotatedConstructor.add_constructor( 'tag:yaml.org,2002:str', YAMLAnnotatedConstructor.construct_yaml_str) YAMLAnnotatedConstructor.add_constructor( 'tag:yaml.org,2002:seq', YAMLAnnotatedConstructor.construct_yaml_seq) YAMLAnnotatedConstructor.add_constructor( 'tag:yaml.org,2002:map', YAMLAnnotatedConstructor.construct_yaml_map) class YAMLAnnotatedLoader(yaml.reader.Reader, yaml.scanner.Scanner, yaml.parser.Parser, yaml.composer.Composer, YAMLAnnotatedConstructor, yaml.resolver.Resolver): '''Loader class for use with yaml.load(), to provide annotated results.''' def __init__(self, stream): yaml.reader.Reader.__init__(self, stream) yaml.scanner.Scanner.__init__(self) yaml.parser.Parser.__init__(self) yaml.composer.Composer.__init__(self) YAMLAnnotatedConstructor.__init__(self) yaml.resolver.Resolver.__init__(self) class TagAnnotation(): '''Generic class for adding markup to plain text. For example, if you have the string 'Hello world' and you want to add HTML tags to 'world' to make it bold, you could do: text = 'Hello world' annotation = TagAnnotation(6, 10, '', '') result = apply_tag_annotations(text, [annotation]) ''' def __init__(self, start_index, end_index, start_text, end_text): self.start_index = start_index self.end_index = end_index self.start_text = start_text self.end_text = end_text identity = lambda arg: arg def apply_tag_annotations(input_text, annotations, escape_fn=identity, index_offset=0): '''Apply a list of TagAnnotation objects to some input text. This function can be used to add HTML annotations to text. If you are doing this, you should ensure that the input text is quoted, by passing `escape_fn=html.escape`. ''' def annotations_with_range(annotations_to_filter, start_index, end_index): return [a for a in annotations_to_filter if a.start_index >= start_index and a.end_index <= end_index] def sorted_annotations(annotations_to_sort): return sorted(annotations_to_sort, key=lambda a: a.start_index) annotations = sorted_annotations(annotations) previous_pos = 0 segments = [] while len(annotations) > 0: top_annotation = annotations[0] annotations = annotations[1:] sub_annotations = annotations_with_range( annotations, top_annotation.start_index, top_annotation.end_index) for a in sub_annotations: annotations.remove(a) start_text = escape_fn( input_text[previous_pos:top_annotation.start_index - index_offset]) sub_input = input_text[top_annotation.start_index - index_offset: top_annotation.end_index - index_offset] sub_output = apply_tag_annotations( sub_input, sub_annotations, escape_fn=escape_fn, index_offset=top_annotation.start_index) segments.extend([start_text, top_annotation.start_text, sub_output, top_annotation.end_text]) previous_pos = top_annotation.end_index - index_offset end_text = escape_fn(input_text[previous_pos:]) return ''.join(segments + [end_text]) CGIT_ALIASES = { 'baserock': '/cgit/baserock/%(repo)s.git', 'upstream': '/cgit/delta/%(repo)s.git', } def repo_cgit_url(reponame): '''Returns a cgit URL for the current host for a given repo. Aliases in the repo name are parsed, but only the default fields are currently understand (baserock: and upstream:). FIXME: For users who have their own troves, we need to understand all the prefixes they want to define. This is a good argument for putting the repo aliases into DEFAULTS or somewhere, I guess! ''' repo_url = None for prefix, url_pattern in CGIT_ALIASES.items(): if reponame.startswith(prefix): repo = urllib.parse.quote(reponame[len(prefix) + 1:]) if repo.endswith('.git'): repo = repo[:-4] repo_url = url_pattern % dict(repo=repo) break else: repo_url = reponame return repo_url CGIT_REPO_URL = os.environ.get('CGIT_REPO_URL', 'missing-cgit-repo-url') def filtered_query_string(query_string): query_list = urllib.parse.parse_qsl(query_string) filtered_query_list = [ (key, value) for (key, value) in query_list if key != 'url'] return urllib.parse.urlencode(filtered_query_list) # Parse the query string to preserve the parameters (commit ID etc.) when # we generate URLs within this repo. We need to remove the 'url' parameter # if present, though. QUERY_STRING = filtered_query_string(os.environ.get('QUERY_STRING', '')) def repo_file_cgit_url(relative_path): '''Returns a cgit URL for the given Morph file. Morph file paths are relative to the top of the current repo. ''' url = '/cgit/' + CGIT_REPO_URL + '/tree/' url += urllib.parse.quote(relative_path) if len(QUERY_STRING) > 0: url += '?' + QUERY_STRING return url class HyperlinkAddition(): def load_annotated_morphology_text(self, text): return yaml.load(text, YAMLAnnotatedLoader) def process_morphology_text(self, input_text): morph = self.load_annotated_morphology_text(input_text) if not isinstance(morph, dict): raise RuntimeError("Morphology is not a dict.") html_annotations = [] if morph['kind'] == 'stratum': for build_dep in morph.get('build-depends', []): html_annotations.extend(self.format_stratumref(build_dep)) for chunkref in morph.get('chunks', []): html_annotations.extend(self.format_chunkref(chunkref)) elif morph['kind'] == 'system': for stratum in morph.get('strata', []): html_annotations.extend(self.format_stratumref(stratum)) for extension in morph.get('configuration-extensions', []): file_url = repo_file_cgit_url(extension + '.configure') html_annotations.append( self.annotate_link(extension, file_url)) elif morph['kind'] == 'cluster': for system in morph.get('systems', []): html_annotations.extend(self.format_systemref(system)) return apply_tag_annotations( input_text, html_annotations, escape_fn=html.escape) def annotate_link(self, yaml_annotated_object, link_target): return TagAnnotation(yaml_annotated_object.start_mark.index, yaml_annotated_object.end_mark.index, '' % html.escape(link_target), '') def format_chunkref(self, chunkref): '''Format links in a ChunkReference. The ChunkReference entity is how strata refer to the individual chunks to be included in that stratum. ''' html_annotations = [] if 'repo' in chunkref: repo_url = repo_cgit_url(chunkref['repo']) html_annotations.append( self.annotate_link(chunkref['repo'], repo_url)) if 'ref' in chunkref: ref_url = repo_url + '/commit/?id=' + \ urllib.parse.quote(chunkref['ref']) html_annotations.append( self.annotate_link(chunkref['ref'], ref_url)) if 'unpetrify-ref' in chunkref: unpetrify_ref_url = repo_url + '/log/?h=' + \ urllib.parse.quote(str(chunkref['unpetrify-ref'])) html_annotations.append( self.annotate_link(chunkref['unpetrify-ref'], unpetrify_ref_url)) if 'morph' in chunkref: file_url = repo_file_cgit_url(chunkref['morph']) html_annotations.append( self.annotate_link(chunkref['morph'], file_url)) return html_annotations def format_stratumref(self, stratumref): html_annotations = [] if 'morph' in stratumref: file_url = repo_file_cgit_url(stratumref['morph']) html_annotations.append( self.annotate_link(stratumref['morph'], file_url)) return html_annotations def format_systemref(self, systemref): html_annotations = self.format_stratumref(systemref) for deployment in systemref.get('deploy', []).values(): if 'type' in deployment: file_url = repo_file_cgit_url(deployment['type'] + '.write') html_annotations.append( self.annotate_link(deployment['type'], file_url)) for subsystem in systemref.get('subsystems', []): html_annotations.extend( self.format_systemref(subsystem)) return html_annotations if __name__ == "__main__": input_text = sys.stdin.read() try: output_text = HyperlinkAddition().process_morphology_text(input_text) sys.stdout.write(output_text) except Exception as e: sys.stderr.write("Exception parsing %s: %s\n" % (filename, e)) sys.stdout.write(input_text)