#!/usr/bin/python3
"""Check that the upstreams described in lorry files are reachable.

Each file named on the command line (or, with no arguments, every file
matching ``*-lorries/*.lorry``) is parsed as YAML or JSON and every
repository entry is probed with a cheap, type-specific operation.  The
exit status is 0 when every probe succeeded and 1 otherwise.
"""

import glob
import http.client
import json
import os
import ssl
import subprocess
import sys
import time
import urllib.error
import urllib.request

import yaml


# Exceptions that mean "the remote could not be fetched" rather than a bug
# in this script.  URLError is a subclass of OSError, but urlopen() only
# wraps socket errors raised while *sending* the request; errors raised
# while reading the response (timeouts, connection resets) surface as plain
# OSError.  ValueError is raised for URLs without a usable scheme.
_FETCH_ERRORS = (OSError, http.client.HTTPException, ValueError)


def check_file(filename):
    """Probe every upstream defined in the lorry file *filename*.

    The file is parsed as YAML first and as JSON if YAML parsing fails.
    It must contain a mapping from repository name to a definition with
    at least a ``type`` key.  One diagnostic line is printed per probe:
    ``I: ...`` on stdout for success, ``E: ...`` on stderr for failure.

    Returns True if no error was reported, False otherwise.
    """
    is_ok = True
    repo_name = None

    def diagnostic(file, level, msg):
        # repo_name is still None until the first entry is being processed.
        if repo_name is None:
            print('%s: %s: %s' % (level, filename, msg), file=file)
        else:
            print('%s: %s: %s: %s' % (level, filename, repo_name, msg),
                  file=file)

    def error(msg):
        nonlocal is_ok
        is_ok = False
        diagnostic(sys.stderr, 'E', msg)

    def info(msg):
        diagnostic(sys.stdout, 'I', msg)

    def try_command(args, msg_prefix=''):
        """Run *args* and report success or its stderr output."""
        try:
            result = subprocess.run(
                args,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                errors='replace')
        except OSError as e:
            # E.g. the VCS client is not installed; report it for this
            # repository instead of aborting the whole run.
            error(msg_prefix + str(e))
            return
        if result.returncode != 0:
            error(msg_prefix + result.stderr.rstrip())
        else:
            info(msg_prefix + 'OK')

    with open(filename) as f:
        try:
            obj = yaml.safe_load(f)
        except yaml.YAMLError:
            # Not valid YAML; rewind and retry as JSON.
            f.seek(0)
            obj = json.load(f)

    for repo_name, upstream_def in obj.items():
        upstream_type = upstream_def['type']
        upstream_url = upstream_def.get('url')  # optional for bzr
        check_certificates = upstream_def.get('check-certificates', True)

        # Try the cheapest possible operation to verify that the
        # repository or file is present
        if upstream_type == 'bzr':
            cert_options = []
            if not check_certificates:
                cert_options.append('-Ossl.cert_reqs=none')
            if upstream_url is None:
                branch_urls = upstream_def['branches'].items()
            else:
                branch_urls = [('trunk', upstream_url)]
            for branch_name, branch_url in branch_urls:
                try_command(
                    ['bzr', 'info', '--quiet', *cert_options, branch_url],
                    msg_prefix=('%s: ' % branch_name))

        elif upstream_type == 'cvs':
            try_command(['cvs', '-d', upstream_url, '-Q', 'rls',
                         upstream_def['module']])

        elif upstream_type == 'git':
            # The setting is inherited by the git child process, so it is
            # set or cleared afresh for every repository.
            if check_certificates:
                os.environ.pop('GIT_SSL_NO_VERIFY', None)
            else:
                os.environ['GIT_SSL_NO_VERIFY'] = 'true'
            try_command(['git', 'ls-remote', '--symref', upstream_url,
                         'HEAD'])

        elif upstream_type == 'hg':
            # The hg client doesn't appear to support running any
            # kind of info/check command without a local clone.
            # Send a Mercurial capabilities request directly.
            tls_context = ssl.create_default_context()
            if not check_certificates:
                tls_context.check_hostname = False
                tls_context.verify_mode = ssl.CERT_NONE
            request = urllib.request.Request(
                upstream_url + '?cmd=capabilities',
                headers={
                    'Accept': 'application/mercurial-0.1',
                    'User-Agent': 'mercurial/proto-1.0'
                })
            try:
                with urllib.request.urlopen(
                        request, context=tls_context) as result:
                    content_type = result.info().get('Content-Type')
                    if content_type != 'application/mercurial-0.1':
                        error('no Mercurial server support detected')
                    else:
                        info('OK')
            except _FETCH_ERRORS as e:
                error(str(e))

        elif upstream_type == 'svn':
            try_command(['svn', 'info', '--non-interactive', upstream_url])

        elif upstream_type in ['gzip', 'tarball', 'zip']:
            try:
                # HEAD is enough: only the response headers are inspected.
                with urllib.request.urlopen(
                        urllib.request.Request(
                            upstream_url, method='HEAD')) as result:
                    headers = result.info()
                    if headers.get('Content-Type') == 'text/html':
                        error('Content-Type is HTML')
                    elif headers.get('Content-Length') == '0':
                        error('file is empty')
                    else:
                        info('OK')
            except _FETCH_ERRORS as e:
                error(str(e))

        # Throttle requests
        time.sleep(0.1)

    return is_ok


def main():
    """Check the files given as arguments, or all lorry files by default.

    Every file is checked even after a failure; the process exits with
    status 0 if all checks passed and 1 otherwise.
    """
    all_ok = True
    if len(sys.argv) > 1:
        filenames = sys.argv[1:]
    else:
        filenames = sorted(glob.glob('*-lorries/*.lorry'))
    for filename in filenames:
        if not check_file(filename):
            all_ok = False
    sys.exit(0 if all_ok else 1)


if __name__ == '__main__':
    main()