summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValentin David <valentin.david@codethink.co.uk>2018-11-15 12:48:41 +0100
committerValentin David <valentin.david@codethink.co.uk>2018-11-28 15:29:52 +0100
commit5ef19a0b31df84caed1e41719ef7ea5c6bd8a8bc (patch)
tree599e352a30c1c43925408e058997a807cd72ab07
parent227fa26d8991936d22d3b810c6a8b0bead703eb9 (diff)
downloadbuildstream-5ef19a0b31df84caed1e41719ef7ea5c6bd8a8bc.tar.gz
Avoid copying temporary file when adding object to CAS in server.
The file is already a temporary file and does not need copy. ENOSPC is thrown during that copy in issue #609. Fixes #678.
-rw-r--r--buildstream/_artifactcache/cascache.py42
-rw-r--r--buildstream/_artifactcache/casserver.py2
2 files changed, 26 insertions, 18 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 3218552fb..0f7a55dc1 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -25,6 +25,7 @@ import os
import stat
import tempfile
import uuid
+import contextlib
from urllib.parse import urlparse
import grpc
@@ -480,13 +481,14 @@ class CASCache():
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
+ # link_directly (bool): Whether file given by path can be linked
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
- def add_object(self, *, digest=None, path=None, buffer=None):
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
@@ -496,28 +498,34 @@ class CASCache():
try:
h = hashlib.sha256()
# Always write out new file to avoid corruption if input file is modified
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
- # Set mode bits to 0644
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
- if path:
- with open(path, 'rb') as f:
- for chunk in iter(lambda: f.read(4096), b""):
- h.update(chunk)
- out.write(chunk)
+ with contextlib.ExitStack() as stack:
+ if path is not None and link_directly:
+ tmp = stack.enter_context(open(path, 'rb'))
+ for chunk in iter(lambda: tmp.read(4096), b""):
+ h.update(chunk)
else:
- h.update(buffer)
- out.write(buffer)
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+ # Set mode bits to 0644
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
- out.flush()
+ if path:
+ with open(path, 'rb') as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ h.update(chunk)
+ tmp.write(chunk)
+ else:
+ h.update(buffer)
+ tmp.write(buffer)
+
+ tmp.flush()
digest.hash = h.hexdigest()
- digest.size_bytes = os.fstat(out.fileno()).st_size
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
# Place file at final location
objpath = self.objpath(digest)
os.makedirs(os.path.dirname(objpath), exist_ok=True)
- os.link(out.name, objpath)
+ os.link(tmp.name, objpath)
except FileExistsError as e:
# We can ignore the failed link() if the object is already in the repo.
@@ -889,7 +897,7 @@ class CASCache():
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
self._fetch_blob(remote, digest, f)
- added_digest = self.add_object(path=f.name)
+ added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
@@ -900,7 +908,7 @@ class CASCache():
f.write(data)
f.flush()
- added_digest = self.add_object(path=f.name)
+ added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index f7dc89581..56e61f915 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -208,7 +208,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
out.flush()
- digest = self.cas.add_object(path=out.name)
+ digest = self.cas.add_object(path=out.name, link_directly=True)
if digest.hash != client_digest.hash:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response