#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Create a Baserock .lorry file for a given Python package
#
# Copyright © 2014, 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from __future__ import print_function

import subprocess
import requests
import json
import sys
import shutil
import tempfile
import xmlrpclib
import logging

import yaml
import pkg_resources

from importer_python_common import *


def fetch_package_metadata(package_name):
    try:
        result = requests.get('%s/%s/json' % (PYPI_URL, package_name))

        # Raise an exception if the status code is not 200 OK.
        result.raise_for_status()
    except Exception as e:
        error("Couldn't fetch package metadata:", e)

    return result.json()


def find_repo_type(url):
    debug_vcss = False

    # Don't bother with detection if we can't get a 200 OK.
    logging.debug("Getting '%s' ..." % url)

    status_code = requests.get(url).status_code
    if status_code != 200:
        logging.debug('Got %d status code from %s, aborting repo detection'
                      % (status_code, url))
        return None

    logging.debug('200 OK for %s' % url)
    logging.debug('Finding repo type for %s' % url)

    vcss = [('git', 'clone'), ('hg', 'clone'),
            ('svn', 'checkout'), ('bzr', 'branch')]

    for (vcs, vcs_command) in vcss:
        logging.debug('Trying %s %s' % (vcs, vcs_command))
        tempdir = tempfile.mkdtemp()

        p = subprocess.Popen([vcs, vcs_command, url], stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
                             cwd=tempdir)

        # We close stdin on the parent side to prevent the child from
        # blocking if it reads on stdin.
        p.stdin.close()

        while True:
            line = p.stdout.readline()
            if line == '':
                break
            if debug_vcss:
                logging.debug(line.rstrip('\n'))

        p.wait()  # even with EOF on both streams, we still wait

        shutil.rmtree(tempdir)

        if p.returncode == 0:
            logging.debug('%s is a %s repo' % (url, vcs))
            return vcs

    logging.debug("%s doesn't seem to be a repo" % url)

    return None


def get_compression(url):
    bzip = 'bzip2'
    gzip = 'gzip'
    lzma = 'lzma'

    m = {'tar.gz': gzip, 'tgz': gzip, 'tar.Z': gzip, 'tar.bz2': bzip,
         'tbz2': bzip, 'tar.lzma': lzma, 'tar.xz': lzma, 'tlz': lzma,
         'txz': lzma}

    # Try the two-component extension first (e.g. 'tar.gz'),
    # then the single-component one (e.g. 'tgz').
    for x in [2, 1]:
        ext = '.'.join(url.split('.')[-x:])
        if ext in m:
            return m[ext]

    return None


# Assumption: url passed to this function must have a 'standard' tar extension
def make_tarball_lorry(lorry_prefix, package_name, url):
    name = '%s/%s' % (lorry_prefix, package_name)

    # TODO: shouldn't have 'x-products-python' field hardcoded here either
    lorry = {'type': 'tarball', 'url': url,
             'x-products-python': [package_name]}
    compression = get_compression(url)
    if compression:
        lorry['compression'] = compression

    return json.dumps({name + "-tarball": lorry}, indent=4, sort_keys=True)


def filter_urls(urls):
    allowed_extensions = ['tar.gz', 'tgz', 'tar.Z', 'tar.bz2', 'tbz2',
                          'tar.lzma', 'tar.xz', 'tlz', 'txz', 'tar']

    def allowed_extension(url):
        return ('.'.join(url['url'].split('.')[-2:]) in allowed_extensions
                or url['url'].split('.')[-1] in allowed_extensions)

    return filter(allowed_extension, urls)


def get_releases(client, requirement):
    try:
        releases = client.package_releases(requirement.project_name)
    except Exception as e:
        error("Couldn't fetch release data:", e)

    return releases


def generate_tarball_lorry(lorry_prefix, client, requirement):
    releases = get_releases(client, requirement)

    if len(releases) == 0:
        error("Couldn't find any releases for package %s"
              % requirement.project_name)

    releases = [v for v in releases if specs_satisfied(v, requirement.specs)]

    if len(releases) == 0:
        error("Couldn't find any releases of %s"
              " that satisfy version constraints: %s"
              % (requirement.project_name, requirement.specs))

    release_version = releases[0]

    logging.debug('Fetching urls for package %s with version %s'
                  % (requirement.project_name, release_version))

    try:
        # Get a list of dicts; the dicts contain the urls.
        urls = client.release_urls(requirement.project_name, release_version)
    except Exception as e:
        error("Couldn't fetch release urls:", e)

    tarball_urls = filter_urls(urls)

    if len(tarball_urls) > 0:
        urls = tarball_urls
    elif len(urls) > 0:
        warn("None of these urls look like tarballs:")
        for url in urls:
            warn("\t%s" % url['url'])
        error("Cannot proceed")
    else:
        error("Couldn't find any download urls for package %s"
              % requirement.project_name)

    url = urls[0]['url']

    return make_tarball_lorry(lorry_prefix, requirement.project_name, url)


def str_repo_lorry(lorry_prefix, package_name, repo_type, url):
    name = '%s/%s' % (lorry_prefix, package_name)

    # TODO: this products field 'x-products-python'
    # probably shouldn't be hardcoded here
    return json.dumps({name: {'type': repo_type, 'url': url,
                              'x-products-python': [package_name]}},
                      indent=4, sort_keys=True)


class PythonLorryExtension(ImportExtension):

    def __init__(self):
        super(PythonLorryExtension, self).__init__()

    def run(self):
        if len(sys.argv) != 2:
            # TODO explain the format of python requirements
            # warn the user that they probably want to quote their arg
            # > < will be interpreted as redirection by the shell
            print('usage: %s requirement' % sys.argv[0], file=sys.stderr)
            sys.exit(1)

        client = xmlrpclib.ServerProxy(PYPI_URL)

        req = pkg_resources.parse_requirements(sys.argv[1]).next()

        with open(self.local_data_path('python.yaml')) as f:
            lorry_prefix = yaml.load(f)['lorry-prefix']

        new_proj_name = name_or_closest(client, req.project_name)

        if new_proj_name is None:
            error("Couldn't find any project with name '%s'"
                  % req.project_name)

        logging.debug('Treating %s as %s' % (req.project_name, new_proj_name))
        req.project_name = new_proj_name

        metadata = fetch_package_metadata(req.project_name)
        info = metadata['info']

        # Only attempt repository detection if the metadata actually
        # carries a non-empty home_page.
        repo_type = (find_repo_type(info['home_page'])
                     if info.get('home_page') else None)

        if repo_type:
            print(str_repo_lorry(lorry_prefix, req.project_name,
                                 repo_type, info['home_page']))
        else:
            print(generate_tarball_lorry(lorry_prefix, client, req))


if __name__ == '__main__':
    PythonLorryExtension().run()
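
# Illustrative sketch of the expected output, not taken from real PyPI data:
# the package name 'examplepkg' and the repository URL below are hypothetical.
# For a quoted requirement such as 'examplepkg >= 1.0' whose PyPI home_page
# is detected as a clonable git repository, str_repo_lorry() would print
# something like
#
#   {
#       "<lorry-prefix>/examplepkg": {
#           "type": "git",
#           "url": "https://example.com/examplepkg.git",
#           "x-products-python": [
#               "examplepkg"
#           ]
#       }
#   }
#
# where <lorry-prefix> is the 'lorry-prefix' value read from python.yaml.
# If no repository is detected, generate_tarball_lorry() falls back to a
# "<lorry-prefix>/examplepkg-tarball" entry pointing at the sdist URL, with
# a 'compression' key added when get_compression() recognises the extension.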