summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim MacArthur <jim.macarthur@codethink.co.uk>2018-07-05 12:19:18 +0100
committerMartin Blanchard <martin.blanchard@codethink.co.uk>2018-09-07 13:57:29 +0100
commit43651af0febce2b0638215dba9ab7159f34cfbe0 (patch)
tree302d72120214b2a8f3fb14411aca590df8722c13
parentc0ef71064e9160d926960158edd90bc7f15075a1 (diff)
downloadbuildstream-43651af0febce2b0638215dba9ab7159f34cfbe0.tar.gz
_sandboxremote.py: Implement the REAPI client
The remote execution client is implemented as a remote sandbox that sends sources and build commands to a REAPI server and fetches results once remotely executed. New file. https://gitlab.com/BuildStream/buildstream/issues/454
-rw-r--r--buildstream/sandbox/_sandboxremote.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
new file mode 100644
index 000000000..296b20351
--- /dev/null
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -0,0 +1,226 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 Bloomberg LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Jim MacArthur <jim.macarthur@codethink.co.uk>
+
+import os
+from urllib.parse import urlparse
+
+import grpc
+
+from . import Sandbox
+from ..storage._filebaseddirectory import FileBasedDirectory
+from ..storage._casbaseddirectory import CasBasedDirectory
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._artifactcache.cascache import CASCache
+
+
+class SandboxError(Exception):
+ pass
+
+
+# SandboxRemote()
+#
+# This isn't really a sandbox, it's a stub which sends all the sources and build
+# commands to a remote server and retrieves the results from it.
+#
+class SandboxRemote(Sandbox):
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.cascache = None
+
+ url = urlparse(kwargs['server_url'])
+ if not url.scheme or not url.hostname or not url.port:
+ raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
+ .format(kwargs['server_url']) +
+ "It should be of the form <protocol>://<domain name>:<port>.")
+ elif url.scheme != 'http':
+ raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
+ "Only plain HTTP is currenlty supported (no HTTPS).")
+
+ self.server_url = '{}:{}'.format(url.hostname, url.port)
+
+ def _get_cascache(self):
+ if self.cascache is None:
+ self.cascache = CASCache(self._get_context())
+ self.cascache.setup_remotes(use_config=True)
+ return self.cascache
+
+ def run_remote_command(self, command, input_root_digest, working_directory, environment):
+ # Sends an execution request to the remote execution server.
+ #
+ # This function blocks until it gets a response from the server.
+ #
+ environment_variables = [remote_execution_pb2.Command.
+ EnvironmentVariable(name=k, value=v)
+ for (k, v) in environment.items()]
+
+ # Create and send the Command object.
+ remote_command = remote_execution_pb2.Command(arguments=command,
+ working_directory=working_directory,
+ environment_variables=environment_variables,
+ output_files=[],
+ output_directories=[self._output_directory],
+ platform=None)
+
+ cascache = self._get_cascache()
+ # Upload the Command message to the remote CAS server
+ command_digest = cascache.push_message(self._get_project(), remote_command)
+ if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
+ # Command push failed
+ return None
+
+ # Create and send the action.
+ action = remote_execution_pb2.Action(command_digest=command_digest,
+ input_root_digest=input_root_digest,
+ timeout=None,
+ do_not_cache=False)
+
+ # Upload the Action message to the remote CAS server
+ action_digest = cascache.push_message(self._get_project(), action)
+ if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
+ # Action push failed
+ return None
+
+ # Next, try to create a communication channel to the BuildGrid server.
+ channel = grpc.insecure_channel(self.server_url)
+ stub = remote_execution_pb2_grpc.ExecutionStub(channel)
+ request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
+ skip_cache_lookup=False)
+ try:
+ operation_iterator = stub.Execute(request)
+ except grpc.RpcError:
+ return None
+
+ operation = None
+ with self._get_context().timed_activity("Waiting for the remote build to complete"):
+ # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
+ # which will check the server is actually contactable. However, calling it when the
+ # server is available seems to cause .code() to hang forever.
+ for operation in operation_iterator:
+ if operation.done:
+ break
+
+ return operation
+
+ def process_job_output(self, output_directories, output_files):
+ # Reads the remote execution server response to an execution request.
+ #
+ # output_directories is an array of OutputDirectory objects.
+ # output_files is an array of OutputFile objects.
+ #
+ # We only specify one output_directory, so it's an error
+ # for there to be any output files or more than one directory at the moment.
+ #
+ if output_files:
+ raise SandboxError("Output files were returned when we didn't request any.")
+ elif not output_directories:
+ error_text = "No output directory was returned from the build server."
+ raise SandboxError(error_text)
+ elif len(output_directories) > 1:
+ error_text = "More than one output directory was returned from the build server: {}."
+ raise SandboxError(error_text.format(output_directories))
+
+ tree_digest = output_directories[0].tree_digest
+ if tree_digest is None or not tree_digest.hash:
+ raise SandboxError("Output directory structure had no digest attached.")
+
+ cascache = self._get_cascache()
+ # Now do a pull to ensure we have the necessary parts.
+ dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
+ if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
+ raise SandboxError("Output directory structure pulling from remote failed.")
+
+ path_components = os.path.split(self._output_directory)
+
+ # Now what we have is a digest for the output. Once we return, the calling process will
+ # attempt to descend into our directory and find that directory, so we need to overwrite
+ # that.
+
+ if not path_components:
+ # The artifact wants the whole directory; we could just return the returned hash in its
+ # place, but we don't have a means to do that yet.
+ raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
+
+ # At the moment, we will get the whole directory back in the first directory argument and we need
+ # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
+ # from another hash will be interesting, though...
+
+ new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
+ self._set_virtual_directory(new_dir)
+
+ def run(self, command, flags, *, cwd=None, env=None):
+ # Upload sources
+ upload_vdir = self.get_virtual_directory()
+
+ if isinstance(upload_vdir, FileBasedDirectory):
+ # Make a new temporary directory to put source in
+ upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
+ upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
+
+ upload_vdir.recalculate_hash()
+
+ cascache = self._get_cascache()
+ # Now, push that key (without necessarily needing a ref) to the remote.
+ vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
+ if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
+ raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
+
+ # Set up environment and working directory
+ if cwd is None:
+ cwd = self._get_work_directory()
+
+ if cwd is None:
+ cwd = '/'
+
+ if env is None:
+ env = self._get_environment()
+
+ # We want command args as a list of strings
+ if isinstance(command, str):
+ command = [command]
+
+ # Now transmit the command to execute
+ operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
+
+ if operation is None:
+ # Failure of remote execution, usually due to an error in BuildStream
+ # NB This error could be raised in __run_remote_command
+ raise SandboxError("No response returned from server")
+
+ assert not operation.HasField('error') and operation.HasField('response')
+
+ execution_response = remote_execution_pb2.ExecuteResponse()
+ # The response is expected to be an ExecutionResponse message
+ assert operation.response.Is(execution_response.DESCRIPTOR)
+
+ operation.response.Unpack(execution_response)
+
+ if execution_response.status.code != 0:
+ # A normal error during the build: the remote execution system
+ # has worked correctly but the command failed.
+ # execution_response.error also contains 'message' (str) and
+ # 'details' (iterator of Any) which we ignore at the moment.
+ return execution_response.status.code
+
+ action_result = execution_response.result
+
+ self.process_job_output(action_result.output_directories, action_result.output_files)
+
+ return 0