author     Jim MacArthur <jim.macarthur@codethink.co.uk>  2018-10-11 20:57:09 +0100
committer  Jim MacArthur <jim.macarthur@codethink.co.uk>  2018-10-11 20:57:09 +0100
commit     046739aa1cfe6e78f1485a586c706b106d363a2a (patch)
tree       3dc6d36654100712bb71729df8cab672994d3d37
parent     a0712eadd423c2bc98f195d972030455d65a81ae (diff)
download   buildstream-jmac/hackathon-rbe-execution.tar.gz
A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks. (branch: jmac/hackathon-rbe-execution)
-rw-r--r--  buildstream/_artifactcache/cascache.py  94
-rw-r--r--  buildstream/sandbox/_sandboxremote.py    24
2 files changed, 56 insertions(+), 62 deletions(-)
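
The substance of the hack: BuildStream's certificate-based TLS channel setup is swapped for a gRPC channel authorized by a Google Cloud service account, and every REAPI request gains a hard-coded RBE instance name. A minimal sketch of that channel setup, assuming the google-auth package is installed and a service account key sits at /tmp/key.json (the path and scope below are the literals hard-coded in the diff):

    # Build a gRPC channel authorized with a Google Cloud service account.
    # SCOPES and SERVICE_ACCOUNT_FILE mirror the values hard-coded below.
    from google.oauth2 import service_account
    from google.auth.transport import grpc as google_auth_transport_grpc
    from google.auth.transport import requests as google_auth_transport_requests

    SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
    SERVICE_ACCOUNT_FILE = '/tmp/key.json'

    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    http_request = google_auth_transport_requests.Request()

    # secure_authorized_channel() wraps grpc.secure_channel() and attaches
    # an OAuth token, refreshing it over http_request when it expires.
    channel = google_auth_transport_grpc.secure_authorized_channel(
        credentials, http_request, 'remotebuildexecution.googleapis.com:443')
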
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 3e63608be..2d79abdcd 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
from . import ArtifactCache
+from google.oauth2 import service_account
+import google
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
+
# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
@@ -232,7 +237,7 @@ class CASCache(ArtifactCache):
ref = self.get_artifact_fullname(element, key)
project = element._get_project()
-
+ return False
for remote in self._remotes[project]:
try:
remote.init()
@@ -299,29 +304,8 @@ class CASCache(ArtifactCache):
for ref in refs:
tree = self.resolve_ref(ref)
- # Check whether ref is already on the server in which case
- # there is no need to push the artifact
- try:
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- response = remote.ref_storage.GetReference(request)
-
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
- # ref is already on the server with the same tree
- continue
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- # Intentionally re-raise RpcError for outer except block.
- raise
-
self._send_directory(remote, tree)
- request = buildstream_pb2.UpdateReferenceRequest()
- request.keys.append(ref)
- request.digest.hash = tree.hash
- request.digest.size_bytes = tree.size_bytes
- remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
@@ -367,9 +351,9 @@ class CASCache(ArtifactCache):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
- if not push_remotes:
- raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
- "servers are configured as push remotes.")
+ #if not push_remotes:
+ # raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
+ # "servers are configured as push remotes.")
if directory.ref is None:
return
@@ -400,7 +384,7 @@ class CASCache(ArtifactCache):
def _verify_digest_on_remote(self, remote, digest):
# Check whether ref is already on the server in which case
# there is no need to push the artifact
- request = remote_execution_pb2.FindMissingBlobsRequest()
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
request.blob_digests.extend([digest])
response = remote.cas.FindMissingBlobs(request)
@@ -812,13 +796,17 @@ class CASCache(ArtifactCache):
remote = _CASRemote(remote_spec)
remote.init()
- request = buildstream_pb2.StatusRequest()
- response = remote.ref_storage.Status(request)
-
- if remote_spec.push and not response.allow_updates:
+ print("Remote init complete - firing status request...")
+ #request = buildstream_pb2.StatusRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
+ #response = remote.ref_storage.Status(request)
+ print("Status obtained")
+
+ if False:# remote_spec.push:
+ print("Cannot push")
q.put('Artifact server does not allow push')
else:
# No error
+ print("No errors detected")
q.put(None)
except grpc.RpcError as e:
@@ -1000,9 +988,10 @@ class CASCache(ArtifactCache):
return dirdigest
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
-
+ instance_name="projects/bazelcon18-rbe-shared/instances/default_instance"
+ resource_name = instance_name+'/'+('/'.join(['uploads', str(u_uid), 'blobs',
+ digest.hash, str(digest.size_bytes)]))
+ print("Trying to send resource named: {}".format(resource_name))
def request_stream(resname, instream):
offset = 0
finished = False
@@ -1033,7 +1022,7 @@ class CASCache(ArtifactCache):
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest()
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
for required_digest in required_blobs_group:
d = request.blob_digests.add()
@@ -1093,28 +1082,14 @@ class _CASRemote():
elif url.scheme == 'https':
port = url.port or 443
- if self.spec.server_cert:
- with open(self.spec.server_cert, 'rb') as f:
- server_cert_bytes = f.read()
- else:
- server_cert_bytes = None
-
- if self.spec.client_key:
- with open(self.spec.client_key, 'rb') as f:
- client_key_bytes = f.read()
- else:
- client_key_bytes = None
+ SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
+ SERVICE_ACCOUNT_FILE = '/tmp/key.json'
- if self.spec.client_cert:
- with open(self.spec.client_cert, 'rb') as f:
- client_cert_bytes = f.read()
- else:
- client_cert_bytes = None
+ credentials = service_account.Credentials.from_service_account_file(
+ SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+ http_request = google_auth_transport_requests.Request()
- credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
- private_key=client_key_bytes,
- certificate_chain=client_cert_bytes)
- self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
+ self.channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443')
else:
raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
@@ -1125,7 +1100,7 @@ class _CASRemote():
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
- request = remote_execution_pb2.GetCapabilitiesRequest()
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
response = self.capabilities.GetCapabilities(request)
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
@@ -1136,9 +1111,10 @@ class _CASRemote():
raise
# Check whether the server supports BatchReadBlobs()
+ print("Testing BatchBlobsRequest")
self.batch_read_supported = False
try:
- request = remote_execution_pb2.BatchReadBlobsRequest()
+ request = remote_execution_pb2.BatchReadBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
except grpc.RpcError as e:
@@ -1146,9 +1122,10 @@ class _CASRemote():
raise
# Check whether the server supports BatchUpdateBlobs()
+ print("Testing BatchUpdateBlobsRequest")
self.batch_update_supported = False
try:
- request = remote_execution_pb2.BatchUpdateBlobsRequest()
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
@@ -1156,6 +1133,7 @@ class _CASRemote():
e.code() != grpc.StatusCode.PERMISSION_DENIED):
raise
+ print("Remote init complete")
self._initialized = True
@@ -1209,7 +1187,7 @@ class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
self._size = 0
self._sent = False
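
Every request constructor above repeats the same instance-name literal. A sketch of hoisting it into one place; INSTANCE_NAME and make_find_missing_request() are hypothetical helpers for illustration, not BuildStream API:

    # Assumes BuildStream's bundled REAPI protos are importable.
    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    INSTANCE_NAME = "projects/bazelcon18-rbe-shared/instances/default_instance"

    def make_find_missing_request(digests, instance_name=INSTANCE_NAME):
        # instance_name is a plain string field on the request message.
        request = remote_execution_pb2.FindMissingBlobsRequest(
            instance_name=instance_name)
        request.blob_digests.extend(digests)
        return request
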
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index ab0c31bff..8c677e595 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -29,6 +29,10 @@ from ..storage._casbaseddirectory import CasBasedDirectory
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
+from google.oauth2 import service_account
+import google
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
class SandboxError(Exception):
pass
@@ -49,6 +53,8 @@ class SandboxRemote(Sandbox):
raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
.format(kwargs['server_url']) +
"It should be of the form <protocol>://<domain name>:<port>.")
+ elif url.scheme == 'https':
+ print("Using secure mode to '{}'.".format(url))
elif url.scheme != 'http':
raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
"Only plain HTTP is currenlty supported (no HTTPS).")
@@ -92,10 +98,18 @@ class SandboxRemote(Sandbox):
return None
# Next, try to create a communication channel to the BuildGrid server.
- channel = grpc.insecure_channel(self.server_url)
+ SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
+ SERVICE_ACCOUNT_FILE = '/tmp/key.json'
+
+ credentials = service_account.Credentials.from_service_account_file(
+ SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+ http_request = google_auth_transport_requests.Request()
+ channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443')
+ #channel = grpc.secure_channel(self.server_url, credentials)
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
- skip_cache_lookup=False)
+ skip_cache_lookup=False,
+ instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
try:
operation_iterator = stub.Execute(request)
except grpc.RpcError:
@@ -133,7 +147,7 @@ class SandboxRemote(Sandbox):
tree_digest = output_directories[0].tree_digest
if tree_digest is None or not tree_digest.hash:
raise SandboxError("Output directory structure had no digest attached.")
-
+ print("Output of job: Tree digest is {}/{}".format(tree_digest.hash, tree_digest.size_bytes))
context = self._get_context()
cascache = context.artifactcache
# Now do a pull to ensure we have the necessary parts.
@@ -212,6 +226,8 @@ class SandboxRemote(Sandbox):
raise SandboxError("Remote server failed at executing the build request.")
action_result = execution_response.result
+ print("Exit code: {}".format(action_result.exit_code))
+ print("Stdout digest: {}/{}".format(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes))
if action_result.exit_code != 0:
# A normal error during the build: the remote execution system
@@ -219,7 +235,7 @@ class SandboxRemote(Sandbox):
# action_result.stdout and action_result.stderr also contains
# build command outputs which we ignore at the moment.
return action_result.exit_code
-
+
self.process_job_output(action_result.output_directories, action_result.output_files)
return 0
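
For reference, the execution call path the sandbox changes set up, as a self-contained sketch; channel is assumed to be the authorized channel from the earlier sketch, and action_digest a remote_execution_pb2.Digest for an Action already uploaded to CAS:

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    request = remote_execution_pb2.ExecuteRequest(
        action_digest=action_digest,
        skip_cache_lookup=False,
        instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")

    # Execute() streams google.longrunning Operation messages; the final,
    # done=True message carries the ActionResult with exit code and digests.
    for operation in stub.Execute(request):
        if operation.done:
            break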