1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
#!/usr/bin/python3
import glob
import http.client
import json
import os
import ssl
import subprocess
import sys
import time
import urllib.request
import yaml
def check_file(filename):
    """Check that every upstream listed in one lorry file is reachable.

    The file is parsed as YAML, falling back to JSON if YAML parsing
    fails.  It must contain a mapping of repository name to upstream
    definition.  For each definition the cheapest available probe for
    its ``type`` is run (a VCS client command, or a single HTTP
    request).  Definitions whose type is not handled below are skipped
    without any output.

    One diagnostic line is printed per probe: ``I:`` lines on stdout
    for success and ``E:`` lines on stderr for failure.

    Args:
        filename: path of the lorry file to check.

    Returns:
        True if no error was reported for this file, False otherwise.
        Problems with the file itself (unreadable, unparsable, missing
        keys) are reported as errors rather than raised.
    """
    is_ok = True
    repo_name = None

    def diagnostic(file, level, msg):
        # repo_name is rebound by the loop below; until then (e.g. while
        # loading the file) messages carry only the file name.
        if repo_name is None:
            print('%s: %s: %s' % (level, filename, msg),
                  file=file)
        else:
            print('%s: %s: %s: %s' % (level, filename, repo_name, msg),
                  file=file)

    def error(msg):
        nonlocal is_ok
        is_ok = False
        diagnostic(sys.stderr, 'E', msg)

    def info(msg):
        diagnostic(sys.stdout, 'I', msg)

    def try_command(args, msg_prefix='', env=None):
        # Run a client command, discarding stdout and reporting stderr
        # if it fails.
        try:
            result = subprocess.run(
                args,
                stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                encoding='utf-8', errors='replace', env=env)
        except OSError as e:
            # Typically the client program is not installed; report it
            # for this repository instead of aborting the whole run.
            error(msg_prefix + str(e))
            return
        if result.returncode != 0:
            error(msg_prefix + result.stderr.rstrip())
        else:
            info(msg_prefix + 'OK')

    def content_type_of(response):
        # Media type only, lower-cased and without parameters, so that
        # e.g. "text/html; charset=utf-8" compares equal to "text/html".
        return response.info().get_content_type()

    # URLError is a subclass of OSError; ValueError covers malformed
    # URLs rejected before any connection is attempted.
    request_errors = (OSError, http.client.HTTPException, ValueError)

    try:
        with open(filename, encoding='utf-8') as f:
            try:
                obj = yaml.safe_load(f)
            except yaml.YAMLError:
                f.seek(0)
                obj = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and undecodable bytes.
        error('cannot load file: %s' % e)
        return is_ok

    if not isinstance(obj, dict):
        error('top-level value is not a mapping')
        return is_ok

    for repo_name, upstream_def in obj.items():
        if not isinstance(upstream_def, dict):
            error('upstream definition is not a mapping')
            continue
        if 'type' not in upstream_def:
            error("missing 'type'")
            continue

        upstream_type = upstream_def['type']
        upstream_url = upstream_def.get('url')  # optional for bzr
        check_certificates = upstream_def.get('check-certificates', True)

        if upstream_url is None and upstream_type in [
                'cvs', 'git', 'hg', 'svn', 'gzip', 'tarball', 'zip']:
            error("missing 'url'")
            continue

        # Try the cheapest possible operation to verify that the
        # repository or file is present

        if upstream_type == 'bzr':
            cert_options = []
            if not check_certificates:
                cert_options.append('-Ossl.cert_reqs=none')

            if upstream_url is None:
                branches = upstream_def.get('branches')
                if not isinstance(branches, dict):
                    error("missing 'url' or 'branches'")
                    continue
                branch_urls = branches.items()
            else:
                branch_urls = [('trunk', upstream_url)]
            for branch_name, branch_url in branch_urls:
                try_command(['bzr', 'info', '--quiet', *cert_options,
                             branch_url],
                            msg_prefix=('%s: ' % branch_name))

        elif upstream_type == 'cvs':
            if 'module' not in upstream_def:
                error("missing 'module'")
                continue
            try_command(['cvs', '-d', upstream_url, '-Q', 'rls',
                         upstream_def['module']])

        elif upstream_type == 'git':
            # Give git its own copy of the environment so this process's
            # environment is left untouched.
            git_env = dict(os.environ)
            if check_certificates:
                git_env.pop('GIT_SSL_NO_VERIFY', None)
            else:
                git_env['GIT_SSL_NO_VERIFY'] = 'true'
            try_command(['git', 'ls-remote', '--symref',
                         upstream_url, 'HEAD'],
                        env=git_env)

        elif upstream_type == 'hg':
            # The hg client doesn't appear to support running any
            # kind of info/check command without a local clone.
            # Send a Mercurial capabilities request directly.
            tls_context = ssl.create_default_context()
            if not check_certificates:
                tls_context.check_hostname = False
                tls_context.verify_mode = ssl.CERT_NONE
            try:
                with urllib.request.urlopen(
                        urllib.request.Request(
                            upstream_url + '?cmd=capabilities',
                            headers={
                                'Accept': 'application/mercurial-0.1',
                                'User-Agent': 'mercurial/proto-1.0'
                            }),
                        context=tls_context) as result:
                    if content_type_of(result) \
                       != 'application/mercurial-0.1':
                        error('no Mercurial server support detected')
                    else:
                        info('OK')
            except request_errors as e:
                error(str(e))

        elif upstream_type == 'svn':
            try_command(['svn', 'info', '--non-interactive', upstream_url])

        elif upstream_type in ['gzip', 'tarball', 'zip']:
            try:
                with urllib.request.urlopen(
                        urllib.request.Request(upstream_url,
                                               method='HEAD')) as result:
                    # An HTML page or an empty body in place of an
                    # archive is reported as a failure.
                    if content_type_of(result) == 'text/html':
                        error('Content-Type is HTML')
                    elif result.info().get('Content-Length') == '0':
                        error('file is empty')
                    else:
                        info('OK')
            except request_errors as e:
                error(str(e))

        # Throttle requests
        time.sleep(0.1)

    return is_ok


def main():
    """Check the lorry files named on the command line and exit.

    When no file arguments are given, every file matching
    ``*-lorries/*.lorry`` relative to the current directory is checked,
    in sorted order.  The process exits with status 0 if every file
    checked out and 1 otherwise.
    """
    requested = sys.argv[1:]
    if requested:
        filenames = requested
    else:
        filenames = sorted(glob.glob('*-lorries/*.lorry'))

    # Build a full list before calling all(): every file must be checked
    # (and its diagnostics printed) even after an earlier one has failed.
    outcomes = [check_file(path) for path in filenames]

    sys.exit(0 if all(outcomes) else 1)


if __name__ == '__main__':
    main()
|