diff options
author | Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> | 2018-11-29 16:08:26 +0000 |
---|---|---|
committer | Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> | 2018-12-05 15:33:44 +0000 |
commit | 4a56bf2a3a3c2b332a5e832b1f58c027a6608d07 (patch) | |
tree | 6fdce31e65e54edc00f271b00a5db22e8976caed | |
parent | 9ef1a8a261061eb7d44ca2ae91cb84cd24f0c11e (diff) | |
download | buildstream-4a56bf2a3a3c2b332a5e832b1f58c027a6608d07.tar.gz |
_sandboxremote.py: Add checks to action cache before attempting to push.
Stops unnecessary pushing of builds that have already been built; the action
cache is checked first instead.
Fixes #628
-rw-r--r-- | buildstream/sandbox/_sandboxremote.py | 160 |
1 files changed, 93 insertions, 67 deletions
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index a967629fe..2e596be32 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ b/buildstream/sandbox/_sandboxremote.py @@ -26,6 +26,7 @@ from functools import partial import grpc +from .. import utils from . import Sandbox, SandboxCommandError from .sandbox import _SandboxBatch from ..storage._filebaseddirectory import FileBasedDirectory @@ -117,56 +118,12 @@ class SandboxRemote(Sandbox): spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service']) return spec - def run_remote_command(self, command, input_root_digest, working_directory, environment): + def run_remote_command(self, channel, action_digest): # Sends an execution request to the remote execution server. # # This function blocks until it gets a response from the server. - # - environment_variables = [remote_execution_pb2.Command. - EnvironmentVariable(name=k, value=v) - for (k, v) in environment.items()] - - config = self._get_config() - platform = remote_execution_pb2.Platform() - platform.properties.extend([remote_execution_pb2.Platform. - Property(name="OSFamily", value=config.build_os), - remote_execution_pb2.Platform. - Property(name="ISA", value=config.build_arch)]) - - # Create and send the Command object. - remote_command = remote_execution_pb2.Command(arguments=command, - working_directory=working_directory, - environment_variables=environment_variables, - output_files=[], - output_directories=[self._output_directory], - platform=platform) - context = self._get_context() - cascache = context.get_cascache() - casremote = CASRemote(self.storage_remote_spec) - - # Upload the Command message to the remote CAS server - command_digest = cascache.push_message(casremote, remote_command) - - # Create and send the action. 
- action = remote_execution_pb2.Action(command_digest=command_digest, - input_root_digest=input_root_digest, - timeout=None, - do_not_cache=False) - - # Upload the Action message to the remote CAS server - action_digest = cascache.push_message(casremote, action) - - # Next, try to create a communication channel to the BuildGrid server. - url = urlparse(self.exec_url) - if not url.port: - raise SandboxError("You must supply a protocol and port number in the execution-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - else: - raise SandboxError("Remote execution currently only supports the 'http' protocol " - "and '{}' was supplied.".format(url.scheme)) + # Try to create a communication channel to the BuildGrid server. stub = remote_execution_pb2_grpc.ExecutionStub(channel) request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest, skip_cache_lookup=False) @@ -286,13 +243,12 @@ class SandboxRemote(Sandbox): # to replace the sandbox's virtual directory with that. Creating a new virtual directory object # from another hash will be interesting, though... - new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest) + new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest) self._set_virtual_directory(new_dir) def _run(self, command, flags, *, cwd, env): - # Upload sources + # set up virtual dircetory upload_vdir = self.get_virtual_directory() - cascache = self._get_context().get_cascache() if isinstance(upload_vdir, FileBasedDirectory): # Make a new temporary directory to put source in @@ -301,16 +257,97 @@ class SandboxRemote(Sandbox): upload_vdir.recalculate_hash() - casremote = CASRemote(self.storage_remote_spec) - # Now, push that key (without necessarily needing a ref) to the remote. 
+ # Generate action_digest first + input_root_digest = upload_vdir.ref + command_proto = self._create_command(command, cwd, env) + command_digest = utils._message_digest(command_proto.SerializeToString()) + action = remote_execution_pb2.Action(command_digest=command_digest, + input_root_digest=input_root_digest) + action_digest = utils._message_digest(action.SerializeToString()) + + # Next, try to create a communication channel to the BuildGrid server. + url = urlparse(self.exec_url) + if not url.port: + raise SandboxError("You must supply a protocol and port number in the execution-service url, " + "for example: http://buildservice:50051.") + if url.scheme == 'http': + channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) + else: + raise SandboxError("Remote execution currently only supports the 'http' protocol " + "and '{}' was supplied.".format(url.scheme)) + + # check action cache download and download if there + action_result = self._check_action_cache(channel, action_digest) + + if not action_result: + casremote = CASRemote(self.storage_remote_spec) + + # Now, push that key (without necessarily needing a ref) to the remote. 
+ try: + cascache.push_directory(casremote, upload_vdir) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + + if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref): + raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.") + + # Push command and action + try: + cascache.push_message(casremote, command_proto) + except grpc.RpcError as e: + raise SandboxError("Failed to push command to remote: {}".format(e)) + + try: + cascache.push_message(casremote, action) + except grpc.RpcError as e: + raise SandboxError("Failed to push action to remote: {}".format(e)) + + # Now request to execute the action + operation = self.run_remote_command(channel, action_digest) + action_result = self._extract_action_result(operation) + + if action_result.exit_code != 0: + # A normal error during the build: the remote execution system + # has worked correctly but the command failed. + # action_result.stdout and action_result.stderr also contains + # build command outputs which we ignore at the moment. 
+ return action_result.exit_code + + # Get output of build + self.process_job_output(action_result.output_directories, action_result.output_files) + + return 0 + + def _check_action_cache(self, channel, action_digest): + # Checks the action cache to see if this artifact has already been built + # + # Should return either the action response or None if not found, raise + # Sandboxerror if other grpc error was raised + request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest) + stub = remote_execution_pb2_grpc.ActionCacheStub(channel) try: - cascache.push_directory(casremote, upload_vdir) + return stub.GetActionResult(request) except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})" + .format(e.code(), e.details())) + else: + return None - # Now transmit the command to execute - operation = self.run_remote_command(command, upload_vdir.ref, cwd, env) + def _create_command(self, command, working_directory, environment): + # Creates a command proto + environment_variables = [remote_execution_pb2.Command. 
+ EnvironmentVariable(name=k, value=v) + for (k, v) in environment.items()] + return remote_execution_pb2.Command(arguments=command, + working_directory=working_directory, + environment_variables=environment_variables, + output_files=[], + output_directories=[self._output_directory], + platform=None) + @staticmethod + def _extract_action_result(operation): if operation is None: # Failure of remote execution, usually due to an error in BuildStream raise SandboxError("No response returned from server") @@ -331,18 +368,7 @@ class SandboxRemote(Sandbox): else: raise SandboxError("Remote server failed at executing the build request.") - action_result = execution_response.result - - if action_result.exit_code != 0: - # A normal error during the build: the remote execution system - # has worked correctly but the command failed. - # action_result.stdout and action_result.stderr also contains - # build command outputs which we ignore at the moment. - return action_result.exit_code - - self.process_job_output(action_result.output_directories, action_result.output_files) - - return 0 + return execution_response.result def _create_batch(self, main_group, flags, *, collect=None): return _SandboxRemoteBatch(self, main_group, flags, collect=collect) |