 buildstream/_artifactcache/artifactcache.py |  10
 buildstream/_artifactcache/pushreceive.py   | 131
 buildstream/_pipeline.py                    |  18
 3 files changed, 121 insertions(+), 38 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 2508e0b88..a7208e8f2 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -28,6 +28,7 @@ from ..exceptions import _ArtifactError
from ..element import _KeyStrength
from .._ostree import OSTreeError
+from .pushreceive import check_push_connection
from .pushreceive import push as push_artifact
from .pushreceive import PushException
@@ -67,6 +68,15 @@ class ArtifactCache():
self.__offline = False
+ def preflight(self):
+ if self.can_push() and not self.context.artifact_push.startswith("/"):
+ try:
+ check_push_connection(self.context.artifact_push,
+ self.context.artifact_push_port)
+ except PushException as e:
+ raise _ArtifactError("BuildStream will be unable to push artifacts "
+ "to the shared cache: {}".format(e))
+
# contains():
#
# Check whether the artifact for the specified Element is already available
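For reference, a minimal self-contained sketch of the guard the new preflight() hook applies: only a remote push location is probed, while a local filesystem cache (a push location starting with "/") is skipped. The helper and example URLs below are illustrative only, not BuildStream API.

    def needs_connectivity_check(artifact_push):
        # Remote push locations need an ssh connectivity probe; a local path
        # beginning with "/" is a filesystem cache and needs no check.
        return bool(artifact_push) and not artifact_push.startswith("/")

    assert needs_connectivity_check("ssh://artifacts@cache.example.com:22200/artifacts")
    assert not needs_connectivity_check("/srv/local-artifact-cache")
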
diff --git a/buildstream/_artifactcache/pushreceive.py b/buildstream/_artifactcache/pushreceive.py
index 0706b5e41..f47076c76 100644
--- a/buildstream/_artifactcache/pushreceive.py
+++ b/buildstream/_artifactcache/pushreceive.py
@@ -29,6 +29,7 @@ import tempfile
import shutil
import tarfile
import signal
+import tempfile
from enum import Enum
from urllib.parse import urlparse
@@ -135,6 +136,16 @@ class PushMessageWriter(object):
self.file.write(msg)
self.file.flush()
+ def send_hello(self):
+ # The 'hello' message is used to check connectivity, and is actually
+ # an empty info request in order to keep the receiver code simple.
+ args = {
+ 'mode': GLib.Variant('i', 0),
+ 'refs': GLib.Variant('a{ss}', {})
+ }
+ command = PushCommand(PushCommandType.info, args)
+ self.write(command)
+
def send_info(self, repo, refs):
cmdtype = PushCommandType.info
mode = repo.get_mode()
@@ -292,6 +303,48 @@ class PushMessageReader(object):
return args
+def parse_remote_location(remotepath, remote_port):
+ """Parse remote artifact cache URL that's been specified in our config."""
+ remote_host = remote_user = remote_repo = None
+
+ url = urlparse(remotepath)
+ if url.netloc:
+ if url.scheme != 'ssh':
+ raise PushException('Only URL scheme ssh is allowed, '
+ 'not "%s"' % url.scheme)
+ remote_host = url.hostname
+ remote_user = url.username
+ remote_repo = url.path
+ remote_port = url.port
+ else:
+ # Scp/git style remote (user@hostname:path)
+ parts = remotepath.split('@', 1)
+ if len(parts) > 1:
+ remote_user = parts[0]
+ remainder = parts[1]
+ else:
+ remote_user = None
+ remainder = parts[0]
+ parts = remainder.split(':', 1)
+ if len(parts) != 2:
+ raise PushException('Remote repository "%s" does not '
+ 'contain a hostname and path separated '
+ 'by ":"' % remotepath)
+ remote_host, remote_repo = parts
+
+ return remote_host, remote_user, remote_repo, remote_port
+
+
+def ssh_commandline(remote_host, remote_user=None, remote_port=22):
+ ssh_cmd = ['ssh']
+ if remote_user:
+ ssh_cmd += ['-l', remote_user]
+ if remote_port != 22:
+ ssh_cmd += ['-p', str(remote_port)]
+ ssh_cmd += [remote_host]
+ return ssh_cmd
+
+
class OSTreePusher(object):
def __init__(self, repopath, remotepath, remote_port, branches=[], verbose=False,
debug=False, output=None):
@@ -301,11 +354,8 @@ class OSTreePusher(object):
self.debug = debug
self.output = output
- self.remote_host = None
- self.remote_user = None
- self.remote_repo = None
- self.remote_port = remote_port
- self._set_remote_args()
+ self.remote_host, self.remote_user, self.remote_repo, self.remote_port = \
+ parse_remote_location(remotepath, remote_port)
if self.repopath is None:
self.repo = OSTree.Repo.new_default()
@@ -323,18 +373,14 @@ class OSTreePusher(object):
self.refs[branch] = rev
# Start ssh
- ssh_cmd = ['ssh']
- if self.remote_user:
- ssh_cmd += ['-l', self.remote_user]
- if self.remote_port != 22:
- ssh_cmd += ['-p', str(self.remote_port)]
+ ssh_cmd = ssh_commandline(self.remote_host, self.remote_user, self.remote_port)
- ssh_cmd += [self.remote_host, 'bst-artifact-receive']
+ ssh_cmd += ['bst-artifact-receive']
if self.verbose:
ssh_cmd += ['--verbose']
if self.debug:
ssh_cmd += ['--debug']
- ssh_cmd += [self.remotepath]
+ ssh_cmd += [self.remote_repo]
logging.info('Executing {}'.format(' '.join(ssh_cmd)))
self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE,
@@ -345,32 +391,6 @@ class OSTreePusher(object):
self.writer = PushMessageWriter(self.ssh.stdin)
self.reader = PushMessageReader(self.ssh.stdout)
- def _set_remote_args(self):
- url = urlparse(self.remotepath)
- if url.netloc:
- if url.scheme != 'ssh':
- raise PushException('Only URL scheme ssh is allowed, '
- 'not "%s"' % url.scheme)
- self.remote_host = url.hostname
- self.remote_user = url.username
- self.remote_repo = url.path
- self.remote_port = url.port
- else:
- # Scp/git style remote (user@hostname:path)
- parts = self.remotepath.split('@', 1)
- if len(parts) > 1:
- self.remote_user = parts[0]
- remainder = parts[1]
- else:
- self.remote_user = None
- remainder = parts[0]
- parts = remainder.split(':', 1)
- if len(parts) != 2:
- raise PushException('Remote repository "%s" does not '
- 'contain a hostname and path separated '
- 'by ":"' % self.remotepath)
- self.remote_host, self.remotepath = parts
-
def needed_commits(self, remote, local, needed):
parent = local
if remote == '0' * 64:
@@ -579,6 +599,41 @@ class OSTreeReceiver(object):
return 0
+# check_push_connection()
+#
+# Test that we can connect to the remote bst-artifact-receive program.
+# We don't want to make the user wait until the first artifact has been built
+# to discover that they actually cannot push.
+#
+# Args:
+# remote: The ssh remote url to push to
+# remote_port: The ssh port at the remote url
+#
+# Raises:
+# PushException if there was an issue connecting to the remote.
+def check_push_connection(remote, remote_port):
+ remote_host, remote_user, remote_repo, remote_port = parse_remote_location(remote, remote_port)
+ ssh_cmd = ssh_commandline(remote_host, remote_user, remote_port)
+
+ # We need a short timeout here because if 'remote' isn't reachable at
+ # all, the process will hang until the connection times out.
+ ssh_cmd += ['-oConnectTimeout=3']
+
+ ssh_cmd += ['bst-artifact-receive', remote_repo]
+
+ ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ writer = PushMessageWriter(ssh.stdin)
+ writer.send_hello()
+ writer.send_done()
+
+ ssh.wait()
+ if ssh.returncode != 0:
+ error = ssh.stderr.read().decode('unicode-escape')
+ raise PushException(error)
+
+
# push()
#
# Run the pusher in process, with logging going to the output file
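The two remote formats accepted by the new parse_remote_location() helper (an ssh:// URL, or an scp/git style user@host:path remote) can be illustrated with a simplified, self-contained stand-in based only on the logic in the hunk above. The function below is not the BuildStream code, and the example hosts and paths are made up.

    from urllib.parse import urlparse

    def parse_remote(remotepath, remote_port=22):
        # Simplified stand-in for parse_remote_location()
        url = urlparse(remotepath)
        if url.netloc:
            if url.scheme != 'ssh':
                raise ValueError('Only URL scheme ssh is allowed, not "%s"' % url.scheme)
            return url.hostname, url.username, url.path, url.port
        # Scp/git style remote (user@hostname:path)
        user = None
        remainder = remotepath
        if '@' in remotepath:
            user, remainder = remotepath.split('@', 1)
        host, sep, path = remainder.partition(':')
        if not sep:
            raise ValueError('Remote "%s" does not contain a hostname and path '
                             'separated by ":"' % remotepath)
        return host, user, path, remote_port

    print(parse_remote('ssh://artifacts@cache.example.com:22200/artifacts'))
    # ('cache.example.com', 'artifacts', '/artifacts', 22200)
    print(parse_remote('artifacts@cache.example.com:artifacts'))
    # ('cache.example.com', 'artifacts', 'artifacts', 22)
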
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 960581a5c..64ce4d679 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -19,6 +19,7 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
+import datetime
import os
import stat
import shlex
@@ -282,6 +283,21 @@ class Pipeline():
return element
+ # Internal: If a remote artifact cache is configured for pushing, check that it
+ # actually works.
+ def assert_remote_artifact_cache(self):
+ if self.artifacts.can_push():
+ starttime = datetime.datetime.now()
+ self.message(self.target, MessageType.START, "Checking connectivity to remote artifact cache")
+ try:
+ self.artifacts.preflight()
+ except _ArtifactError as e:
+ self.message(self.target, MessageType.FAIL, str(e),
+ elapsed=datetime.datetime.now() - starttime)
+ raise PipelineError()
+ self.message(self.target, MessageType.SUCCESS, "Connectivity OK",
+ elapsed=datetime.datetime.now() - starttime)
+
#############################################################
# Commands #
#############################################################
@@ -391,6 +407,8 @@ class Pipeline():
detail="\n".join([el + "-" + str(src) for el, src, _
in self.unused_workspaces]))
+ self.assert_remote_artifact_cache()
+
if build_all or track_first:
plan = list(self.dependencies(Scope.ALL))
else:
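
The fail-fast pattern added to the pipeline can be summarised with a small, self-contained sketch: time the connectivity probe, report START / FAIL / SUCCESS with the elapsed time, and abort before any build jobs are scheduled. The probe and message callables below are placeholders, not the BuildStream messaging API.

    import datetime

    def preflight_with_messages(probe, message):
        start = datetime.datetime.now()
        message("START", "Checking connectivity to remote artifact cache")
        try:
            probe()
        except Exception as e:
            # Report the failure with the time spent, then abort the session
            message("FAIL", str(e), elapsed=datetime.datetime.now() - start)
            raise
        message("SUCCESS", "Connectivity OK", elapsed=datetime.datetime.now() - start)

    # Example usage with trivial stand-ins:
    preflight_with_messages(
        probe=lambda: None,
        message=lambda kind, text, elapsed=None: print(kind, text, elapsed or ""))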