summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2018-11-29 16:08:26 +0000
committerRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2018-12-05 15:33:44 +0000
commit4a56bf2a3a3c2b332a5e832b1f58c027a6608d07 (patch)
tree6fdce31e65e54edc00f271b00a5db22e8976caed
parent9ef1a8a261061eb7d44ca2ae91cb84cd24f0c11e (diff)
downloadbuildstream-4a56bf2a3a3c2b332a5e832b1f58c027a6608d07.tar.gz
_sandboxremote.py: Add checks to action cache before attempting to push.
Stops unneccesary pushing of builds that have already been built, just checks the action cache to begin with. Fixes #628
-rw-r--r--buildstream/sandbox/_sandboxremote.py160
1 files changed, 93 insertions, 67 deletions
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index a967629fe..2e596be32 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -26,6 +26,7 @@ from functools import partial
import grpc
+from .. import utils
from . import Sandbox, SandboxCommandError
from .sandbox import _SandboxBatch
from ..storage._filebaseddirectory import FileBasedDirectory
@@ -117,56 +118,12 @@ class SandboxRemote(Sandbox):
spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
return spec
- def run_remote_command(self, command, input_root_digest, working_directory, environment):
+ def run_remote_command(self, channel, action_digest):
# Sends an execution request to the remote execution server.
#
# This function blocks until it gets a response from the server.
- #
- environment_variables = [remote_execution_pb2.Command.
- EnvironmentVariable(name=k, value=v)
- for (k, v) in environment.items()]
-
- config = self._get_config()
- platform = remote_execution_pb2.Platform()
- platform.properties.extend([remote_execution_pb2.Platform.
- Property(name="OSFamily", value=config.build_os),
- remote_execution_pb2.Platform.
- Property(name="ISA", value=config.build_arch)])
-
- # Create and send the Command object.
- remote_command = remote_execution_pb2.Command(arguments=command,
- working_directory=working_directory,
- environment_variables=environment_variables,
- output_files=[],
- output_directories=[self._output_directory],
- platform=platform)
- context = self._get_context()
- cascache = context.get_cascache()
- casremote = CASRemote(self.storage_remote_spec)
-
- # Upload the Command message to the remote CAS server
- command_digest = cascache.push_message(casremote, remote_command)
-
- # Create and send the action.
- action = remote_execution_pb2.Action(command_digest=command_digest,
- input_root_digest=input_root_digest,
- timeout=None,
- do_not_cache=False)
-
- # Upload the Action message to the remote CAS server
- action_digest = cascache.push_message(casremote, action)
-
- # Next, try to create a communication channel to the BuildGrid server.
- url = urlparse(self.exec_url)
- if not url.port:
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
- "for example: http://buildservice:50051.")
- if url.scheme == 'http':
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
- else:
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
- "and '{}' was supplied.".format(url.scheme))
+ # Try to create a communication channel to the BuildGrid server.
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
skip_cache_lookup=False)
@@ -286,13 +243,12 @@ class SandboxRemote(Sandbox):
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
# from another hash will be interesting, though...
- new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
+ new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
self._set_virtual_directory(new_dir)
def _run(self, command, flags, *, cwd, env):
- # Upload sources
+ # set up virtual dircetory
upload_vdir = self.get_virtual_directory()
-
cascache = self._get_context().get_cascache()
if isinstance(upload_vdir, FileBasedDirectory):
# Make a new temporary directory to put source in
@@ -301,16 +257,97 @@ class SandboxRemote(Sandbox):
upload_vdir.recalculate_hash()
- casremote = CASRemote(self.storage_remote_spec)
- # Now, push that key (without necessarily needing a ref) to the remote.
+ # Generate action_digest first
+ input_root_digest = upload_vdir.ref
+ command_proto = self._create_command(command, cwd, env)
+ command_digest = utils._message_digest(command_proto.SerializeToString())
+ action = remote_execution_pb2.Action(command_digest=command_digest,
+ input_root_digest=input_root_digest)
+ action_digest = utils._message_digest(action.SerializeToString())
+
+ # Next, try to create a communication channel to the BuildGrid server.
+ url = urlparse(self.exec_url)
+ if not url.port:
+ raise SandboxError("You must supply a protocol and port number in the execution-service url, "
+ "for example: http://buildservice:50051.")
+ if url.scheme == 'http':
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+ else:
+ raise SandboxError("Remote execution currently only supports the 'http' protocol "
+ "and '{}' was supplied.".format(url.scheme))
+
+ # check action cache download and download if there
+ action_result = self._check_action_cache(channel, action_digest)
+
+ if not action_result:
+ casremote = CASRemote(self.storage_remote_spec)
+
+ # Now, push that key (without necessarily needing a ref) to the remote.
+ try:
+ cascache.push_directory(casremote, upload_vdir)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+
+ if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
+ raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
+
+ # Push command and action
+ try:
+ cascache.push_message(casremote, command_proto)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push command to remote: {}".format(e))
+
+ try:
+ cascache.push_message(casremote, action)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push action to remote: {}".format(e))
+
+ # Now request to execute the action
+ operation = self.run_remote_command(channel, action_digest)
+ action_result = self._extract_action_result(operation)
+
+ if action_result.exit_code != 0:
+ # A normal error during the build: the remote execution system
+ # has worked correctly but the command failed.
+ # action_result.stdout and action_result.stderr also contains
+ # build command outputs which we ignore at the moment.
+ return action_result.exit_code
+
+ # Get output of build
+ self.process_job_output(action_result.output_directories, action_result.output_files)
+
+ return 0
+
+ def _check_action_cache(self, channel, action_digest):
+ # Checks the action cache to see if this artifact has already been built
+ #
+ # Should return either the action response or None if not found, raise
+ # Sandboxerror if other grpc error was raised
+ request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
+ stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
try:
- cascache.push_directory(casremote, upload_vdir)
+ return stub.GetActionResult(request)
except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SandboxError("Failed to query action cache: {} ({})"
+ .format(e.code(), e.details()))
+ else:
+ return None
- # Now transmit the command to execute
- operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
+ def _create_command(self, command, working_directory, environment):
+ # Creates a command proto
+ environment_variables = [remote_execution_pb2.Command.
+ EnvironmentVariable(name=k, value=v)
+ for (k, v) in environment.items()]
+ return remote_execution_pb2.Command(arguments=command,
+ working_directory=working_directory,
+ environment_variables=environment_variables,
+ output_files=[],
+ output_directories=[self._output_directory],
+ platform=None)
+ @staticmethod
+ def _extract_action_result(operation):
if operation is None:
# Failure of remote execution, usually due to an error in BuildStream
raise SandboxError("No response returned from server")
@@ -331,18 +368,7 @@ class SandboxRemote(Sandbox):
else:
raise SandboxError("Remote server failed at executing the build request.")
- action_result = execution_response.result
-
- if action_result.exit_code != 0:
- # A normal error during the build: the remote execution system
- # has worked correctly but the command failed.
- # action_result.stdout and action_result.stderr also contains
- # build command outputs which we ignore at the moment.
- return action_result.exit_code
-
- self.process_job_output(action_result.output_directories, action_result.output_files)
-
- return 0
+ return execution_response.result
def _create_batch(self, main_group, flags, *, collect=None):
return _SandboxRemoteBatch(self, main_group, flags, collect=collect)