author     Ian Clatworthy <ian.clatworthy@canonical.com>  2009-12-08 16:26:34 +1000
committer  Ian Clatworthy <ian.clatworthy@canonical.com>  2009-12-08 16:26:34 +1000
commit     ffc55f3dbcb537ab815b986c454b156d4bb6fd29 (patch)
tree       8fafc39f516e7c1d70f620cb7378f0e50d2ca4b1
parent     922c1a78a099c9a7d19d0de135a7df8951e956e0 (diff)
download   python-fastimport-ffc55f3dbcb537ab815b986c454b156d4bb6fd29.tar.gz
Merge John's smarter caching of blobs to improve memory footprint
-rw-r--r--  NEWS                3
-rw-r--r--  cache_manager.py  144
2 files changed, 138 insertions(+), 9 deletions(-)
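The change caps the in-memory "sticky" blob cache at roughly 300MB
(_sticky_cache_size) and, once that ceiling is crossed, flushes the largest
blobs to a temporary directory until usage falls back to about 100MB
(_sticky_flushed_size). As a reading aid, here is a minimal, self-contained
sketch of that spill pattern; the name SpillCache and its exact structure are
invented for illustration and are not part of the plugin:

    import os
    import tempfile

    class SpillCache(object):
        """Cache byte strings in memory, spilling to disk past a ceiling."""

        def __init__(self, max_bytes=300*1024*1024,
                     flushed_bytes=100*1024*1024):
            self.max_bytes = max_bytes          # spill when we exceed this
            self.flushed_bytes = flushed_bytes  # spill down to this level
            self.memory = {}                    # id -> data held in RAM
            self.on_disk = {}                   # id -> temp file name
            self.memory_bytes = 0
            self.tempdir = tempfile.mkdtemp(prefix='spill-cache-')

        def store(self, key, data):
            self.memory[key] = data
            self.memory_bytes += len(data)
            if self.memory_bytes > self.max_bytes:
                self._spill()

        def _spill(self):
            # Evict the largest blobs first: each disk write frees the
            # most memory, so fewer temp files are created overall.
            keys = sorted(self.memory, key=lambda k: len(self.memory[k]))
            while self.memory_bytes > self.flushed_bytes:
                key = keys.pop()
                data = self.memory.pop(key)
                self.memory_bytes -= len(data)
                fd, name = tempfile.mkstemp(dir=self.tempdir)
                os.write(fd, data)
                os.close(fd)
                self.on_disk[key] = name

        def fetch(self, key):
            if key in self.memory:
                return self.memory[key]
            f = open(self.on_disk[key], 'rb')
            try:
                return f.read()
            finally:
                f.close()

Flushing down to a low-water mark rather than evicting one blob at a time
amortises the cost of each flush across many subsequent stores.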
diff --git a/NEWS b/NEWS
index 33caab0..6ba3dd7 100644
--- a/NEWS
+++ b/NEWS
@@ -56,6 +56,9 @@ Improvements
 * Large repositories now compress better thanks to a change in
   how file-ids are assigned. (Ian Clatworthy, John Arbash Meinel)
 
+* Memory usage is improved by flushing blobs to a disk cache
+  when appropriate. (John Arbash Meinel)
+
 * If a fast-import source ends in ".gz", it is assumed to be in
   gzip format and the stream is implicitly uncompressed. This
   means fast-import dump files generated by fast-export-from-xxx
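The cache_manager.py diff below is the substance of this NEWS entry. One
detail worth highlighting: blobs smaller than 25KB (_small_blob_threshold) do
not each get their own temp file; they are appended to a single shared file
and indexed by (offset, length). A sketch of that packing scheme in
isolation, with invented names (SmallBlobFile is not part of the plugin):

    import os
    import tempfile

    class SmallBlobFile(object):
        """Append-only pack of small blobs, indexed by (offset, length)."""

        def __init__(self):
            self._file = tempfile.TemporaryFile(prefix='small-blobs-')
            self._index = {}  # blob id -> (offset, n_bytes)

        def add(self, blob_id, data):
            # Always append: seek to the end and record where we landed.
            self._file.seek(0, os.SEEK_END)
            self._index[blob_id] = (self._file.tell(), len(data))
            self._file.write(data)

        def get(self, blob_id):
            offset, n_bytes = self._index[blob_id]
            self._file.seek(offset)
            return self._file.read(n_bytes)

Packing small blobs together avoids the per-file overhead (an inode plus at
least one filesystem block each) that thousands of tiny temp files would
incur.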
diff --git a/cache_manager.py b/cache_manager.py
index 27c14e4..f998b66 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -16,11 +16,49 @@
 """A manager of caches."""
 
+import atexit
+import os
+import shutil
+import tempfile
+import time
+import weakref
+
 from bzrlib import lru_cache, trace
 from bzrlib.plugins.fastimport import branch_mapper, helpers
+
+class _Cleanup(object):
+    """This class makes sure we clean up when CacheManager goes away.
+
+    We use a helper class to ensure that we are never in a refcycle.
+    """
+
+    def __init__(self, disk_blobs):
+        self.disk_blobs = disk_blobs
+        self.tempdir = None
+        self.small_blobs = None
+
+    def __del__(self):
+        self.finalize()
+
+    def finalize(self):
+        if self.disk_blobs is not None:
+            for info in self.disk_blobs.itervalues():
+                if info[-1] is not None:
+                    os.unlink(info[-1])
+            self.disk_blobs = None
+        if self.small_blobs is not None:
+            self.small_blobs.close()
+            self.small_blobs = None
+        if self.tempdir is not None:
+            shutil.rmtree(self.tempdir)
+
+
 class CacheManager(object):
+
+    _small_blob_threshold = 25*1024
+    _sticky_cache_size = 300*1024*1024
+    _sticky_flushed_size = 100*1024*1024
+
     def __init__(self, info=None, verbose=False, inventory_cache_size=10):
         """Create a manager of caches.
@@ -31,9 +69,18 @@ class CacheManager(object):
         self.verbose = verbose
 
         # dataref -> data. dataref is either :mark or the sha-1.
-        # Sticky blobs aren't removed after being referenced.
+        # Sticky blobs are referenced more than once and are kept until
+        # their refcount drops to zero.
         self._blobs = {}
         self._sticky_blobs = {}
+        self._sticky_memory_bytes = 0
+        # If we overflow our memory cache, we dump large blobs to disk
+        # in this directory.
+        self._tempdir = None
+        # id -> (offset, n_bytes, fname)
+        # If fname is None, the content is stored in the small-blobs file.
+        self._disk_blobs = {}
+        self._cleanup = _Cleanup(self._disk_blobs)
 
         # revision-id -> Inventory cache
         # these are large and we probably don't need too many as
@@ -112,28 +159,107 @@ class CacheManager(object):
         self.heads.clear()
         self.inventories.clear()
 
+    def _flush_blobs_to_disk(self):
+        blobs = self._sticky_blobs.keys()
+        sticky_blobs = self._sticky_blobs
+        total_blobs = len(sticky_blobs)
+        blobs.sort(key=lambda k: len(sticky_blobs[k]))
+        if self._tempdir is None:
+            tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
+            self._tempdir = tempdir
+            self._cleanup.tempdir = self._tempdir
+            self._cleanup.small_blobs = tempfile.TemporaryFile(
+                prefix='small-blobs-', dir=self._tempdir)
+            small_blob_ref = weakref.ref(self._cleanup.small_blobs)
+            # Even though we add it to _Cleanup, it seems that the object can
+            # be destroyed 'too late' for cleanup to actually occur. Probably
+            # a combination of bzr's "die directly, don't clean up" and how
+            # exceptions close the running stack.
+            def exit_cleanup():
+                small_blob = small_blob_ref()
+                if small_blob is not None:
+                    small_blob.close()
+                shutil.rmtree(tempdir, ignore_errors=True)
+            atexit.register(exit_cleanup)
+        count = 0
+        bytes = 0
+        n_small_bytes = 0
+        while self._sticky_memory_bytes > self._sticky_flushed_size:
+            id = blobs.pop()
+            blob = self._sticky_blobs.pop(id)
+            n_bytes = len(blob)
+            self._sticky_memory_bytes -= n_bytes
+            if n_bytes < self._small_blob_threshold:
+                f = self._cleanup.small_blobs
+                f.seek(0, os.SEEK_END)
+                self._disk_blobs[id] = (f.tell(), n_bytes, None)
+                f.write(blob)
+                n_small_bytes += n_bytes
+            else:
+                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
+                os.write(fd, blob)
+                os.close(fd)
+                self._disk_blobs[id] = (0, n_bytes, name)
+            bytes += n_bytes
+            del blob
+            count += 1
+        trace.note('flushed %d/%d blobs w/ %.1fMB (%.1fMB small) to disk'
+                   % (count, total_blobs, bytes / 1024. / 1024,
+                      n_small_bytes / 1024. / 1024))
+
     def store_blob(self, id, data):
         """Store a blob of data."""
         # Note: If we're not reference counting, everything has to be sticky
         if not self._blob_ref_counts or id in self._blob_ref_counts:
             self._sticky_blobs[id] = data
+            self._sticky_memory_bytes += len(data)
+            if self._sticky_memory_bytes > self._sticky_cache_size:
+                self._flush_blobs_to_disk()
         elif data == '':
             # Empty data is always sticky
             self._sticky_blobs[id] = data
         else:
             self._blobs[id] = data
 
+    def _decref(self, id, cache, fn):
+        if not self._blob_ref_counts:
+            return False
+        count = self._blob_ref_counts.get(id, None)
+        if count is not None:
+            count -= 1
+            if count <= 0:
+                del cache[id]
+                if fn is not None:
+                    os.unlink(fn)
+                del self._blob_ref_counts[id]
+                return True
+            else:
+                self._blob_ref_counts[id] = count
+        return False
+
     def fetch_blob(self, id):
         """Fetch a blob of data."""
-        try:
-            b = self._sticky_blobs[id]
-            if self._blob_ref_counts and b != '':
-                self._blob_ref_counts[id] -= 1
-                if self._blob_ref_counts[id] == 0:
-                    del self._sticky_blobs[id]
-            return b
-        except KeyError:
+        if id in self._blobs:
             return self._blobs.pop(id)
+        if id in self._disk_blobs:
+            (offset, n_bytes, fn) = self._disk_blobs[id]
+            if fn is None:
+                f = self._cleanup.small_blobs
+                f.seek(offset)
+                content = f.read(n_bytes)
+            else:
+                fp = open(fn, 'rb')
+                try:
+                    content = fp.read()
+                finally:
+                    fp.close()
+            self._decref(id, self._disk_blobs, fn)
+            return content
+        content = self._sticky_blobs[id]
+        if self._decref(id, self._sticky_blobs, None):
+            self._sticky_memory_bytes -= len(content)
+        return content
 
     def track_heads(self, cmd):
         """Track the repository heads given a CommitCommand.