summary refs log tree commit diff
path: root/scripts/check-lorry-urls
blob: 94d659cdabc67b33d5e09774531cf2c2ecaac748 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/python3

import glob
import http.client
import json
import os
import ssl
import subprocess
import sys
import time
import urllib.request
import yaml


def check_file(filename):
    """Check that every upstream named in one lorry file is reachable.

    The file is parsed as YAML, falling back to JSON, and must contain
    a mapping of repository name to upstream definition.  For each
    definition the cheapest available operation for its ``type`` is run
    to verify that the repository or file is present.  Definitions
    whose type is not handled here are skipped without a diagnostic.

    Diagnostics are printed as ``LEVEL: filename[: repo]: message``,
    errors (``E``) to stderr and successes (``I``) to stdout.

    Args:
        filename: Path of the lorry file to check.

    Returns:
        True if no error was reported for this file, False otherwise.

    Raises:
        OSError: If the lorry file cannot be opened.
        json.JSONDecodeError: If the file is neither valid YAML nor
            valid JSON.
        KeyError: If a definition lacks a key required for its type
            (``type``; ``module`` for cvs; ``branches`` for bzr
            definitions without ``url``).
    """
    is_ok = True
    # Read by diagnostic(); rebound by the main loop for each repository.
    repo_name = None

    # Failures that mean "this URL is bad or unreachable".  URLError is
    # a subclass of OSError; plain OSError also covers connection
    # errors raised while the response is read, and ValueError covers
    # malformed URLs rejected by urllib before any request is sent.
    url_errors = (OSError, http.client.HTTPException, ValueError)

    def diagnostic(file, level, msg):
        if repo_name is None:
            print('%s: %s: %s' % (level, filename, msg),
                  file=file)
        else:
            print('%s: %s: %s: %s' % (level, filename, repo_name, msg),
                  file=file)

    def error(msg):
        nonlocal is_ok
        is_ok = False
        diagnostic(sys.stderr, 'E', msg)

    def info(msg):
        diagnostic(sys.stdout, 'I', msg)

    def try_command(args, msg_prefix='', env=None):
        # Run a client command, reporting its stderr on failure.  env,
        # if given, replaces the inherited environment for this call.
        try:
            result = subprocess.run(
                args,
                stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                encoding='utf-8', errors='replace', env=env)
        except OSError as e:
            # The command could not be started (e.g. the tool is not
            # installed).  Report it against this repository rather
            # than aborting the whole run with a traceback.
            error(msg_prefix + str(e))
            return
        if result.returncode != 0:
            error(msg_prefix + result.stderr.rstrip())
        else:
            info(msg_prefix + 'OK')

    with open(filename) as f:
        try:
            obj = yaml.safe_load(f)
        except yaml.YAMLError:
            f.seek(0)
            obj = json.load(f)

    # The file is fully parsed by now, so it does not need to stay open
    # while the (slow) network checks run.
    for repo_name, upstream_def in obj.items():
        upstream_type = upstream_def['type']
        upstream_url = upstream_def.get('url')  # optional for bzr
        check_certificates = upstream_def.get('check-certificates', True)

        # Try the cheapest possible operation to verify that the
        # repository or file is present

        if upstream_type == 'bzr':
            cert_options = []
            if not check_certificates:
                cert_options.append('-Ossl.cert_reqs=none')
            if upstream_url is None:
                branch_urls = upstream_def['branches'].items()
            else:
                branch_urls = [('trunk', upstream_url)]
            for branch_name, branch_url in branch_urls:
                try_command(['bzr', 'info', '--quiet', *cert_options,
                             branch_url],
                            msg_prefix=('%s: ' % branch_name))

        elif upstream_type == 'cvs':
            try_command(['cvs', '-d', upstream_url, '-Q', 'rls',
                         upstream_def['module']])

        elif upstream_type == 'git':
            # Use a private copy of the environment so the setting for
            # one repository never leaks into the process environment.
            git_env = dict(os.environ)
            if check_certificates:
                git_env.pop('GIT_SSL_NO_VERIFY', None)
            else:
                git_env['GIT_SSL_NO_VERIFY'] = 'true'
            try_command(['git', 'ls-remote', '--symref',
                         upstream_url, 'HEAD'],
                        env=git_env)

        elif upstream_type == 'hg':
            # The hg client doesn't appear to support running any
            # kind of info/check command without a local clone.
            # Send a Mercurial capabilities request directly.
            tls_context = ssl.create_default_context()
            if not check_certificates:
                tls_context.check_hostname = False
                tls_context.verify_mode = ssl.CERT_NONE
            try:
                with urllib.request.urlopen(
                        urllib.request.Request(
                            upstream_url + '?cmd=capabilities',
                            headers={
                                'Accept': 'application/mercurial-0.1',
                                'User-Agent': 'mercurial/proto-1.0'
                            }),
                        context=tls_context) as result:
                    content_type = result.info().get('Content-Type')
            except url_errors as e:
                error(str(e))
            else:
                # Report exactly one outcome: a response with the wrong
                # content type is an error and must not also print OK.
                if content_type == 'application/mercurial-0.1':
                    info('OK')
                else:
                    error('no Mercurial server support detected')

        elif upstream_type == 'svn':
            try_command(['svn', 'info', '--non-interactive', upstream_url])

        elif upstream_type in ['gzip', 'tarball', 'zip']:
            try:
                # A HEAD request is enough to show that the file exists.
                with urllib.request.urlopen(
                        urllib.request.Request(upstream_url,
                                               method='HEAD')):
                    pass
            except url_errors as e:
                error(str(e))
            else:
                info('OK')

        # Throttle requests
        time.sleep(0.1)

    return is_ok


def main():
    """Check the lorry files named on the command line and exit.

    When no file names are given, every file matching
    ``*-lorries/*.lorry`` is checked instead, in sorted order.  The
    process exits with status 0 if every file passed and 1 otherwise.
    """
    filenames = sys.argv[1:] or sorted(glob.glob('*-lorries/*.lorry'))

    # Collect every result before combining them, so that all files are
    # checked even after an earlier one has failed.
    results = [check_file(filename) for filename in filenames]

    sys.exit(0 if all(results) else 1)


# Run the checks only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()