author    | Jürg Billeter <j@bitron.ch> | 2018-08-15 14:13:08 +0000
committer | Jürg Billeter <j@bitron.ch> | 2018-08-15 14:13:08 +0000
commit    | 007624429f22972a9d2d2620cbbbad18411ff4c9 (patch)
tree      | a8958e438a2d5c4ef802fcc0f0fd55ff20df6684
parent    | 76f34a633e43790eab6592c5f1385f00c5ba2e83 (diff)
parent    | 6a9d737e56077ba735b83fc94040e5707ce10d84 (diff)
download  | buildstream-007624429f22972a9d2d2620cbbbad18411ff4c9.tar.gz
Merge branch 'juerg/cas' into 'master'
CAS: Fix resource_name format for blobs
Closes #572
See merge request BuildStream/buildstream!660
-rw-r--r-- | buildstream/_artifactcache/cascache.py  |  8
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 87
2 files changed, 78 insertions, 17 deletions
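
The fix aligns BuildStream's blob transfers with the resource_name layout that the Remote Execution API defines for the ByteStream service: reads use blobs/{hash}/{size_bytes}, while writes use uploads/{uuid}/blobs/{hash}/{size_bytes}, where the UUID distinguishes concurrent uploads of the same blob. A minimal sketch of the two name formats (the Digest stand-in and helper names here are illustrative, not part of the patch):

import uuid
from collections import namedtuple

# Stand-in for the remote_execution_pb2.Digest message (hash + size_bytes).
Digest = namedtuple('Digest', ['hash', 'size_bytes'])

def download_resource_name(digest):
    # ByteStream Read: blobs/{hash}/{size_bytes}
    return '/'.join(['blobs', digest.hash, str(digest.size_bytes)])

def upload_resource_name(digest):
    # ByteStream Write: uploads/{uuid}/blobs/{hash}/{size_bytes};
    # a fresh UUID lets the server tell concurrent uploads of the same blob apart.
    return '/'.join(['uploads', str(uuid.uuid4()), 'blobs',
                     digest.hash, str(digest.size_bytes)])

digest = Digest(hash='a' * 64, size_bytes=1024)
print(download_resource_name(digest))  # blobs/aaa.../1024
print(upload_resource_name(digest))    # uploads/<uuid4>/blobs/aaa.../1024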
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 4fea98626..00d09773c 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -24,6 +24,7 @@ import os
 import signal
 import stat
 import tempfile
+import uuid
 from urllib.parse import urlparse
 
 import grpc
@@ -309,8 +310,11 @@ class CASCache(ArtifactCache):
                 # Upload any blobs missing on the server
                 skipped_remote = False
                 for digest in missing_blobs.values():
+                    uuid_ = uuid.uuid4()
+                    resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
+                                              digest.hash, str(digest.size_bytes)])
+
                     def request_stream():
-                        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
                         with open(self.objpath(digest), 'rb') as f:
                             assert os.fstat(f.fileno()).st_size == digest.size_bytes
                             offset = 0
@@ -747,7 +751,7 @@ class CASCache(ArtifactCache):
             yield from self._required_blobs(dirnode.digest)
 
     def _fetch_blob(self, remote, digest, out):
-        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 73e1ac67a..0af65729b 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -23,6 +23,7 @@ import os
 import signal
 import sys
 import tempfile
+import uuid
 
 import click
 import grpc
@@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
     def Read(self, request, context):
         resource_name = request.resource_name
-        client_digest = _digest_from_resource_name(resource_name)
-        assert request.read_offset <= client_digest.size_bytes
+        client_digest = _digest_from_download_resource_name(resource_name)
+        if client_digest is None:
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+            return
+
+        if request.read_offset > client_digest.size_bytes:
+            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+            return
 
         try:
             with open(self.cas.objpath(client_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
+                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.NOT_FOUND)
+                    return
+
                 if request.read_offset > 0:
                     f.seek(request.read_offset)
@@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
         resource_name = None
         with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
             for request in request_iterator:
-                assert not finished
-                assert request.write_offset == offset
+                if finished or request.write_offset != offset:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 if resource_name is None:
                     # First request
                     resource_name = request.resource_name
-                    client_digest = _digest_from_resource_name(resource_name)
+                    client_digest = _digest_from_upload_resource_name(resource_name)
+                    if client_digest is None:
+                        context.set_code(grpc.StatusCode.NOT_FOUND)
+                        return response
+
                     try:
                         _clean_up_cache(self.cas, client_digest.size_bytes)
                     except ArtifactTooLargeException as e:
@@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         return response
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
-                    assert request.resource_name == resource_name
+                    if request.resource_name != resource_name:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
                 out.write(request.data)
                 offset += len(request.data)
                 if request.finish_write:
-                    assert client_digest.size_bytes == offset
+                    if client_digest.size_bytes != offset:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
                     out.flush()
                     digest = self.cas.add_object(path=out.name)
-                    assert digest.hash == client_digest.hash
+                    if digest.hash != client_digest.hash:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
                     finished = True
 
         assert finished
@@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
 
         return response
 
 
-def _digest_from_resource_name(resource_name):
+def _digest_from_download_resource_name(resource_name):
+    parts = resource_name.split('/')
+
+    # Accept requests from non-conforming BuildStream 1.1.x clients
+    if len(parts) == 2:
+        parts.insert(0, 'blobs')
+
+    if len(parts) != 3 or parts[0] != 'blobs':
+        return None
+
+    try:
+        digest = remote_execution_pb2.Digest()
+        digest.hash = parts[1]
+        digest.size_bytes = int(parts[2])
+        return digest
+    except ValueError:
+        return None
+
+
+def _digest_from_upload_resource_name(resource_name):
     parts = resource_name.split('/')
-    assert len(parts) == 2
-    digest = remote_execution_pb2.Digest()
-    digest.hash = parts[0]
-    digest.size_bytes = int(parts[1])
-    return digest
+
+    # Accept requests from non-conforming BuildStream 1.1.x clients
+    if len(parts) == 2:
+        parts.insert(0, 'uploads')
+        parts.insert(1, str(uuid.uuid4()))
+        parts.insert(2, 'blobs')
+
+    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+        return None
+
+    try:
+        uuid_ = uuid.UUID(hex=parts[1])
+        if uuid_.version != 4:
+            return None
+
+        digest = remote_execution_pb2.Digest()
+        digest.hash = parts[3]
+        digest.size_bytes = int(parts[4])
+        return digest
+    except ValueError:
+        return None
 
 
 def _has_object(cas, digest):
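
On the server side, the patch replaces hard assertions with explicit gRPC status codes and keeps compatibility with non-conforming BuildStream 1.1.x clients by rewriting their legacy two-part names into the conforming layout before validation. An illustrative check of that behaviour, assuming the helpers are importable from the module shown in the diff:

# Illustrative only: exercises the parsing helpers added in the diff above,
# assuming they are importable from buildstream._artifactcache.casserver.
from buildstream._artifactcache.casserver import (
    _digest_from_download_resource_name,
    _digest_from_upload_resource_name,
)

fake_hash = 'b' * 64

# A conforming download name and a legacy 1.1.x name resolve to the same digest.
new_style = _digest_from_download_resource_name('blobs/{}/42'.format(fake_hash))
old_style = _digest_from_download_resource_name('{}/42'.format(fake_hash))
assert new_style.hash == old_style.hash == fake_hash
assert new_style.size_bytes == old_style.size_bytes == 42

# Malformed names now yield None instead of tripping an assert, letting the
# servicer answer with a proper gRPC status code such as NOT_FOUND.
assert _digest_from_download_resource_name('not/a/valid/name') is None
assert _digest_from_upload_resource_name(
    'uploads/not-a-uuid/blobs/{}/42'.format(fake_hash)) is None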