From 046739aa1cfe6e78f1485a586c706b106d363a2a Mon Sep 17 00:00:00 2001 From: Jim MacArthur Date: Thu, 11 Oct 2018 20:57:09 +0100 Subject: A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks. --- buildstream/_artifactcache/cascache.py | 94 +++++++++++++--------------------- buildstream/sandbox/_sandboxremote.py | 24 +++++++-- 2 files changed, 56 insertions(+), 62 deletions(-) diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 3e63608be..2d79abdcd 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -43,6 +43,11 @@ from .._exceptions import ArtifactError from . import ArtifactCache +from google.oauth2 import service_account +import google +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests + # The default limit for gRPC messages is 4 MiB. # Limit payload to 1 MiB to leave sufficient headroom for metadata. @@ -232,7 +237,7 @@ class CASCache(ArtifactCache): ref = self.get_artifact_fullname(element, key) project = element._get_project() - + return False for remote in self._remotes[project]: try: remote.init() @@ -299,29 +304,8 @@ class CASCache(ArtifactCache): for ref in refs: tree = self.resolve_ref(ref) - # Check whether ref is already on the server in which case - # there is no need to push the artifact - try: - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - response = remote.ref_storage.GetReference(request) - - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: - # ref is already on the server with the same tree - continue - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - # Intentionally re-raise RpcError for outer except block. - raise - self._send_directory(remote, tree) - request = buildstream_pb2.UpdateReferenceRequest() - request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes - remote.ref_storage.UpdateReference(request) skipped_remote = False except grpc.RpcError as e: @@ -367,9 +351,9 @@ class CASCache(ArtifactCache): push_remotes = [r for r in self._remotes[project] if r.spec.push] - if not push_remotes: - raise ArtifactError("CASCache: push_directory was called, but no remote artifact " + - "servers are configured as push remotes.") + #if not push_remotes: + # raise ArtifactError("CASCache: push_directory was called, but no remote artifact " + + # "servers are configured as push remotes.") if directory.ref is None: return @@ -400,7 +384,7 @@ class CASCache(ArtifactCache): def _verify_digest_on_remote(self, remote, digest): # Check whether ref is already on the server in which case # there is no need to push the artifact - request = remote_execution_pb2.FindMissingBlobsRequest() + request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") request.blob_digests.extend([digest]) response = remote.cas.FindMissingBlobs(request) @@ -812,13 +796,17 @@ class CASCache(ArtifactCache): remote = _CASRemote(remote_spec) remote.init() - request = buildstream_pb2.StatusRequest() - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: + print("Remote init complete - firing status request...") + #request = buildstream_pb2.StatusRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") + #response = remote.ref_storage.Status(request) + print("Status obtained") + + if False:# remote_spec.push: + print("Cannot push") q.put('Artifact server does not allow push') else: # No error + print("No errors detected") q.put(None) except grpc.RpcError as e: @@ -1000,9 +988,10 @@ class CASCache(ArtifactCache): return dirdigest def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()): - resource_name = '/'.join(['uploads', str(u_uid), 'blobs', - digest.hash, str(digest.size_bytes)]) - + instance_name="projects/bazelcon18-rbe-shared/instances/default_instance" + resource_name = instance_name+'/'+('/'.join(['uploads', str(u_uid), 'blobs', + digest.hash, str(digest.size_bytes)])) + print("Trying to send resource named: {}".format(resource_name)) def request_stream(resname, instream): offset = 0 finished = False @@ -1033,7 +1022,7 @@ class CASCache(ArtifactCache): missing_blobs = dict() # Limit size of FindMissingBlobs request for required_blobs_group in _grouper(required_blobs, 512): - request = remote_execution_pb2.FindMissingBlobsRequest() + request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") for required_digest in required_blobs_group: d = request.blob_digests.add() @@ -1093,28 +1082,14 @@ class _CASRemote(): elif url.scheme == 'https': port = url.port or 443 - if self.spec.server_cert: - with open(self.spec.server_cert, 'rb') as f: - server_cert_bytes = f.read() - else: - server_cert_bytes = None - - if self.spec.client_key: - with open(self.spec.client_key, 'rb') as f: - client_key_bytes = f.read() - else: - client_key_bytes = None + SCOPES = ['https://www.googleapis.com/auth/cloud-platform'] + SERVICE_ACCOUNT_FILE = '/tmp/key.json' - if self.spec.client_cert: - with open(self.spec.client_cert, 'rb') as f: - client_cert_bytes = f.read() - else: - client_cert_bytes = None + credentials = service_account.Credentials.from_service_account_file( + SERVICE_ACCOUNT_FILE, scopes=SCOPES) + http_request = google_auth_transport_requests.Request() - credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes, - private_key=client_key_bytes, - certificate_chain=client_cert_bytes) - self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) + self.channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443') else: raise ArtifactError("Unsupported URL: {}".format(self.spec.url)) @@ -1125,7 +1100,7 @@ class _CASRemote(): self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES try: - request = remote_execution_pb2.GetCapabilitiesRequest() + request = remote_execution_pb2.GetCapabilitiesRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") response = self.capabilities.GetCapabilities(request) server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes: @@ -1136,9 +1111,10 @@ class _CASRemote(): raise # Check whether the server supports BatchReadBlobs() + print("Testing BatchBlobsRequest") self.batch_read_supported = False try: - request = remote_execution_pb2.BatchReadBlobsRequest() + request = remote_execution_pb2.BatchReadBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") response = self.cas.BatchReadBlobs(request) self.batch_read_supported = True except grpc.RpcError as e: @@ -1146,9 +1122,10 @@ class _CASRemote(): raise # Check whether the server supports BatchUpdateBlobs() + print("Testing BatchUpdateBlobsRequest") self.batch_update_supported = False try: - request = remote_execution_pb2.BatchUpdateBlobsRequest() + request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") response = self.cas.BatchUpdateBlobs(request) self.batch_update_supported = True except grpc.RpcError as e: @@ -1156,6 +1133,7 @@ class _CASRemote(): e.code() != grpc.StatusCode.PERMISSION_DENIED): raise + print("Remote init complete") self._initialized = True @@ -1209,7 +1187,7 @@ class _CASBatchUpdate(): def __init__(self, remote): self._remote = remote self._max_total_size_bytes = remote.max_batch_total_size_bytes - self._request = remote_execution_pb2.BatchUpdateBlobsRequest() + self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") self._size = 0 self._sent = False diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index ab0c31bff..8c677e595 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ b/buildstream/sandbox/_sandboxremote.py @@ -29,6 +29,10 @@ from ..storage._casbaseddirectory import CasBasedDirectory from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.google.rpc import code_pb2 +from google.oauth2 import service_account +import google +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests class SandboxError(Exception): pass @@ -49,6 +53,8 @@ class SandboxRemote(Sandbox): raise SandboxError("Configured remote URL '{}' does not match the expected layout. " .format(kwargs['server_url']) + "It should be of the form ://:.") + elif url.scheme == 'https': + print("Using secure mode to '{}'.".format(url)) elif url.scheme != 'http': raise SandboxError("Configured remote '{}' uses an unsupported protocol. " "Only plain HTTP is currenlty supported (no HTTPS).") @@ -92,10 +98,18 @@ class SandboxRemote(Sandbox): return None # Next, try to create a communication channel to the BuildGrid server. - channel = grpc.insecure_channel(self.server_url) + SCOPES = ['https://www.googleapis.com/auth/cloud-platform'] + SERVICE_ACCOUNT_FILE = '/tmp/key.json' + + credentials = service_account.Credentials.from_service_account_file( + SERVICE_ACCOUNT_FILE, scopes=SCOPES) + http_request = google_auth_transport_requests.Request() + channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443') + #channel = grpc.secure_channel(self.server_url, credentials) stub = remote_execution_pb2_grpc.ExecutionStub(channel) request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest, - skip_cache_lookup=False) + skip_cache_lookup=False, + instance_name="projects/bazelcon18-rbe-shared/instances/default_instance") try: operation_iterator = stub.Execute(request) except grpc.RpcError: @@ -133,7 +147,7 @@ class SandboxRemote(Sandbox): tree_digest = output_directories[0].tree_digest if tree_digest is None or not tree_digest.hash: raise SandboxError("Output directory structure had no digest attached.") - + print("Output of job: Tree digest is {}/{}".format(tree_digest.hash, tree_digest.size_bytes)) context = self._get_context() cascache = context.artifactcache # Now do a pull to ensure we have the necessary parts. @@ -212,6 +226,8 @@ class SandboxRemote(Sandbox): raise SandboxError("Remote server failed at executing the build request.") action_result = execution_response.result + print("Exit code: {}".format(action_result.exit_code)) + print("Stdout digest: {}/{}".format(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes)) if action_result.exit_code != 0: # A normal error during the build: the remote execution system @@ -219,7 +235,7 @@ class SandboxRemote(Sandbox): # action_result.stdout and action_result.stderr also contains # build command outputs which we ignore at the moment. return action_result.exit_code - + self.process_job_output(action_result.output_directories, action_result.output_files) return 0 -- cgit v1.2.1