diff options
author | Jürg Billeter <j@bitron.ch> | 2017-11-29 17:52:37 +0000 |
---|---|---|
committer | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2018-01-11 17:33:19 +0000 |
commit | bc931312f965030908d51a7e9a94a4d4ee31c0d3 (patch) | |
tree | 3cdd61d6cdd057f4ed36702e1251c96f46533c1f | |
parent | bc8fc4fec1923c44583ebdd9e5cedb13c78fdf18 (diff) | |
download | buildstream-bc931312f965030908d51a7e9a94a4d4ee31c0d3.tar.gz |
_artifactcache: Use pushreceive also for local artifact repositories
This reduces the differences between local and remote artifact
repositories, increasing code coverage of tests.
-rw-r--r-- | buildstream/_artifactcache/ostreecache.py | 54 | ||||
-rw-r--r-- | buildstream/_artifactcache/pushreceive.py | 71 |
2 files changed, 80 insertions, 45 deletions
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py index 7c4e367e9..5dd0339d8 100644 --- a/buildstream/_artifactcache/ostreecache.py +++ b/buildstream/_artifactcache/ostreecache.py @@ -290,6 +290,7 @@ class OSTreeCache(ArtifactCache): ref = buildref(element, element._get_cache_key_from_artifact()) weak_ref = buildref(element, element._get_cache_key(strength=_KeyStrength.WEAK)) + for push_url in self.push_urls: any_pushed |= self._push_to_remote(push_url, element, ref, weak_ref) @@ -424,35 +425,24 @@ class OSTreeCache(ArtifactCache): self._remote_refs[ref] = remote def _push_to_remote(self, push_url, element, ref, weak_ref): - if push_url.startswith("file://"): - # local repository - push_repo = _ostree.ensure(push_url[7:], True) - _ostree.fetch(push_repo, remote=self.repo.get_path().get_uri(), ref=ref) - _ostree.fetch(push_repo, remote=self.repo.get_path().get_uri(), ref=weak_ref) - - # Local remotes are not really a thing, just return True here - return True - else: - # Push over ssh - # - with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir: - - with element.timed_activity("Preparing compressed archive"): - # First create a temporary archive-z2 repository, we can - # only use ostree-push with archive-z2 local repo. - temp_repo = _ostree.ensure(temp_repo_dir, True) - - # Now push the ref we want to push into our temporary archive-z2 repo - _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref) - _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=weak_ref) - - with element.timed_activity("Sending artifact"), \ - element._output_file() as output_file: - try: - pushed = push_artifact(temp_repo.get_path().get_path(), - push_url, - [ref, weak_ref], output_file) - except PushException as e: - raise ArtifactError("Failed to push artifact {}: {}".format(ref, e)) from e - - return pushed + with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir: + + with element.timed_activity("Preparing compressed archive"): + # First create a temporary archive-z2 repository, we can + # only use ostree-push with archive-z2 local repo. + temp_repo = _ostree.ensure(temp_repo_dir, True) + + # Now push the ref we want to push into our temporary archive-z2 repo + _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref) + _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=weak_ref) + + with element.timed_activity("Sending artifact"), \ + element._output_file() as output_file: + try: + pushed = push_artifact(temp_repo.get_path().get_path(), + push_url, + [ref, weak_ref], output_file) + except PushException as e: + raise ArtifactError("Failed to push artifact {}: {}".format(ref, e)) from e + + return pushed diff --git a/buildstream/_artifactcache/pushreceive.py b/buildstream/_artifactcache/pushreceive.py index d78e36043..9476c5c81 100644 --- a/buildstream/_artifactcache/pushreceive.py +++ b/buildstream/_artifactcache/pushreceive.py @@ -27,6 +27,7 @@ import sys import shutil import tarfile import tempfile +import multiprocessing from enum import Enum from urllib.parse import urlparse @@ -317,10 +318,10 @@ def parse_remote_location(remotepath): """Parse remote artifact cache URL that's been specified in our config.""" remote_host = remote_user = remote_repo = None - url = urlparse(remotepath) - if url.netloc: - if url.scheme != 'ssh': - raise PushException('Only URL scheme ssh is allowed, ' + url = urlparse(remotepath, scheme='file') + if url.scheme: + if url.scheme not in ['file', 'ssh']: + raise PushException('Only URL schemes file and ssh are allowed, ' 'not "{}"'.format(url.scheme)) remote_host = url.hostname remote_user = url.username @@ -348,6 +349,9 @@ def parse_remote_location(remotepath): def ssh_commandline(remote_host, remote_user=None, remote_port=22): + if remote_host is None: + return [] + ssh_cmd = ['ssh'] if remote_user: ssh_cmd += ['-l', remote_user] @@ -357,6 +361,36 @@ def ssh_commandline(remote_host, remote_user=None, remote_port=22): return ssh_cmd +def foo_run(func, args, stdin_fd, stdout_fd, stderr_fd): + sys.stdin = open(stdin_fd, 'r') + sys.stdout = open(stdout_fd, 'w') + sys.stderr = open(stderr_fd, 'w') + func(args) + + +class ProcessWithPipes(object): + def __init__(self, func, args, *, stderr=None): + r0, w0 = os.pipe() + r1, w1 = os.pipe() + if stderr is None: + r2, w2 = os.pipe() + else: + w2 = stderr.fileno() + self.proc = multiprocessing.Process(target=foo_run, args=(func, args, r0, w1, w2)) + self.proc.start() + self.stdin = open(w0, 'wb') + os.close(r0) + self.stdout = open(r1, 'rb') + os.close(w1) + if stderr is None: + self.stderr = open(r2, 'rb') + os.close(w2) + + def wait(self): + self.proc.join() + self.returncode = self.proc.exitcode + + class OSTreePusher(object): def __init__(self, repopath, remotepath, branches=[], verbose=False, debug=False, output=None): @@ -392,13 +426,19 @@ class OSTreePusher(object): ssh_cmd += ['--verbose'] if self.debug: ssh_cmd += ['--debug'] + if not self.remote_host: + ssh_cmd += ['--pull-url', self.remote_repo] ssh_cmd += [self.remote_repo] logging.info('Executing {}'.format(' '.join(ssh_cmd))) - self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=self.output, - start_new_session=True) + + if self.remote_host: + self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=self.output, + start_new_session=True) + else: + self.ssh = ProcessWithPipes(receive_main, ssh_cmd[1:], stderr=self.output) self.writer = PushMessageWriter(self.ssh.stdin) self.reader = PushMessageReader(self.ssh.stdout) @@ -644,14 +684,19 @@ def initialize_push_connection(remote): remote_host, remote_user, remote_repo, remote_port = parse_remote_location(remote) ssh_cmd = ssh_commandline(remote_host, remote_user, remote_port) - # We need a short timeout here because if 'remote' isn't reachable at - # all, the process will hang until the connection times out. - ssh_cmd += ['-oConnectTimeout=3'] + if remote_host: + # We need a short timeout here because if 'remote' isn't reachable at + # all, the process will hang until the connection times out. + ssh_cmd += ['-oConnectTimeout=3'] ssh_cmd += ['bst-artifact-receive', remote_repo] - ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if remote_host: + ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + ssh_cmd += ['--pull-url', remote_repo] + ssh = ProcessWithPipes(receive_main, ssh_cmd[1:]) writer = PushMessageWriter(ssh.stdin) reader = PushMessageReader(ssh.stdout) |