summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValentin David <valentin.david@codethink.co.uk>2018-11-15 12:48:41 +0100
committerValentin David <valentin.david@codethink.co.uk>2018-11-19 14:11:56 +0100
commit9252a18180ce79d70c193768293baa0f0eff9981 (patch)
treecdb53f258dfb001c879fa6ad954b1a3ba33cfbe1
parent300011b262524ca9306b81810eddc05ebe3abcb6 (diff)
downloadbuildstream-9252a18180ce79d70c193768293baa0f0eff9981.tar.gz
Avoid copying temporary file when adding object to CAS in server.
The file is already a temporary file and does not need copy. ENOSPC is thrown during that copy in issue #609. Fixes #678.
-rw-r--r--buildstream/_artifactcache/cascache.py42
-rw-r--r--buildstream/_artifactcache/casserver.py2
2 files changed, 26 insertions, 18 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 826468c6d..1f3f975b7 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -26,6 +26,7 @@ import stat
import tempfile
import uuid
import errno
+import contextlib
from urllib.parse import urlparse
import grpc
@@ -371,13 +372,14 @@ class CASCache(ArtifactCache):
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
+ # link_directly (bool): Whether file given by path can be linked
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
- def add_object(self, *, digest=None, path=None, buffer=None):
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
@@ -387,28 +389,34 @@ class CASCache(ArtifactCache):
try:
h = hashlib.sha256()
# Always write out new file to avoid corruption if input file is modified
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
- # Set mode bits to 0644
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
- if path:
- with open(path, 'rb') as f:
- for chunk in iter(lambda: f.read(4096), b""):
- h.update(chunk)
- out.write(chunk)
+ with contextlib.ExitStack() as stack:
+ if path is not None and link_directly:
+ tmp = stack.enter_context(open(path, 'rb'))
+ for chunk in iter(lambda: tmp.read(4096), b""):
+ h.update(chunk)
else:
- h.update(buffer)
- out.write(buffer)
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+ # Set mode bits to 0644
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
- out.flush()
+ if path:
+ with open(path, 'rb') as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ h.update(chunk)
+ tmp.write(chunk)
+ else:
+ h.update(buffer)
+ tmp.write(buffer)
+
+ tmp.flush()
digest.hash = h.hexdigest()
- digest.size_bytes = os.fstat(out.fileno()).st_size
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
# Place file at final location
objpath = self.objpath(digest)
os.makedirs(os.path.dirname(objpath), exist_ok=True)
- os.link(out.name, objpath)
+ os.link(tmp.name, objpath)
except FileExistsError as e:
# We can ignore the failed link() if the object is already in the repo.
@@ -802,7 +810,7 @@ class CASCache(ArtifactCache):
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
self._fetch_blob(remote, digest, f)
- added_digest = self.add_object(path=f.name)
+ added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
@@ -813,7 +821,7 @@ class CASCache(ArtifactCache):
f.write(data)
f.flush()
- added_digest = self.add_object(path=f.name)
+ added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index a1432d6fd..a9854aa40 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -211,7 +211,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
out.flush()
- digest = self.cas.add_object(path=out.name)
+ digest = self.cas.add_object(path=out.name, link_directly=True)
if digest.hash != client_digest.hash:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response