summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2018-12-05 16:36:00 +0000
committerRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2018-12-05 16:36:00 +0000
commita53d6d1f81913d639175cbd503d9d71ec7133a4e (patch)
treeeb5f8d46ac8cafe8b51744f151463bf01f23d18f
parenteb1ed4108b75ed919f471e448fa6088b1503a328 (diff)
parent9d77351f0b05f6b37b4235742ac3a0ca2d6feb93 (diff)
downloadbuildstream-a53d6d1f81913d639175cbd503d9d71ec7133a4e.tar.gz
Merge branch 'raoul/628-RE-flow-optimisation' into 'master'
Remote-execution client flow optimisation Closes #628 See merge request BuildStream/buildstream!982
-rw-r--r--buildstream/_artifactcache/cascache.py5
-rw-r--r--buildstream/sandbox/_sandboxremote.py197
-rw-r--r--buildstream/utils.py17
-rw-r--r--doc/source/format_project.rst7
4 files changed, 152 insertions, 74 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 2ae36d22a..9ca757d4d 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -427,10 +427,7 @@ class CASCache():
def push_message(self, remote, message):
message_buffer = message.SerializeToString()
- message_sha = hashlib.sha256(message_buffer)
- message_digest = remote_execution_pb2.Digest()
- message_digest.hash = message_sha.hexdigest()
- message_digest.size_bytes = len(message_buffer)
+ message_digest = utils._message_digest(message_buffer)
remote.init()
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index a967629fe..52e450fb5 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -26,6 +26,8 @@ from functools import partial
import grpc
+from .. import utils
+from .._message import Message, MessageType
from . import Sandbox, SandboxCommandError
from .sandbox import _SandboxBatch
from ..storage._filebaseddirectory import FileBasedDirectory
@@ -39,7 +41,7 @@ from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._artifactcache.cascache import CASRemote, CASRemoteSpec
-class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
+class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
pass
@@ -59,6 +61,10 @@ class SandboxRemote(Sandbox):
self.storage_url = config.storage_service['url']
self.exec_url = config.exec_service['url']
+ if config.action_service:
+ self.action_url = config.action_service['url']
+ else:
+ self.action_url = None
self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
server_cert=config.storage_service['server-cert'],
@@ -66,6 +72,9 @@ class SandboxRemote(Sandbox):
client_cert=config.storage_service['client-cert'])
self.operation_name = None
+ def info(self, msg):
+ self._get_context().message(Message(None, MessageType.INFO, msg))
+
@staticmethod
def specs_from_config_node(config_node, basedir):
@@ -88,12 +97,19 @@ class SandboxRemote(Sandbox):
tls_keys = ['client-key', 'client-cert', 'server-cert']
- _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url'])
+ _yaml.node_validate(
+ remote_config,
+ ['execution-service', 'storage-service', 'url', 'action-cache-service'])
remote_exec_service_config = require_node(remote_config, 'execution-service')
remote_exec_storage_config = require_node(remote_config, 'storage-service')
+ remote_exec_action_config = remote_config.get('action-cache-service')
_yaml.node_validate(remote_exec_service_config, ['url'])
_yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
+ if remote_exec_action_config:
+ _yaml.node_validate(remote_exec_action_config, ['url'])
+ else:
+ remote_config['action-service'] = None
if 'url' in remote_config:
if 'execution-service' not in remote_config:
@@ -114,59 +130,17 @@ class SandboxRemote(Sandbox):
"remote-execution configuration. Your config is missing '{}'."
.format(str(provenance), tls_keys, key))
- spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
+ spec = RemoteExecutionSpec(remote_config['execution-service'],
+ remote_config['storage-service'],
+ remote_config['action-cache-service'])
return spec
- def run_remote_command(self, command, input_root_digest, working_directory, environment):
+ def run_remote_command(self, channel, action_digest):
# Sends an execution request to the remote execution server.
#
# This function blocks until it gets a response from the server.
- #
- environment_variables = [remote_execution_pb2.Command.
- EnvironmentVariable(name=k, value=v)
- for (k, v) in environment.items()]
-
- config = self._get_config()
- platform = remote_execution_pb2.Platform()
- platform.properties.extend([remote_execution_pb2.Platform.
- Property(name="OSFamily", value=config.build_os),
- remote_execution_pb2.Platform.
- Property(name="ISA", value=config.build_arch)])
-
- # Create and send the Command object.
- remote_command = remote_execution_pb2.Command(arguments=command,
- working_directory=working_directory,
- environment_variables=environment_variables,
- output_files=[],
- output_directories=[self._output_directory],
- platform=platform)
- context = self._get_context()
- cascache = context.get_cascache()
- casremote = CASRemote(self.storage_remote_spec)
-
- # Upload the Command message to the remote CAS server
- command_digest = cascache.push_message(casremote, remote_command)
-
- # Create and send the action.
- action = remote_execution_pb2.Action(command_digest=command_digest,
- input_root_digest=input_root_digest,
- timeout=None,
- do_not_cache=False)
-
- # Upload the Action message to the remote CAS server
- action_digest = cascache.push_message(casremote, action)
-
- # Next, try to create a communication channel to the BuildGrid server.
- url = urlparse(self.exec_url)
- if not url.port:
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
- "for example: http://buildservice:50051.")
- if url.scheme == 'http':
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
- else:
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
- "and '{}' was supplied.".format(url.scheme))
+ # Create the execution stub on the channel supplied by the caller.
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
skip_cache_lookup=False)
@@ -286,13 +260,12 @@ class SandboxRemote(Sandbox):
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
# from another hash will be interesting, though...
- new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
+ new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
self._set_virtual_directory(new_dir)
def _run(self, command, flags, *, cwd, env):
- # Upload sources
+ # Set up the virtual directory
upload_vdir = self.get_virtual_directory()
-
cascache = self._get_context().get_cascache()
if isinstance(upload_vdir, FileBasedDirectory):
# Make a new temporary directory to put source in
@@ -301,16 +274,111 @@ class SandboxRemote(Sandbox):
upload_vdir.recalculate_hash()
- casremote = CASRemote(self.storage_remote_spec)
- # Now, push that key (without necessarily needing a ref) to the remote.
+ # Generate action_digest first
+ input_root_digest = upload_vdir.ref
+ command_proto = self._create_command(command, cwd, env)
+ command_digest = utils._message_digest(command_proto.SerializeToString())
+ action = remote_execution_pb2.Action(command_digest=command_digest,
+ input_root_digest=input_root_digest)
+ action_digest = utils._message_digest(action.SerializeToString())
+
+ # Next, try to create a communication channel to the BuildGrid server.
+ url = urlparse(self.exec_url)
+ if not url.port:
+ raise SandboxError("You must supply a protocol and port number in the execution-service url, "
+ "for example: http://buildservice:50051.")
+ if url.scheme == 'http':
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+ else:
+ raise SandboxError("Remote execution currently only supports the 'http' protocol "
+ "and '{}' was supplied.".format(url.scheme))
+
+ # Check the action cache and use the cached result if there is one
+ action_result = self._check_action_cache(action_digest)
+
+ if not action_result:
+ casremote = CASRemote(self.storage_remote_spec)
+
+ # Now, push that key (without necessarily needing a ref) to the remote.
+ try:
+ cascache.push_directory(casremote, upload_vdir)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+
+ if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
+ raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
+
+ # Push command and action
+ try:
+ cascache.push_message(casremote, command_proto)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push command to remote: {}".format(e))
+
+ try:
+ cascache.push_message(casremote, action)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push action to remote: {}".format(e))
+
+ # Now request to execute the action
+ operation = self.run_remote_command(channel, action_digest)
+ action_result = self._extract_action_result(operation)
+
+ if action_result.exit_code != 0:
+ # A normal error during the build: the remote execution system
+ # has worked correctly but the command failed.
+ # action_result.stdout and action_result.stderr also contains
+ # build command outputs which we ignore at the moment.
+ return action_result.exit_code
+
+ # Get output of build
+ self.process_job_output(action_result.output_directories, action_result.output_files)
+
+ return 0
+
+ def _check_action_cache(self, action_digest):
+ # Checks the action cache to see if this artifact has already been built
+ #
+ # Returns the cached action result, or None if it was not found or no
+ # action cache is configured; raises SandboxError on any other gRPC error
+ if not self.action_url:
+ return None
+ url = urlparse(self.action_url)
+ if not url.port:
+ raise SandboxError("You must supply a protocol and port number in the action-cache-service url, "
+ "for example: http://buildservice:50051.")
+ if not url.scheme == "http":
+ raise SandboxError("Currently only support http for the action cache"
+ "and {} was supplied".format(url.scheme))
+
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+ request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
+ stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
try:
- cascache.push_directory(casremote, upload_vdir)
+ result = stub.GetActionResult(request)
except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SandboxError("Failed to query action cache: {} ({})"
+ .format(e.code(), e.details()))
+ else:
+ return None
+ else:
+ self.info("Action result found in action cache")
+ return result
- # Now transmit the command to execute
- operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
+ def _create_command(self, command, working_directory, environment):
+ # Creates a command proto
+ environment_variables = [remote_execution_pb2.Command.
+ EnvironmentVariable(name=k, value=v)
+ for (k, v) in environment.items()]
+ return remote_execution_pb2.Command(arguments=command,
+ working_directory=working_directory,
+ environment_variables=environment_variables,
+ output_files=[],
+ output_directories=[self._output_directory],
+ platform=None)
+ @staticmethod
+ def _extract_action_result(operation):
if operation is None:
# Failure of remote execution, usually due to an error in BuildStream
raise SandboxError("No response returned from server")
@@ -331,18 +399,7 @@ class SandboxRemote(Sandbox):
else:
raise SandboxError("Remote server failed at executing the build request.")
- action_result = execution_response.result
-
- if action_result.exit_code != 0:
- # A normal error during the build: the remote execution system
- # has worked correctly but the command failed.
- # action_result.stdout and action_result.stderr also contains
- # build command outputs which we ignore at the moment.
- return action_result.exit_code
-
- self.process_job_output(action_result.output_directories, action_result.output_files)
-
- return 0
+ return execution_response.result
def _create_batch(self, main_group, flags, *, collect=None):
return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
diff --git a/buildstream/utils.py b/buildstream/utils.py
index 94c990357..2fe9ab7dc 100644
--- a/buildstream/utils.py
+++ b/buildstream/utils.py
@@ -41,6 +41,7 @@ import psutil
from . import _signals
from ._exceptions import BstError, ErrorDomain
+from ._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
# The magic number for timestamps: 2011-11-11 11:11:11
_magic_timestamp = calendar.timegm([2011, 11, 11, 11, 11, 11])
@@ -1242,3 +1243,19 @@ def _deduplicate(iterable, key=None):
def _get_link_mtime(path):
path_stat = os.lstat(path)
return path_stat.st_mtime
+
+
+# _message_digest()
+#
+# Args:
+# message_buffer (bytes): Serialized message to create the digest of
+#
+# Returns:
+# (remote_execution_pb2.Digest): Content digest
+#
+def _message_digest(message_buffer):
+ sha = hashlib.sha256(message_buffer)
+ digest = remote_execution_pb2.Digest()
+ digest.hash = sha.hexdigest()
+ digest.size_bytes = len(message_buffer)
+ return digest
diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst
index 59ee05e85..a85bf6470 100644
--- a/doc/source/format_project.rst
+++ b/doc/source/format_project.rst
@@ -238,6 +238,8 @@ using the `remote-execution` option:
server-cert: server.crt
client-cert: client.crt
client-key: client.key
+ action-cache-service:
+ url: http://bar.action.com:50052
The execution-service part of remote execution does not support encrypted
connections yet, so the protocol must always be http.
@@ -245,6 +247,11 @@ connections yet, so the protocol must always be http.
storage-service specifies a remote CAS store and the parameters are the
same as those used to specify an :ref:`artifact server <artifacts>`.
+The action-cache-service specifies where built actions are cached, allowing
+BuildStream to check whether an action has already been executed and, if so,
+to download its result. It is similar to the artifact cache, but is defined by
+the Remote Execution API (REAPI). It is optional for remote execution.
+
The storage service may be the same endpoint used for artifact
caching. Remote execution cannot work without push access to the
storage endpoint, so you must specify a client certificate and key,