diff options
Diffstat (limited to 'buildstream/_artifactcache/pushreceive.py')
-rw-r--r-- | buildstream/_artifactcache/pushreceive.py | 812 |
1 files changed, 0 insertions, 812 deletions
diff --git a/buildstream/_artifactcache/pushreceive.py b/buildstream/_artifactcache/pushreceive.py deleted file mode 100644 index 777065e18..000000000 --- a/buildstream/_artifactcache/pushreceive.py +++ /dev/null @@ -1,812 +0,0 @@ -#!/usr/bin/python3 - -# Push OSTree commits to a remote repo, based on Dan Nicholson's ostree-push -# -# Copyright (C) 2015 Dan Nicholson <nicholson@endlessm.com> -# Copyright (C) 2017 Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import logging -import multiprocessing -import os -import re -import subprocess -import sys -import shutil -import tarfile -import tempfile -from enum import Enum -from urllib.parse import urlparse - -import click -import gi - -from .. import _signals # nopep8 -from .._profile import Topics, profile_start, profile_end - -gi.require_version('OSTree', '1.0') -# pylint: disable=wrong-import-position,wrong-import-order -from gi.repository import GLib, Gio, OSTree # nopep8 - - -PROTO_VERSION = 1 -HEADER_SIZE = 5 - - -# An error occurred -class PushException(Exception): - pass - - -# Trying to commit a ref which already exists in remote -class PushExistsException(Exception): - pass - - -class PushCommandType(Enum): - info = 0 - update = 1 - putobjects = 2 - status = 3 - done = 4 - - -def python_to_msg_byteorder(python_byteorder=sys.byteorder): - if python_byteorder == 'little': - return 'l' - elif python_byteorder == 'big': - return 'B' - else: - raise PushException('Unrecognized system byteorder {}' - .format(python_byteorder)) - - -def msg_to_python_byteorder(msg_byteorder): - if msg_byteorder == 'l': - return 'little' - elif msg_byteorder == 'B': - return 'big' - else: - raise PushException('Unrecognized message byteorder {}' - .format(msg_byteorder)) - - -def ostree_object_path(repo, obj): - repodir = repo.get_path().get_path() - return os.path.join(repodir, 'objects', obj[0:2], obj[2:]) - - -class PushCommand(object): - def __init__(self, cmdtype, args): - self.cmdtype = cmdtype - self.args = args - self.validate(self.cmdtype, self.args) - self.variant = GLib.Variant('a{sv}', self.args) - - @staticmethod - def validate(command, args): - if not isinstance(command, PushCommandType): - raise PushException('Message command must be PushCommandType') - if not isinstance(args, dict): - raise PushException('Message args must be dict') - # Ensure all values are variants for a{sv} vardict - for val in args.values(): - if not isinstance(val, GLib.Variant): - raise PushException('Message args values must be ' - 'GLib.Variant') - - -class PushMessageWriter(object): - def __init__(self, file, byteorder=sys.byteorder): - self.file = file - self.byteorder = byteorder - self.msg_byteorder = python_to_msg_byteorder(self.byteorder) - - def encode_header(self, cmdtype, size): - header = self.msg_byteorder.encode() + \ - PROTO_VERSION.to_bytes(1, self.byteorder) + \ - cmdtype.value.to_bytes(1, self.byteorder) + \ - size.to_bytes(2, self.byteorder) - return header - - def encode_message(self, command): - if not isinstance(command, PushCommand): - raise PushException('Command must be PushCommand') - data = command.variant.get_data_as_bytes() - size = data.get_size() - - # Build the header - header = self.encode_header(command.cmdtype, size) - - return header + data.get_data() - - def write(self, command): - msg = self.encode_message(command) - self.file.write(msg) - self.file.flush() - - def send_hello(self): - # The 'hello' message is used to check connectivity and discover the - # cache's pull URL. It's actually transmitted as an empty info request. - args = { - 'mode': GLib.Variant('i', 0), - 'refs': GLib.Variant('a{ss}', {}) - } - command = PushCommand(PushCommandType.info, args) - self.write(command) - - def send_info(self, repo, refs, pull_url=None): - cmdtype = PushCommandType.info - mode = repo.get_mode() - - ref_map = {} - for ref in refs: - _, checksum = repo.resolve_rev(ref, True) - if checksum: - _, has_object = repo.has_object(OSTree.ObjectType.COMMIT, checksum, None) - if has_object: - ref_map[ref] = checksum - - args = { - 'mode': GLib.Variant('i', mode), - 'refs': GLib.Variant('a{ss}', ref_map) - } - - # The server sends this so clients can discover the correct pull URL - # for this cache without requiring end-users to specify it. - if pull_url: - args['pull_url'] = GLib.Variant('s', pull_url) - - command = PushCommand(cmdtype, args) - self.write(command) - - def send_update(self, refs): - cmdtype = PushCommandType.update - args = {} - for branch, revs in refs.items(): - args[branch] = GLib.Variant('(ss)', revs) - command = PushCommand(cmdtype, args) - self.write(command) - - def send_putobjects(self, repo, objects): - - logging.info('Sending {} objects'.format(len(objects))) - - # Send command saying we're going to send a stream of objects - cmdtype = PushCommandType.putobjects - command = PushCommand(cmdtype, {}) - self.write(command) - - # Open a TarFile for writing uncompressed tar to a stream - tar = tarfile.TarFile.open(mode='w|', fileobj=self.file) - for obj in objects: - - logging.info('Sending object {}'.format(obj)) - objpath = ostree_object_path(repo, obj) - stat = os.stat(objpath) - - tar_info = tarfile.TarInfo(obj) - tar_info.mtime = stat.st_mtime - tar_info.size = stat.st_size - with open(objpath, 'rb') as obj_fp: - tar.addfile(tar_info, obj_fp) - - # We're done, close the tarfile - tar.close() - - def send_status(self, result, message=''): - cmdtype = PushCommandType.status - args = { - 'result': GLib.Variant('b', result), - 'message': GLib.Variant('s', message) - } - command = PushCommand(cmdtype, args) - self.write(command) - - def send_done(self): - command = PushCommand(PushCommandType.done, {}) - self.write(command) - - -class PushMessageReader(object): - def __init__(self, file, byteorder=sys.byteorder, tmpdir=None): - self.file = file - self.byteorder = byteorder - self.tmpdir = tmpdir - - def decode_header(self, header): - if len(header) != HEADER_SIZE: - raise Exception('Header is {:d} bytes, not {:d}'.format(len(header), HEADER_SIZE)) - order = msg_to_python_byteorder(chr(header[0])) - version = int(header[1]) - if version != PROTO_VERSION: - raise Exception('Unsupported protocol version {:d}'.format(version)) - cmdtype = PushCommandType(int(header[2])) - vlen = int.from_bytes(header[3:], order) - return order, version, cmdtype, vlen - - def decode_message(self, message, size, order): - if len(message) != size: - raise Exception('Expected {:d} bytes, but got {:d}'.format(size, len(message))) - data = GLib.Bytes.new(message) - variant = GLib.Variant.new_from_bytes(GLib.VariantType.new('a{sv}'), - data, False) - if order != self.byteorder: - variant = GLib.Variant.byteswap(variant) - - return variant - - def read(self): - header = self.file.read(HEADER_SIZE) - if not header: - # Remote end quit - return None, None - order, _, cmdtype, size = self.decode_header(header) - msg = self.file.read(size) - if len(msg) != size: - raise PushException('Did not receive full message') - args = self.decode_message(msg, size, order) - - return cmdtype, args - - def receive(self, allowed): - cmdtype, args = self.read() - if cmdtype is None: - raise PushException('Expected reply, got none') - if cmdtype not in allowed: - raise PushException('Unexpected reply type', cmdtype.name) - return cmdtype, args.unpack() - - def receive_info(self): - _, args = self.receive([PushCommandType.info]) - return args - - def receive_update(self): - _, args = self.receive([PushCommandType.update]) - return args - - def receive_putobjects(self, repo): - - received_objects = [] - - # Open a TarFile for reading uncompressed tar from a stream - tar = tarfile.TarFile.open(mode='r|', fileobj=self.file) - - # Extract every tarinfo into the temp location - # - # This should block while tar.next() reads the next - # tar object from the stream. - while True: - filepos = tar.fileobj.tell() - tar_info = tar.next() - if not tar_info: - # End of stream marker consists of two 512 Byte blocks. - # Current Python tarfile stops reading after the first block. - # Read the second block as well to ensure the stream is at - # the right position for following messages. - if tar.fileobj.tell() - filepos < 1024: - tar.fileobj.read(512) - break - - tar.extract(tar_info, self.tmpdir) - received_objects.append(tar_info.name) - - # Finished with this stream - tar.close() - - return received_objects - - def receive_status(self): - _, args = self.receive([PushCommandType.status]) - return args - - def receive_done(self): - _, args = self.receive([PushCommandType.done]) - return args - - -def parse_remote_location(remotepath): - """Parse remote artifact cache URL that's been specified in our config.""" - remote_host = remote_user = remote_repo = None - - url = urlparse(remotepath, scheme='file') - if url.scheme: - if url.scheme not in ['file', 'ssh']: - raise PushException('Only URL schemes file and ssh are allowed, ' - 'not "{}"'.format(url.scheme)) - remote_host = url.hostname - remote_user = url.username - remote_repo = url.path - remote_port = url.port or 22 - else: - # Scp/git style remote (user@hostname:path) - parts = remotepath.split('@', 1) - if len(parts) > 1: - remote_user = parts[0] - remainder = parts[1] - else: - remote_user = None - remainder = parts[0] - parts = remainder.split(':', 1) - if len(parts) != 2: - raise PushException('Remote repository "{}" does not ' - 'contain a hostname and path separated ' - 'by ":"'.format(remotepath)) - remote_host, remote_repo = parts - # This form doesn't make it possible to specify a non-standard port. - remote_port = 22 - - return remote_host, remote_user, remote_repo, remote_port - - -def ssh_commandline(remote_host, remote_user=None, remote_port=22): - if remote_host is None: - return [] - - ssh_cmd = ['ssh'] - if remote_user: - ssh_cmd += ['-l', remote_user] - if remote_port != 22: - ssh_cmd += ['-p', str(remote_port)] - ssh_cmd += [remote_host] - return ssh_cmd - - -def foo_run(func, args, stdin_fd, stdout_fd, stderr_fd): - sys.stdin = open(stdin_fd, 'r') - sys.stdout = open(stdout_fd, 'w') - sys.stderr = open(stderr_fd, 'w') - func(args) - - -class ProcessWithPipes(object): - def __init__(self, func, args, *, stderr=None): - r0, w0 = os.pipe() - r1, w1 = os.pipe() - if stderr is None: - r2, w2 = os.pipe() - else: - w2 = stderr.fileno() - self.proc = multiprocessing.Process(target=foo_run, args=(func, args, r0, w1, w2)) - self.proc.start() - self.stdin = open(w0, 'wb') - os.close(r0) - self.stdout = open(r1, 'rb') - os.close(w1) - if stderr is None: - self.stderr = open(r2, 'rb') - os.close(w2) - - # The eventual return code - self.returncode = -1 - - def wait(self): - self.proc.join() - self.returncode = self.proc.exitcode - - -class OSTreePusher(object): - def __init__(self, repopath, remotepath, branches=None, verbose=False, - debug=False, output=None): - self.repopath = repopath - self.remotepath = remotepath - self.verbose = verbose - self.debug = debug - self.output = output - - self.remote_host, self.remote_user, self.remote_repo, self.remote_port = \ - parse_remote_location(remotepath) - - if self.repopath is None: - self.repo = OSTree.Repo.new_default() - else: - self.repo = OSTree.Repo.new(Gio.File.new_for_path(self.repopath)) - self.repo.open(None) - - # Enumerate branches to push - if branches is None: - _, self.refs = self.repo.list_refs(None, None) - else: - self.refs = {} - for branch in branches: - _, rev = self.repo.resolve_rev(branch, False) - self.refs[branch] = rev - - # Start ssh - ssh_cmd = ssh_commandline(self.remote_host, self.remote_user, self.remote_port) - - ssh_cmd += ['bst-artifact-receive'] - if self.verbose: - ssh_cmd += ['--verbose'] - if self.debug: - ssh_cmd += ['--debug'] - if not self.remote_host: - ssh_cmd += ['--pull-url', self.remote_repo] - ssh_cmd += [self.remote_repo] - - logging.info('Executing {}'.format(' '.join(ssh_cmd))) - - if self.remote_host: - self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=self.output, - start_new_session=True) - else: - self.ssh = ProcessWithPipes(receive_main, ssh_cmd[1:], stderr=self.output) - - self.writer = PushMessageWriter(self.ssh.stdin) - self.reader = PushMessageReader(self.ssh.stdout) - - def needed_commits(self, remote, local, needed): - parent = local - if remote == '0' * 64: - # Nonexistent remote branch, use None for convenience - remote = None - while parent != remote: - needed.add(parent) - _, commit = self.repo.load_variant_if_exists(OSTree.ObjectType.COMMIT, - parent) - if commit is None: - raise PushException('Shallow history from commit {} does ' - 'not contain remote commit {}'.format(local, remote)) - parent = OSTree.commit_get_parent(commit) - if parent is None: - break - if remote is not None and parent != remote: - self.writer.send_done() - raise PushExistsException('Remote commit {} not descendent of ' - 'commit {}'.format(remote, local)) - - def needed_objects(self, commits): - objects = set() - for rev in commits: - _, reachable = self.repo.traverse_commit(rev, 0, None) - for obj in reachable: - objname = OSTree.object_to_string(obj[0], obj[1]) - if obj[1] == OSTree.ObjectType.FILE: - # Make this a filez since we're archive-z2 - objname += 'z' - elif obj[1] == OSTree.ObjectType.COMMIT: - # Add in detached metadata - metaobj = objname + 'meta' - metapath = ostree_object_path(self.repo, metaobj) - if os.path.exists(metapath): - objects.add(metaobj) - - # Add in Endless compat files - for suffix in ['sig', 'sizes2']: - metaobj = obj[0] + '.' + suffix - metapath = ostree_object_path(self.repo, metaobj) - if os.path.exists(metapath): - objects.add(metaobj) - objects.add(objname) - return objects - - def close(self): - self.ssh.stdin.close() - return self.ssh.wait() - - def run(self): - remote_refs = {} - update_refs = {} - - # Send info immediately - self.writer.send_info(self.repo, list(self.refs.keys())) - - # Receive remote info - logging.info('Receiving repository information') - args = self.reader.receive_info() - remote_mode = args['mode'] - if remote_mode != OSTree.RepoMode.ARCHIVE_Z2: - raise PushException('Can only push to archive-z2 repos') - remote_refs = args['refs'] - for branch, rev in self.refs.items(): - remote_rev = remote_refs.get(branch, '0' * 64) - if rev != remote_rev: - update_refs[branch] = remote_rev, rev - if not update_refs: - logging.info('Nothing to update') - self.writer.send_done() - raise PushExistsException('Nothing to update') - - # Send update command - logging.info('Sending update request') - self.writer.send_update(update_refs) - - # Receive status for update request - args = self.reader.receive_status() - if not args['result']: - self.writer.send_done() - raise PushException(args['message']) - - # Collect commits and objects to push - commits = set() - exc_info = None - ref_count = 0 - for branch, revs in update_refs.items(): - logging.info('Updating {} {} to {}'.format(branch, revs[0], revs[1])) - try: - self.needed_commits(revs[0], revs[1], commits) - ref_count += 1 - except PushExistsException: - if exc_info is None: - exc_info = sys.exc_info() - - # Re-raise PushExistsException if all refs exist already - if ref_count == 0 and exc_info: - raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) - - logging.info('Enumerating objects to send') - objects = self.needed_objects(commits) - - # Send all the objects to receiver, checking status after each - self.writer.send_putobjects(self.repo, objects) - - # Inform receiver that all objects have been sent - self.writer.send_done() - - # Wait until receiver has completed - self.reader.receive_done() - - return self.close() - - -# OSTreeReceiver is on the receiving end of an OSTree push. -# -# Args: -# repopath (str): On-disk location of the receiving repository. -# pull_url (str): Redirection for clients who want to pull, not push. -# -class OSTreeReceiver(object): - def __init__(self, repopath, pull_url): - self.repopath = repopath - self.pull_url = pull_url - - if self.repopath is None: - self.repo = OSTree.Repo.new_default() - else: - self.repo = OSTree.Repo.new(Gio.File.new_for_path(self.repopath)) - self.repo.open(None) - - repo_tmp = os.path.join(self.repopath, 'tmp') - self.tmpdir = tempfile.mkdtemp(dir=repo_tmp, prefix='bst-push-') - self.writer = PushMessageWriter(sys.stdout.buffer) - self.reader = PushMessageReader(sys.stdin.buffer, tmpdir=self.tmpdir) - - # Set a sane umask before writing any objects - os.umask(0o0022) - - def close(self): - shutil.rmtree(self.tmpdir) - sys.stdout.flush() - return 0 - - def run(self): - try: - exit_code = self.do_run() - self.close() - return exit_code - except: - # BLIND EXCEPT - Just abort if we receive any exception, this - # can be a broken pipe, a tarfile read error when the remote - # connection is closed, a bug; whatever happens we want to cleanup. - self.close() - raise - - def do_run(self): - # Receive remote info - args = self.reader.receive_info() - remote_refs = args['refs'] - - # Send info back - self.writer.send_info(self.repo, list(remote_refs.keys()), - pull_url=self.pull_url) - - # Wait for update or done command - cmdtype, args = self.reader.receive([PushCommandType.update, - PushCommandType.done]) - if cmdtype == PushCommandType.done: - return 0 - update_refs = args - - profile_names = set() - for update_ref in update_refs: - # Strip off the SHA256 sum on the right of the reference, - # leaving the project and element name - project_and_element_name = re.sub(r"/[a-z0-9]+$", '', update_ref) - profile_names.add(project_and_element_name) - - profile_name = '_'.join(profile_names) - profile_start(Topics.ARTIFACT_RECEIVE, profile_name) - - self.writer.send_status(True) - - # Wait for putobjects or done - cmdtype, args = self.reader.receive([PushCommandType.putobjects, - PushCommandType.done]) - - if cmdtype == PushCommandType.done: - logging.debug('Received done before any objects, exiting') - return 0 - - # Receive the actual objects - received_objects = self.reader.receive_putobjects(self.repo) - - # Ensure that pusher has sent all objects - self.reader.receive_done() - - # If we didn't get any objects, we're done - if not received_objects: - return 0 - - # Got all objects, move them to the object store - for obj in received_objects: - tmp_path = os.path.join(self.tmpdir, obj) - obj_path = ostree_object_path(self.repo, obj) - os.makedirs(os.path.dirname(obj_path), exist_ok=True) - logging.debug('Renaming {} to {}'.format(tmp_path, obj_path)) - os.rename(tmp_path, obj_path) - - # Verify that we have the specified commit objects - for branch, revs in update_refs.items(): - _, has_object = self.repo.has_object(OSTree.ObjectType.COMMIT, revs[1], None) - if not has_object: - raise PushException('Missing commit {} for ref {}'.format(revs[1], branch)) - - # Finally, update the refs - for branch, revs in update_refs.items(): - logging.debug('Setting ref {} to {}'.format(branch, revs[1])) - self.repo.set_ref_immediate(None, branch, revs[1], None) - - # Inform pusher that everything is in place - self.writer.send_done() - - profile_end(Topics.ARTIFACT_RECEIVE, profile_name) - - return 0 - - -# initialize_push_connection() -# -# Test that we can connect to the remote bst-artifact-receive program, and -# receive the pull URL for this artifact cache. -# -# We don't want to make the user wait until the first artifact has been built -# to discover that they actually cannot push, so this should be called early. -# -# The SSH push protocol doesn't allow pulling artifacts. We don't want to -# require users to specify two URLs for a single cache, so we have the push -# protocol return the corresponding pull URL as part of the 'hello' response. -# -# Args: -# remote: The ssh remote url to push to -# -# Returns: -# (str): The URL that should be used for pushing to this cache. -# -# Raises: -# PushException if there was an issue connecting to the remote. -def initialize_push_connection(remote): - remote_host, remote_user, remote_repo, remote_port = parse_remote_location(remote) - ssh_cmd = ssh_commandline(remote_host, remote_user, remote_port) - - if remote_host: - # We need a short timeout here because if 'remote' isn't reachable at - # all, the process will hang until the connection times out. - ssh_cmd += ['-oConnectTimeout=3'] - - ssh_cmd += ['bst-artifact-receive', remote_repo] - - if remote_host: - ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - else: - ssh_cmd += ['--pull-url', remote_repo] - ssh = ProcessWithPipes(receive_main, ssh_cmd[1:]) - - writer = PushMessageWriter(ssh.stdin) - reader = PushMessageReader(ssh.stdout) - - try: - writer.send_hello() - args = reader.receive_info() - writer.send_done() - - if 'pull_url' in args: - pull_url = args['pull_url'] - else: - raise PushException( - "Remote cache did not tell us its pull URL. This cache probably " - "requires updating to a newer version of `bst-artifact-receive`.") - except PushException as protocol_error: - # If we get a read error on the wire, let's first see if SSH reported - # an error such as 'Permission denied'. If so this will be much more - # useful to the user than the "Expected reply, got none" sort of - # message that reader.receive_info() will have raised. - ssh.wait() - if ssh.returncode != 0: - ssh_error = ssh.stderr.read().decode('unicode-escape').strip() - raise PushException("{}".format(ssh_error)) - else: - raise protocol_error - - return pull_url - - -# push() -# -# Run the pusher in process, with logging going to the output file -# -# Args: -# repo: The local repository path -# remote: The ssh remote url to push to -# branches: The refs to push -# output: The output where logging should go -# -# Returns: -# (bool): True if the remote was updated, False if it already existed -# and no updated was required -# -# Raises: -# PushException if there was an error -# -def push(repo, remote, branches, output): - - logging.basicConfig(format='%(module)s: %(levelname)s: %(message)s', - level=logging.INFO, stream=output) - - pusher = OSTreePusher(repo, remote, branches, True, False, output=output) - - def terminate_push(): - pusher.close() - - with _signals.terminator(terminate_push): - try: - pusher.run() - return True - except ConnectionError as e: - # Connection attempt failed or connection was terminated unexpectedly - terminate_push() - raise PushException("Connection failed") from e - except PushException: - terminate_push() - raise - except PushExistsException: - # If the commit already existed, just bail out - # on the push and dont bother re-raising the error - logging.info("Ref {} was already present in remote {}".format(branches, remote)) - terminate_push() - return False - - -@click.command(short_help="Receive pushed artifacts over ssh") -@click.option('--verbose', '-v', is_flag=True, default=False, help="Verbose mode") -@click.option('--debug', '-d', is_flag=True, default=False, help="Debug mode") -@click.option('--pull-url', type=str, required=True, - help="Clients who try to pull over SSH will be redirected here") -@click.argument('repo') -def receive_main(verbose, debug, pull_url, repo): - """A BuildStream sister program for receiving artifacts send to a shared artifact cache - """ - loglevel = logging.WARNING - if verbose: - loglevel = logging.INFO - if debug: - loglevel = logging.DEBUG - logging.basicConfig(format='%(module)s: %(levelname)s: %(message)s', - level=loglevel, stream=sys.stderr) - - receiver = OSTreeReceiver(repo, pull_url) - return receiver.run() |