diff options
author | Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> | 2018-12-05 16:36:00 +0000 |
---|---|---|
committer | Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> | 2018-12-05 16:36:00 +0000 |
commit | a53d6d1f81913d639175cbd503d9d71ec7133a4e (patch) | |
tree | eb5f8d46ac8cafe8b51744f151463bf01f23d18f | |
parent | eb1ed4108b75ed919f471e448fa6088b1503a328 (diff) | |
parent | 9d77351f0b05f6b37b4235742ac3a0ca2d6feb93 (diff) | |
download | buildstream-a53d6d1f81913d639175cbd503d9d71ec7133a4e.tar.gz |
Merge branch 'raoul/628-RE-flow-optimisation' into 'master'
Remote-execution client flow optimisation
Closes #628
See merge request BuildStream/buildstream!982
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 5 | ||||
-rw-r--r-- | buildstream/sandbox/_sandboxremote.py | 197 | ||||
-rw-r--r-- | buildstream/utils.py | 17 | ||||
-rw-r--r-- | doc/source/format_project.rst | 7 |
4 files changed, 152 insertions, 74 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 2ae36d22a..9ca757d4d 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -427,10 +427,7 @@ class CASCache(): def push_message(self, remote, message): message_buffer = message.SerializeToString() - message_sha = hashlib.sha256(message_buffer) - message_digest = remote_execution_pb2.Digest() - message_digest.hash = message_sha.hexdigest() - message_digest.size_bytes = len(message_buffer) + message_digest = utils._message_digest(message_buffer) remote.init() diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index a967629fe..52e450fb5 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ b/buildstream/sandbox/_sandboxremote.py @@ -26,6 +26,8 @@ from functools import partial import grpc +from .. import utils +from .._message import Message, MessageType from . import Sandbox, SandboxCommandError from .sandbox import _SandboxBatch from ..storage._filebaseddirectory import FileBasedDirectory @@ -39,7 +41,7 @@ from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc from .._artifactcache.cascache import CASRemote, CASRemoteSpec -class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')): +class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')): pass @@ -59,6 +61,10 @@ class SandboxRemote(Sandbox): self.storage_url = config.storage_service['url'] self.exec_url = config.exec_service['url'] + if config.action_service: + self.action_url = config.action_service['url'] + else: + self.action_url = None self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True, server_cert=config.storage_service['server-cert'], @@ -66,6 +72,9 @@ class SandboxRemote(Sandbox): client_cert=config.storage_service['client-cert']) self.operation_name = None + def info(self, msg): + self._get_context().message(Message(None, MessageType.INFO, msg)) + @staticmethod def specs_from_config_node(config_node, basedir): @@ -88,12 +97,19 @@ class SandboxRemote(Sandbox): tls_keys = ['client-key', 'client-cert', 'server-cert'] - _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url']) + _yaml.node_validate( + remote_config, + ['execution-service', 'storage-service', 'url', 'action-cache-service']) remote_exec_service_config = require_node(remote_config, 'execution-service') remote_exec_storage_config = require_node(remote_config, 'storage-service') + remote_exec_action_config = remote_config.get('action-cache-service') _yaml.node_validate(remote_exec_service_config, ['url']) _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys) + if remote_exec_action_config: + _yaml.node_validate(remote_exec_action_config, ['url']) + else: + remote_config['action-service'] = None if 'url' in remote_config: if 'execution-service' not in remote_config: @@ -114,59 +130,17 @@ class SandboxRemote(Sandbox): "remote-execution configuration. Your config is missing '{}'." .format(str(provenance), tls_keys, key)) - spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service']) + spec = RemoteExecutionSpec(remote_config['execution-service'], + remote_config['storage-service'], + remote_config['action-cache-service']) return spec - def run_remote_command(self, command, input_root_digest, working_directory, environment): + def run_remote_command(self, channel, action_digest): # Sends an execution request to the remote execution server. # # This function blocks until it gets a response from the server. - # - environment_variables = [remote_execution_pb2.Command. - EnvironmentVariable(name=k, value=v) - for (k, v) in environment.items()] - - config = self._get_config() - platform = remote_execution_pb2.Platform() - platform.properties.extend([remote_execution_pb2.Platform. - Property(name="OSFamily", value=config.build_os), - remote_execution_pb2.Platform. - Property(name="ISA", value=config.build_arch)]) - - # Create and send the Command object. - remote_command = remote_execution_pb2.Command(arguments=command, - working_directory=working_directory, - environment_variables=environment_variables, - output_files=[], - output_directories=[self._output_directory], - platform=platform) - context = self._get_context() - cascache = context.get_cascache() - casremote = CASRemote(self.storage_remote_spec) - - # Upload the Command message to the remote CAS server - command_digest = cascache.push_message(casremote, remote_command) - - # Create and send the action. - action = remote_execution_pb2.Action(command_digest=command_digest, - input_root_digest=input_root_digest, - timeout=None, - do_not_cache=False) - - # Upload the Action message to the remote CAS server - action_digest = cascache.push_message(casremote, action) - - # Next, try to create a communication channel to the BuildGrid server. - url = urlparse(self.exec_url) - if not url.port: - raise SandboxError("You must supply a protocol and port number in the execution-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - else: - raise SandboxError("Remote execution currently only supports the 'http' protocol " - "and '{}' was supplied.".format(url.scheme)) + # Try to create a communication channel to the BuildGrid server. stub = remote_execution_pb2_grpc.ExecutionStub(channel) request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest, skip_cache_lookup=False) @@ -286,13 +260,12 @@ class SandboxRemote(Sandbox): # to replace the sandbox's virtual directory with that. Creating a new virtual directory object # from another hash will be interesting, though... - new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest) + new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest) self._set_virtual_directory(new_dir) def _run(self, command, flags, *, cwd, env): - # Upload sources + # set up virtual dircetory upload_vdir = self.get_virtual_directory() - cascache = self._get_context().get_cascache() if isinstance(upload_vdir, FileBasedDirectory): # Make a new temporary directory to put source in @@ -301,16 +274,111 @@ class SandboxRemote(Sandbox): upload_vdir.recalculate_hash() - casremote = CASRemote(self.storage_remote_spec) - # Now, push that key (without necessarily needing a ref) to the remote. + # Generate action_digest first + input_root_digest = upload_vdir.ref + command_proto = self._create_command(command, cwd, env) + command_digest = utils._message_digest(command_proto.SerializeToString()) + action = remote_execution_pb2.Action(command_digest=command_digest, + input_root_digest=input_root_digest) + action_digest = utils._message_digest(action.SerializeToString()) + + # Next, try to create a communication channel to the BuildGrid server. + url = urlparse(self.exec_url) + if not url.port: + raise SandboxError("You must supply a protocol and port number in the execution-service url, " + "for example: http://buildservice:50051.") + if url.scheme == 'http': + channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) + else: + raise SandboxError("Remote execution currently only supports the 'http' protocol " + "and '{}' was supplied.".format(url.scheme)) + + # check action cache download and download if there + action_result = self._check_action_cache(action_digest) + + if not action_result: + casremote = CASRemote(self.storage_remote_spec) + + # Now, push that key (without necessarily needing a ref) to the remote. + try: + cascache.push_directory(casremote, upload_vdir) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + + if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref): + raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.") + + # Push command and action + try: + cascache.push_message(casremote, command_proto) + except grpc.RpcError as e: + raise SandboxError("Failed to push command to remote: {}".format(e)) + + try: + cascache.push_message(casremote, action) + except grpc.RpcError as e: + raise SandboxError("Failed to push action to remote: {}".format(e)) + + # Now request to execute the action + operation = self.run_remote_command(channel, action_digest) + action_result = self._extract_action_result(operation) + + if action_result.exit_code != 0: + # A normal error during the build: the remote execution system + # has worked correctly but the command failed. + # action_result.stdout and action_result.stderr also contains + # build command outputs which we ignore at the moment. + return action_result.exit_code + + # Get output of build + self.process_job_output(action_result.output_directories, action_result.output_files) + + return 0 + + def _check_action_cache(self, action_digest): + # Checks the action cache to see if this artifact has already been built + # + # Should return either the action response or None if not found, raise + # Sandboxerror if other grpc error was raised + if not self.action_url: + return None + url = urlparse(self.action_url) + if not url.port: + raise SandboxError("You must supply a protocol and port number in the action-cache-service url, " + "for example: http://buildservice:50051.") + if not url.scheme == "http": + raise SandboxError("Currently only support http for the action cache" + "and {} was supplied".format(url.scheme)) + + channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) + request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest) + stub = remote_execution_pb2_grpc.ActionCacheStub(channel) try: - cascache.push_directory(casremote, upload_vdir) + result = stub.GetActionResult(request) except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})" + .format(e.code(), e.details())) + else: + return None + else: + self.info("Action result found in action cache") + return result - # Now transmit the command to execute - operation = self.run_remote_command(command, upload_vdir.ref, cwd, env) + def _create_command(self, command, working_directory, environment): + # Creates a command proto + environment_variables = [remote_execution_pb2.Command. + EnvironmentVariable(name=k, value=v) + for (k, v) in environment.items()] + return remote_execution_pb2.Command(arguments=command, + working_directory=working_directory, + environment_variables=environment_variables, + output_files=[], + output_directories=[self._output_directory], + platform=None) + @staticmethod + def _extract_action_result(operation): if operation is None: # Failure of remote execution, usually due to an error in BuildStream raise SandboxError("No response returned from server") @@ -331,18 +399,7 @@ class SandboxRemote(Sandbox): else: raise SandboxError("Remote server failed at executing the build request.") - action_result = execution_response.result - - if action_result.exit_code != 0: - # A normal error during the build: the remote execution system - # has worked correctly but the command failed. - # action_result.stdout and action_result.stderr also contains - # build command outputs which we ignore at the moment. - return action_result.exit_code - - self.process_job_output(action_result.output_directories, action_result.output_files) - - return 0 + return execution_response.result def _create_batch(self, main_group, flags, *, collect=None): return _SandboxRemoteBatch(self, main_group, flags, collect=collect) diff --git a/buildstream/utils.py b/buildstream/utils.py index 94c990357..2fe9ab7dc 100644 --- a/buildstream/utils.py +++ b/buildstream/utils.py @@ -41,6 +41,7 @@ import psutil from . import _signals from ._exceptions import BstError, ErrorDomain +from ._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 # The magic number for timestamps: 2011-11-11 11:11:11 _magic_timestamp = calendar.timegm([2011, 11, 11, 11, 11, 11]) @@ -1242,3 +1243,19 @@ def _deduplicate(iterable, key=None): def _get_link_mtime(path): path_stat = os.lstat(path) return path_stat.st_mtime + + +# _message_digest() +# +# Args: +# message_buffer (str): String to create digest of +# +# Returns: +# (remote_execution_pb2.Digest): Content digest +# +def _message_digest(message_buffer): + sha = hashlib.sha256(message_buffer) + digest = remote_execution_pb2.Digest() + digest.hash = sha.hexdigest() + digest.size_bytes = len(message_buffer) + return digest diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst index 59ee05e85..a85bf6470 100644 --- a/doc/source/format_project.rst +++ b/doc/source/format_project.rst @@ -238,6 +238,8 @@ using the `remote-execution` option: server-cert: server.crt client-cert: client.crt client-key: client.key + action-cache-service: + url: http://bar.action.com:50052 The execution-service part of remote execution does not support encrypted connections yet, so the protocol must always be http. @@ -245,6 +247,11 @@ connections yet, so the protocol must always be http. storage-service specifies a remote CAS store and the parameters are the same as those used to specify an :ref:`artifact server <artifacts>`. +The action-cache-service specifies where built actions are cached, allowing +buildstream to check whether an action has already been executed and download it +if so. This is similar to the artifact cache but REAPI specified, and is +optional for remote execution to work. + The storage service may be the same endpoint used for artifact caching. Remote execution cannot work without push access to the storage endpoint, so you must specify a client certificate and key, |