author    Jürg Billeter <j@bitron.ch>    2018-08-23 11:58:25 +0000
committer Jürg Billeter <j@bitron.ch>    2018-08-23 11:58:25 +0000
commit    9983fc3a9b7ce3497e235b137729543d30c4fca0 (patch)
tree      4896c5bec16119f22e8baa0c4496a437b440e970
parent    d02e36b885fe7216c8b5f514fd9d8311cc9eb4e6 (diff)
parent    d9a97c332ccb55ff99ed887fc6e755371a4c5b13 (diff)
download  buildstream-9983fc3a9b7ce3497e235b137729543d30c4fca0.tar.gz

Merge branch 'juerg/cas-1.2' into 'bst-1.2'

CAS: Fix resource_name format for blobs

See merge request BuildStream/buildstream!711
-rw-r--r--  NEWS                                        9
-rw-r--r--  buildstream/_artifactcache/cascache.py     14
-rw-r--r--  buildstream/_artifactcache/casserver.py    87
3 files changed, 90 insertions(+), 20 deletions(-)
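The substance of the fix: BuildStream's ByteStream resource names move from a bare '{hash}/{size}' to the layout used by the Remote Execution API convention, i.e. 'blobs/{hash}/{size}' for downloads (Read) and 'uploads/{uuid}/blobs/{hash}/{size}' for uploads (Write). A minimal sketch of the two formats, with illustrative helper names that are not part of the patch:

import uuid

def download_resource_name(digest):
    # Read (download): blobs/{hash}/{size}
    return '/'.join(['blobs', digest.hash, str(digest.size_bytes)])

def upload_resource_name(digest):
    # Write (upload): uploads/{uuid}/blobs/{hash}/{size}
    return '/'.join(['uploads', str(uuid.uuid4()), 'blobs',
                     digest.hash, str(digest.size_bytes)])
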
diff --git a/NEWS b/NEWS
index 0fe501f02..20ecb6eb5 100644
--- a/NEWS
+++ b/NEWS
@@ -1,4 +1,13 @@
=================
+buildstream 1.1.7
+=================
+
+ o Fix CAS resource_name format
+
+ Artifact servers need to be updated.
+
+
+=================
buildstream 1.1.6
=================
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 2a40de773..c6402717c 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -24,6 +24,7 @@ import os
import signal
import stat
import tempfile
+import uuid
import errno
from urllib.parse import urlparse
@@ -315,8 +316,11 @@ class CASCache(ArtifactCache):
# Upload any blobs missing on the server
skipped_remote = False
for digest in missing_blobs.values():
- def request_stream():
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+ uuid_ = uuid.uuid4()
+ resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
+ digest.hash, str(digest.size_bytes)])
+
+ def request_stream(resname):
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
offset = 0
@@ -330,12 +334,12 @@ class CASCache(ArtifactCache):
request.write_offset = offset
# max. 64 kB chunks
request.data = f.read(chunk_size)
- request.resource_name = resource_name # pylint: disable=cell-var-from-loop
+ request.resource_name = resname
request.finish_write = remaining <= 0
yield request
offset += chunk_size
finished = request.finish_write
- response = remote.bytestream.Write(request_stream())
+ response = remote.bytestream.Write(request_stream(resource_name))
request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
@@ -772,7 +776,7 @@ class CASCache(ArtifactCache):
yield from self._required_blobs(dirnode.digest)
def _fetch_blob(self, remote, digest, out):
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+ resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
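
For reference, the download path in cascache.py now reads roughly as below; a sketch assuming BuildStream's vendored google.bytestream stubs, with the function name and stub wiring left illustrative:

import os

from buildstream._protos.google.bytestream import bytestream_pb2

def fetch_blob(bytestream_stub, digest, out):
    # Conforming download resource name: blobs/{hash}/{size}
    request = bytestream_pb2.ReadRequest()
    request.resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    request.read_offset = 0

    # The server streams the blob back in chunks; write them out in order.
    for response in bytestream_stub.Read(request):
        out.write(response.data)

    out.flush()
    assert digest.size_bytes == os.fstat(out.fileno()).st_size
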
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 73e1ac67a..0af65729b 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -23,6 +23,7 @@ import os
import signal
import sys
import tempfile
+import uuid
import click
import grpc
@@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
def Read(self, request, context):
resource_name = request.resource_name
- client_digest = _digest_from_resource_name(resource_name)
- assert request.read_offset <= client_digest.size_bytes
+ client_digest = _digest_from_download_resource_name(resource_name)
+ if client_digest is None:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return
+
+ if request.read_offset > client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+ return
try:
with open(self.cas.objpath(client_digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
+ if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return
+
if request.read_offset > 0:
f.seek(request.read_offset)
@@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
resource_name = None
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
for request in request_iterator:
- assert not finished
- assert request.write_offset == offset
+ if finished or request.write_offset != offset:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+
if resource_name is None:
# First request
resource_name = request.resource_name
- client_digest = _digest_from_resource_name(resource_name)
+ client_digest = _digest_from_upload_resource_name(resource_name)
+ if client_digest is None:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return response
+
try:
_clean_up_cache(self.cas, client_digest.size_bytes)
except ArtifactTooLargeException as e:
@@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
return response
elif request.resource_name:
# If it is set on subsequent calls, it **must** match the value of the first request.
- assert request.resource_name == resource_name
+ if request.resource_name != resource_name:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
out.write(request.data)
offset += len(request.data)
if request.finish_write:
- assert client_digest.size_bytes == offset
+ if client_digest.size_bytes != offset:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
out.flush()
digest = self.cas.add_object(path=out.name)
- assert digest.hash == client_digest.hash
+ if digest.hash != client_digest.hash:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
finished = True
assert finished
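
A design note on the hunks above: the server-side asserts are replaced with gRPC status codes, so a malformed or non-conforming request fails only that RPC instead of raising in the servicer. A hedged client-side sketch of the observable behaviour, assuming the vendored ByteStream stubs and an illustrative server address:

import grpc

from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc

channel = grpc.insecure_channel('localhost:11001')  # illustrative address
stub = bytestream_pb2_grpc.ByteStreamStub(channel)

request = bytestream_pb2.ReadRequest()
request.resource_name = 'not/a/valid/name'
request.read_offset = 0

try:
    for _ in stub.Read(request):
        pass
except grpc.RpcError as e:
    # With this patch the server answers NOT_FOUND for an unparseable
    # resource name rather than tripping an AssertionError.
    assert e.code() == grpc.StatusCode.NOT_FOUND
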
@@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
return response
-def _digest_from_resource_name(resource_name):
+def _digest_from_download_resource_name(resource_name):
+ parts = resource_name.split('/')
+
+ # Accept requests from non-conforming BuildStream 1.1.x clients
+ if len(parts) == 2:
+ parts.insert(0, 'blobs')
+
+ if len(parts) != 3 or parts[0] != 'blobs':
+ return None
+
+ try:
+ digest = remote_execution_pb2.Digest()
+ digest.hash = parts[1]
+ digest.size_bytes = int(parts[2])
+ return digest
+ except ValueError:
+ return None
+
+
+def _digest_from_upload_resource_name(resource_name):
parts = resource_name.split('/')
- assert len(parts) == 2
- digest = remote_execution_pb2.Digest()
- digest.hash = parts[0]
- digest.size_bytes = int(parts[1])
- return digest
+
+ # Accept requests from non-conforming BuildStream 1.1.x clients
+ if len(parts) == 2:
+ parts.insert(0, 'uploads')
+ parts.insert(1, str(uuid.uuid4()))
+ parts.insert(2, 'blobs')
+
+ if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+ return None
+
+ try:
+ uuid_ = uuid.UUID(hex=parts[1])
+ if uuid_.version != 4:
+ return None
+
+ digest = remote_execution_pb2.Digest()
+ digest.hash = parts[3]
+ digest.size_bytes = int(parts[4])
+ return digest
+ except ValueError:
+ return None
def _has_object(cas, digest):
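
Usage sketch for the two parsers above, assuming they are in scope; the empty-blob digest is used as an illustrative value:

import uuid

# sha256 of the empty blob, size 0
EMPTY = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

# Conforming download name -> Digest(hash=EMPTY, size_bytes=0)
print(_digest_from_download_resource_name('blobs/' + EMPTY + '/0'))

# Legacy 1.1.x name ("{hash}/{size}") is still accepted
print(_digest_from_download_resource_name(EMPTY + '/0'))

# Conforming upload name must carry a version-4 UUID segment
print(_digest_from_upload_resource_name(
    '/'.join(['uploads', str(uuid.uuid4()), 'blobs', EMPTY, '0'])))

# Anything else returns None; the servicers map that to NOT_FOUND
assert _digest_from_download_resource_name('not/a/blob/name') is None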