#!/usr/bin/python3
# Copyright (C) 2015-2016 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
'''baserock_definitions_cgit_filter.py
This script takes a .morph file on stdin, and outputs the same text with
HTML links to the commits and repos that it references.
Some of this functionality should live elsewhere: the YAML annotation code
might be cleaned up and submitted to PyYAML, and much of the rest could go
in a library for dealing with Baserock definitions.
When testing this in cgit, you might find these instructions useful:
'''
# We configure this as a source filter across all repos, so it runs for every
# file that cgit ever renders.
import html
import io
import os
import sys
# Re-wrap the standard streams so that text is always decoded and encoded as
# UTF-8, whatever the default encoding of the process happens to be.
sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

# The name of the file being rendered is the first command line argument.
filename = sys.argv[1]

if not filename.endswith('.morph'):
    # Not a morphology: emit the content HTML-escaped but otherwise untouched
    # and stop before the remaining imports and definitions are evaluated.
    escaped_content = html.escape(sys.stdin.read())
    sys.stdout.write(escaped_content)
    raise SystemExit(0)
import urllib.parse
import yaml
# These classes allow you to parse a YAML file with PyYAML and receive a tree
# of YAMLAnnotated* instances (YAMLAnnotatedDict, YAMLAnnotatedStr, and so on)
# as the result. These should function the
# same as the underlying Python objects, for the most part, but you can also
# use the .start_mark and .end_mark attributes to find the exact place in the
# input YAML file that they were defined.
#
# Some things get broken by the wrapper classes. In particular yaml.dump()
# might not work.
class YAMLAnnotatedMixin():
    '''Mixin that lets an object record where in a YAML file it was defined.

    Instances expose two attributes, start_mark and end_mark, which are None
    until something (normally the YAML constructor) assigns the marks of the
    node that the object was built from.
    '''

    # Class-level defaults. When this mixin is combined with dict or list,
    # the __init__ of dict/list comes first in the MRO and the __init__ below
    # is never called, so without these defaults such objects would raise
    # AttributeError on .start_mark / .end_mark until the marks were assigned.
    start_mark = None
    end_mark = None

    def __init__(self, *args):
        # *args swallows the value passed to the constructor of immutable
        # types (str, int, float), which is consumed by __new__ instead.
        self.start_mark = None
        self.end_mark = None
class YAMLAnnotatedDict(dict, YAMLAnnotatedMixin):
    '''A dict that can additionally carry start_mark and end_mark.'''
class YAMLAnnotatedFloat(float, YAMLAnnotatedMixin):
    '''A float that can additionally carry start_mark and end_mark.'''

    def __new__(cls, value):
        # Takes exactly one value and hands it to float's own allocator.
        return super().__new__(cls, value)
class YAMLAnnotatedInt(int, YAMLAnnotatedMixin):
    '''An int that can additionally carry start_mark and end_mark.'''

    def __new__(cls, value):
        # Takes exactly one value and hands it to int's own allocator.
        return super().__new__(cls, value)
class YAMLAnnotatedList(list, YAMLAnnotatedMixin):
    '''A list that can additionally carry start_mark and end_mark.'''
class YAMLAnnotatedStr(str, YAMLAnnotatedMixin):
    '''A str that can additionally carry start_mark and end_mark.'''

    def __new__(cls, value):
        # Takes exactly one value and hands it to str's own allocator.
        return super().__new__(cls, value)
class YAMLAnnotatedConstructor(yaml.constructor.SafeConstructor):
    '''SafeConstructor variant whose construct_yaml_* methods return
    YAMLAnnotated* objects.

    Every object returned by the methods below has its start_mark and
    end_mark set to those of the YAML node it was constructed from.
    '''

    @staticmethod
    def _with_marks(annotated_data, node):
        '''Copy the marks of node onto annotated_data and return the latter.'''
        annotated_data.start_mark = node.start_mark
        annotated_data.end_mark = node.end_mark
        return annotated_data

    def construct_yaml_int(self, node):
        '''Construct an integer scalar as a YAMLAnnotatedInt.'''
        plain_value = super().construct_yaml_int(node)
        return self._with_marks(YAMLAnnotatedInt(plain_value), node)

    def construct_yaml_float(self, node):
        '''Construct a floating point scalar as a YAMLAnnotatedFloat.'''
        plain_value = super().construct_yaml_float(node)
        return self._with_marks(YAMLAnnotatedFloat(plain_value), node)

    def construct_yaml_str(self, node):
        '''Construct a string scalar as a YAMLAnnotatedStr.'''
        plain_value = self.construct_scalar(node)
        return self._with_marks(YAMLAnnotatedStr(plain_value), node)

    def construct_yaml_seq(self, node):
        '''Construct a sequence as a YAMLAnnotatedList.

        This is a generator: the empty, already annotated list is yielded
        first and its items are filled in afterwards, in the same way as
        SafeConstructor.construct_yaml_seq does.
        '''
        annotated_list = self._with_marks(YAMLAnnotatedList(), node)
        yield annotated_list
        annotated_list.extend(self.construct_sequence(node))

    def construct_yaml_map(self, node):
        '''Construct a mapping as a YAMLAnnotatedDict.

        This is a generator: the empty, already annotated dict is yielded
        first and its items are filled in afterwards, in the same way as
        SafeConstructor.construct_yaml_map does.
        '''
        annotated_dict = self._with_marks(YAMLAnnotatedDict(), node)
        yield annotated_dict
        annotated_dict.update(self.construct_mapping(node))
# Register the annotating constructor methods for the int, float, str, seq
# and map tags, so that nodes resolved to those tags are built by them.
for _tag, _constructor in (
        ('tag:yaml.org,2002:int',
         YAMLAnnotatedConstructor.construct_yaml_int),
        ('tag:yaml.org,2002:float',
         YAMLAnnotatedConstructor.construct_yaml_float),
        ('tag:yaml.org,2002:str',
         YAMLAnnotatedConstructor.construct_yaml_str),
        ('tag:yaml.org,2002:seq',
         YAMLAnnotatedConstructor.construct_yaml_seq),
        ('tag:yaml.org,2002:map',
         YAMLAnnotatedConstructor.construct_yaml_map)):
    YAMLAnnotatedConstructor.add_constructor(_tag, _constructor)
# Do not leave the loop variables behind in the module namespace.
del _tag, _constructor
class YAMLAnnotatedLoader(yaml.reader.Reader, yaml.scanner.Scanner,
                          yaml.parser.Parser, yaml.composer.Composer,
                          YAMLAnnotatedConstructor, yaml.resolver.Resolver):
    '''Loader to hand to yaml.load() in order to get annotated results.

    It is assembled from the standard PyYAML reader, scanner, parser,
    composer and resolver, with YAMLAnnotatedConstructor in the
    constructor role.
    '''

    def __init__(self, stream):
        # The Reader is the only component that is given the input stream;
        # every other component is initialised without arguments, in the
        # order in which the base classes are listed.
        yaml.reader.Reader.__init__(self, stream)
        for component in (yaml.scanner.Scanner, yaml.parser.Parser,
                          yaml.composer.Composer, YAMLAnnotatedConstructor,
                          yaml.resolver.Resolver):
            component.__init__(self)
class TagAnnotation():
    '''A piece of markup to be wrapped around a range of plain text.

    start_index and end_index delimit the range within the text; start_text
    is the markup inserted where the range begins and end_text the markup
    inserted where it ends.
    '''

    def __init__(self, start_index, end_index, start_text, end_text):
        self.start_index, self.end_index = start_index, end_index
        self.start_text, self.end_text = start_text, end_text
def apply_tag_annotations(input_text, annotations, index_offset=0):
    '''Return input_text with the markup of every annotation inserted.

    Each annotation needs start_index, end_index, start_text and end_text
    attributes (see TagAnnotation). Its start_text is inserted in front of
    the character at start_index and its end_text in front of the character
    at end_index. Annotations that lie entirely inside another annotation
    are nested within it; annotations that only partially overlap are not
    supported.

    index_offset is subtracted from the annotation indices to turn them into
    positions within input_text. It is 0 for the initial call and is set by
    the recursive calls that process the text inside an annotation.
    '''
    # Order by start position. On a tie the annotation that ends last goes
    # first, so that it is treated as the enclosing one. Sorting on the start
    # position alone would let a shorter annotation listed first be handled
    # on its own, and the enclosing one would then repeat the same text.
    remaining = sorted(annotations,
                       key=lambda a: (a.start_index, -a.end_index))
    previous_pos = 0
    segments = []
    while remaining:
        top_annotation = remaining[0]
        # Split what is left into the annotations nested inside the current
        # one, which the recursive call handles, and those that follow it.
        nested = []
        following = []
        for candidate in remaining[1:]:
            if (candidate.start_index >= top_annotation.start_index and
                    candidate.end_index <= top_annotation.end_index):
                nested.append(candidate)
            else:
                following.append(candidate)
        remaining = following

        start_pos = top_annotation.start_index - index_offset
        end_pos = top_annotation.end_index - index_offset
        # Plain text between the previous annotation and this one.
        segments.append(input_text[previous_pos:start_pos])
        segments.append(top_annotation.start_text)
        segments.append(apply_tag_annotations(
            input_text[start_pos:end_pos], nested,
            index_offset=top_annotation.start_index))
        segments.append(top_annotation.end_text)
        previous_pos = end_pos
    # Whatever is left after the last annotation.
    segments.append(input_text[previous_pos:])
    return ''.join(segments)
# Maps each repo alias that is understood to the pattern for the cgit URL of
# a repo named with that alias; %(repo)s is replaced with the repo name.
CGIT_ALIASES = {
    'baserock': '/cgit/baserock/%(repo)s.git',
    'upstream': '/cgit/delta/%(repo)s.git',
}


def repo_cgit_url(reponame):
    '''Returns a cgit URL for the current host for a given repo.

    Aliases in the repo name are parsed, but only the default aliases are
    currently understood (baserock: and upstream:). A repo name that does
    not start with one of those aliases followed by a colon is returned
    unchanged.

    FIXME: For users who have their own troves, we need to understand all
    the prefixes they want to define. This is a good argument for putting
    the repo aliases into DEFAULTS or somewhere, I guess!
    '''
    for alias, url_pattern in CGIT_ALIASES.items():
        # The colon is part of the match: a name such as 'upstreamfoo:bar'
        # merely begins with the same letters and is not an aliased name.
        prefix = alias + ':'
        if reponame.startswith(prefix):
            repo = urllib.parse.quote(reponame[len(prefix):])
            # The URL patterns add '.git' themselves.
            if repo.endswith('.git'):
                repo = repo[:-4]
            return url_pattern % dict(repo=repo)
    return reponame
# Taken from the environment variable of the same name and used when building
# links to files within the repo being rendered. A placeholder value is used
# when the variable is not set.
CGIT_REPO_URL = os.getenv('CGIT_REPO_URL', 'missing-cgit-repo-url')
def filtered_query_string(query_string):
    '''Return query_string re-encoded without any 'url' parameters.

    The string is parsed into (key, value) pairs, every pair whose key is
    'url' is dropped, and the remaining pairs are encoded again in their
    original order.
    '''
    kept_pairs = []
    for name, content in urllib.parse.parse_qsl(query_string):
        if name == 'url':
            continue
        kept_pairs.append((name, content))
    return urllib.parse.urlencode(kept_pairs)
# The query string of the current request, minus any 'url' parameter. It is
# appended to the URLs generated for files within this repo, so that the
# other parameters (commit ID etc.) carry over to those links.
QUERY_STRING = filtered_query_string(os.getenv('QUERY_STRING', ''))
def repo_file_cgit_url(relative_path):
    '''Returns the cgit 'tree' URL of a file in the current repo.

    relative_path is the path of the file relative to the top of the repo,
    which is how Morph files refer to each other. The filtered query string
    of the current request, if there is one, is appended to the URL.
    '''
    tree_url = ('/cgit/' + CGIT_REPO_URL + '/tree/' +
                urllib.parse.quote(relative_path))
    if QUERY_STRING:
        return tree_url + '?' + QUERY_STRING
    return tree_url
class HyperlinkAddition():
    '''Turns the text of a morphology into HTML with hyperlinks added.

    The repos, refs and other definitions files that the morphology names
    are wrapped in <a> elements that point at the matching cgit pages.
    '''

    def load_annotated_morphology_text(self, text):
        '''Parse text as YAML, keeping track of where each value is defined.'''
        # YAMLAnnotatedLoader is built on yaml's SafeConstructor, so loading
        # does not construct arbitrary Python objects from the input.
        return yaml.load(text, YAMLAnnotatedLoader)

    def process_morphology_text(self, input_text):
        '''Return input_text as HTML, with links for what it references.

        The text itself is HTML-escaped. Which fields are turned into links
        depends on the 'kind' of the morphology (stratum, system or cluster);
        any other kind is only escaped.

        Raises RuntimeError if the top level of the YAML is not a mapping,
        and KeyError if it has no 'kind' field.
        '''
        morph = self.load_annotated_morphology_text(input_text)
        if not isinstance(morph, dict):
            raise RuntimeError("Morphology is not a dict.")

        html_annotations = []
        kind = morph['kind']
        if kind == 'stratum':
            for build_dep in morph.get('build-depends', []):
                html_annotations.extend(self.format_stratumref(build_dep))
            for chunkref in morph.get('chunks', []):
                html_annotations.extend(self.format_chunkref(chunkref))
        elif kind == 'system':
            for stratum in morph.get('strata', []):
                html_annotations.extend(self.format_stratumref(stratum))
            for extension in morph.get('configuration-extensions', []):
                file_url = repo_file_cgit_url(extension + '.configure')
                html_annotations.append(
                    self.annotate_link(extension, file_url))
        elif kind == 'cluster':
            for system in morph.get('systems', []):
                html_annotations.extend(self.format_systemref(system))

        # The result is included in an HTML page, so characters such as '<'
        # and '&' in the morphology text must not be passed through raw.
        escaped_text = self._escape_for_html(input_text, html_annotations)
        return apply_tag_annotations(escaped_text, html_annotations)

    @staticmethod
    def _escape_for_html(input_text, annotations):
        '''Return input_text HTML-escaped, moving annotations to match.

        Escaping lengthens the text (for example '<' becomes '&lt;'), so the
        start_index and end_index of each annotation are updated in place to
        refer to the same characters within the escaped text.
        '''
        escaped_chars = [html.escape(char) for char in input_text]
        # escaped_offsets[i] is the position within the escaped text of the
        # character at index i of input_text. The extra final entry is the
        # total escaped length, for an end index at the very end of the text.
        escaped_offsets = [0]
        for escaped_char in escaped_chars:
            escaped_offsets.append(escaped_offsets[-1] + len(escaped_char))
        for annotation in annotations:
            annotation.start_index = escaped_offsets[annotation.start_index]
            annotation.end_index = escaped_offsets[annotation.end_index]
        return ''.join(escaped_chars)

    def annotate_link(self, yaml_annotated_object, link_target):
        '''Return a TagAnnotation that turns a YAML value into a link.

        yaml_annotated_object must have the start_mark and end_mark that
        YAMLAnnotatedLoader sets; they determine the text that becomes the
        link. link_target is the URL, which is escaped for use as the href.
        '''
        return TagAnnotation(yaml_annotated_object.start_mark.index,
                             yaml_annotated_object.end_mark.index,
                             '<a href="%s">' % html.escape(link_target),
                             '</a>')

    def format_chunkref(self, chunkref):
        '''Format links in a ChunkReference.

        The ChunkReference entity is how strata refer to the individual
        chunks to be included in that stratum.

        Returns a list of TagAnnotation objects: for 'repo', for 'ref' and
        'unpetrify-ref' (only when there is a 'repo' to resolve them in),
        and for 'morph'.
        '''
        html_annotations = []

        if 'repo' in chunkref:
            repo_url = repo_cgit_url(chunkref['repo'])
            html_annotations.append(
                self.annotate_link(chunkref['repo'], repo_url))

            # The ref links are built from repo_url, so they can only be
            # produced when the chunk names a repo.
            if 'ref' in chunkref:
                # str() because YAML parses an all-digit ref as a number.
                ref_url = repo_url + '/commit/?id=' + \
                    urllib.parse.quote(str(chunkref['ref']))
                html_annotations.append(
                    self.annotate_link(chunkref['ref'], ref_url))

            if 'unpetrify-ref' in chunkref:
                unpetrify_ref_url = repo_url + '/log/?h=' + \
                    urllib.parse.quote(str(chunkref['unpetrify-ref']))
                html_annotations.append(
                    self.annotate_link(chunkref['unpetrify-ref'],
                                       unpetrify_ref_url))

        if 'morph' in chunkref:
            file_url = repo_file_cgit_url(chunkref['morph'])
            html_annotations.append(
                self.annotate_link(chunkref['morph'], file_url))

        return html_annotations

    def format_stratumref(self, stratumref):
        '''Format the link in a reference to a stratum.

        Returns a list holding a TagAnnotation for the 'morph' field, or an
        empty list if the reference has no such field.
        '''
        html_annotations = []

        if 'morph' in stratumref:
            file_url = repo_file_cgit_url(stratumref['morph'])
            html_annotations.append(
                self.annotate_link(stratumref['morph'], file_url))

        return html_annotations

    def format_systemref(self, systemref):
        '''Format links in a reference to a system within a cluster.

        Returns a list of TagAnnotation objects for the 'morph' field, for
        the 'type' of each entry under 'deploy', and, recursively, for every
        entry under 'subsystems'.
        '''
        # The 'morph' field is handled exactly as it is for a stratum.
        html_annotations = self.format_stratumref(systemref)

        # 'deploy' maps deployment names to their settings, so the default
        # for a system without deployments has to be a mapping as well.
        for deployment in systemref.get('deploy', {}).values():
            if 'type' in deployment:
                file_url = repo_file_cgit_url(deployment['type'] + '.write')
                html_annotations.append(
                    self.annotate_link(deployment['type'], file_url))

        for subsystem in systemref.get('subsystems', []):
            html_annotations.extend(
                self.format_systemref(subsystem))

        return html_annotations
if __name__ == "__main__":
    input_text = sys.stdin.read()
    try:
        output_text = HyperlinkAddition().process_morphology_text(input_text)
    except Exception as e:
        # Best effort: a morphology that cannot be processed is still shown,
        # just without links. The output is included in an HTML page, so the
        # text is escaped in the same way as for files that are not
        # morphologies.
        sys.stderr.write("Exception parsing %s: %s\n" % (filename, e))
        output_text = html.escape(input_text)
    # Written outside of the try block so that a failing write cannot cause
    # the content to be emitted a second time by the fallback.
    sys.stdout.write(output_text)