summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2017-11-29 17:52:37 +0000
committerSam Thursfield <sam.thursfield@codethink.co.uk>2018-01-11 18:18:14 +0000
commitad08ad267f074f73c2f895099fd659e27d0d194c (patch)
tree0b92f27c7d211f04bda4b7292ffa2d59865cdd55
parent1787ff21c4d5b9f318b65c63743f4b4811ac1c97 (diff)
downloadbuildstream-ad08ad267f074f73c2f895099fd659e27d0d194c.tar.gz
_artifactcache: Use pushreceive also for local artifact repositories
This reduces the differences between local and remote artifact repositories, increasing code coverage of tests.
-rw-r--r--buildstream/_artifactcache/ostreecache.py54
-rw-r--r--buildstream/_artifactcache/pushreceive.py71
2 files changed, 80 insertions, 45 deletions
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
index 7c4e367e9..5dd0339d8 100644
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -290,6 +290,7 @@ class OSTreeCache(ArtifactCache):
ref = buildref(element, element._get_cache_key_from_artifact())
weak_ref = buildref(element, element._get_cache_key(strength=_KeyStrength.WEAK))
+
for push_url in self.push_urls:
any_pushed |= self._push_to_remote(push_url, element, ref, weak_ref)
@@ -424,35 +425,24 @@ class OSTreeCache(ArtifactCache):
self._remote_refs[ref] = remote
def _push_to_remote(self, push_url, element, ref, weak_ref):
- if push_url.startswith("file://"):
- # local repository
- push_repo = _ostree.ensure(push_url[7:], True)
- _ostree.fetch(push_repo, remote=self.repo.get_path().get_uri(), ref=ref)
- _ostree.fetch(push_repo, remote=self.repo.get_path().get_uri(), ref=weak_ref)
-
- # Local remotes are not really a thing, just return True here
- return True
- else:
- # Push over ssh
- #
- with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
-
- with element.timed_activity("Preparing compressed archive"):
- # First create a temporary archive-z2 repository, we can
- # only use ostree-push with archive-z2 local repo.
- temp_repo = _ostree.ensure(temp_repo_dir, True)
-
- # Now push the ref we want to push into our temporary archive-z2 repo
- _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref)
- _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=weak_ref)
-
- with element.timed_activity("Sending artifact"), \
- element._output_file() as output_file:
- try:
- pushed = push_artifact(temp_repo.get_path().get_path(),
- push_url,
- [ref, weak_ref], output_file)
- except PushException as e:
- raise ArtifactError("Failed to push artifact {}: {}".format(ref, e)) from e
-
- return pushed
+ with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
+
+ with element.timed_activity("Preparing compressed archive"):
+ # First create a temporary archive-z2 repository, we can
+ # only use ostree-push with archive-z2 local repo.
+ temp_repo = _ostree.ensure(temp_repo_dir, True)
+
+ # Now push the ref we want to push into our temporary archive-z2 repo
+ _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref)
+ _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=weak_ref)
+
+ with element.timed_activity("Sending artifact"), \
+ element._output_file() as output_file:
+ try:
+ pushed = push_artifact(temp_repo.get_path().get_path(),
+ push_url,
+ [ref, weak_ref], output_file)
+ except PushException as e:
+ raise ArtifactError("Failed to push artifact {}: {}".format(ref, e)) from e
+
+ return pushed
diff --git a/buildstream/_artifactcache/pushreceive.py b/buildstream/_artifactcache/pushreceive.py
index d78e36043..9476c5c81 100644
--- a/buildstream/_artifactcache/pushreceive.py
+++ b/buildstream/_artifactcache/pushreceive.py
@@ -27,6 +27,7 @@ import sys
import shutil
import tarfile
import tempfile
+import multiprocessing
from enum import Enum
from urllib.parse import urlparse
@@ -317,10 +318,10 @@ def parse_remote_location(remotepath):
"""Parse remote artifact cache URL that's been specified in our config."""
remote_host = remote_user = remote_repo = None
- url = urlparse(remotepath)
- if url.netloc:
- if url.scheme != 'ssh':
- raise PushException('Only URL scheme ssh is allowed, '
+ url = urlparse(remotepath, scheme='file')
+ if url.scheme:
+ if url.scheme not in ['file', 'ssh']:
+ raise PushException('Only URL schemes file and ssh are allowed, '
'not "{}"'.format(url.scheme))
remote_host = url.hostname
remote_user = url.username
@@ -348,6 +349,9 @@ def parse_remote_location(remotepath):
def ssh_commandline(remote_host, remote_user=None, remote_port=22):
+ if remote_host is None:
+ return []
+
ssh_cmd = ['ssh']
if remote_user:
ssh_cmd += ['-l', remote_user]
@@ -357,6 +361,36 @@ def ssh_commandline(remote_host, remote_user=None, remote_port=22):
return ssh_cmd
+def foo_run(func, args, stdin_fd, stdout_fd, stderr_fd):
+ sys.stdin = open(stdin_fd, 'r')
+ sys.stdout = open(stdout_fd, 'w')
+ sys.stderr = open(stderr_fd, 'w')
+ func(args)
+
+
+class ProcessWithPipes(object):
+ def __init__(self, func, args, *, stderr=None):
+ r0, w0 = os.pipe()
+ r1, w1 = os.pipe()
+ if stderr is None:
+ r2, w2 = os.pipe()
+ else:
+ w2 = stderr.fileno()
+ self.proc = multiprocessing.Process(target=foo_run, args=(func, args, r0, w1, w2))
+ self.proc.start()
+ self.stdin = open(w0, 'wb')
+ os.close(r0)
+ self.stdout = open(r1, 'rb')
+ os.close(w1)
+ if stderr is None:
+ self.stderr = open(r2, 'rb')
+ os.close(w2)
+
+ def wait(self):
+ self.proc.join()
+ self.returncode = self.proc.exitcode
+
+
class OSTreePusher(object):
def __init__(self, repopath, remotepath, branches=[], verbose=False,
debug=False, output=None):
@@ -392,13 +426,19 @@ class OSTreePusher(object):
ssh_cmd += ['--verbose']
if self.debug:
ssh_cmd += ['--debug']
+ if not self.remote_host:
+ ssh_cmd += ['--pull-url', self.remote_repo]
ssh_cmd += [self.remote_repo]
logging.info('Executing {}'.format(' '.join(ssh_cmd)))
- self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=self.output,
- start_new_session=True)
+
+ if self.remote_host:
+ self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=self.output,
+ start_new_session=True)
+ else:
+ self.ssh = ProcessWithPipes(receive_main, ssh_cmd[1:], stderr=self.output)
self.writer = PushMessageWriter(self.ssh.stdin)
self.reader = PushMessageReader(self.ssh.stdout)
@@ -644,14 +684,19 @@ def initialize_push_connection(remote):
remote_host, remote_user, remote_repo, remote_port = parse_remote_location(remote)
ssh_cmd = ssh_commandline(remote_host, remote_user, remote_port)
- # We need a short timeout here because if 'remote' isn't reachable at
- # all, the process will hang until the connection times out.
- ssh_cmd += ['-oConnectTimeout=3']
+ if remote_host:
+ # We need a short timeout here because if 'remote' isn't reachable at
+ # all, the process will hang until the connection times out.
+ ssh_cmd += ['-oConnectTimeout=3']
ssh_cmd += ['bst-artifact-receive', remote_repo]
- ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ if remote_host:
+ ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ else:
+ ssh_cmd += ['--pull-url', remote_repo]
+ ssh = ProcessWithPipes(receive_main, ssh_cmd[1:])
writer = PushMessageWriter(ssh.stdin)
reader = PushMessageReader(ssh.stdout)