| author | Ian Clatworthy <ian.clatworthy@canonical.com> | 2009-12-08 16:26:34 +1000 |
|---|---|---|
| committer | Ian Clatworthy <ian.clatworthy@canonical.com> | 2009-12-08 16:26:34 +1000 |
| commit | ffc55f3dbcb537ab815b986c454b156d4bb6fd29 (patch) | |
| tree | 8fafc39f516e7c1d70f620cb7378f0e50d2ca4b1 /cache_manager.py | |
| parent | 922c1a78a099c9a7d19d0de135a7df8951e956e0 (diff) | |
| download | bzr-fastimport-ffc55f3dbcb537ab815b986c454b156d4bb6fd29.tar.gz | |
Merge John's smarter caching of blobs to improve memory footprint
Diffstat (limited to 'cache_manager.py')
-rw-r--r-- | cache_manager.py | 144 |
1 file changed, 135 insertions, 9 deletions
diff --git a/cache_manager.py b/cache_manager.py
index 27c14e4..f998b66 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -16,11 +16,49 @@
 """A manager of caches."""
 
+import atexit
+import os
+import shutil
+import tempfile
+import time
+import weakref
 
 from bzrlib import lru_cache, trace
 from bzrlib.plugins.fastimport import branch_mapper, helpers
 
+
+class _Cleanup(object):
+    """This class makes sure we clean up when CacheManager goes away.
+
+    We use a helper class to ensure that we are never in a refcycle.
+    """
+
+    def __init__(self, disk_blobs):
+        self.disk_blobs = disk_blobs
+        self.tempdir = None
+        self.small_blobs = None
+
+    def __del__(self):
+        self.finalize()
+
+    def finalize(self):
+        if self.disk_blobs is not None:
+            for info in self.disk_blobs.itervalues():
+                if info[-1] is not None:
+                    os.unlink(info[-1])
+            self.disk_blobs = None
+        if self.small_blobs is not None:
+            self.small_blobs.close()
+            self.small_blobs = None
+        if self.tempdir is not None:
+            shutil.rmtree(self.tempdir)
+
 
 class CacheManager(object):
+
+    _small_blob_threshold = 25*1024
+    _sticky_cache_size = 300*1024*1024
+    _sticky_flushed_size = 100*1024*1024
 
     def __init__(self, info=None, verbose=False, inventory_cache_size=10):
         """Create a manager of caches.
@@ -31,9 +69,18 @@ class CacheManager(object):
         self.verbose = verbose
 
         # dataref -> data. datref is either :mark or the sha-1.
-        # Sticky blobs aren't removed after being referenced.
+        # Sticky blobs are referenced more than once, and are saved until their
+        # refcount goes to 0
         self._blobs = {}
         self._sticky_blobs = {}
+        self._sticky_memory_bytes = 0
+        # if we overflow our memory cache, then we will dump large blobs to
+        # disk in this directory
+        self._tempdir = None
+        # id => (offset, n_bytes, fname)
+        #   if fname is None, then the content is stored in the small file
+        self._disk_blobs = {}
+        self._cleanup = _Cleanup(self._disk_blobs)
 
         # revision-id -> Inventory cache
         # these are large and we probably don't need too many as
@@ -112,28 +159,107 @@ class CacheManager(object):
         self.heads.clear()
         self.inventories.clear()
 
+    def _flush_blobs_to_disk(self):
+        blobs = self._sticky_blobs.keys()
+        sticky_blobs = self._sticky_blobs
+        total_blobs = len(sticky_blobs)
+        blobs.sort(key=lambda k:len(sticky_blobs[k]))
+        if self._tempdir is None:
+            tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
+            self._tempdir = tempdir
+            self._cleanup.tempdir = self._tempdir
+            self._cleanup.small_blobs = tempfile.TemporaryFile(
+                prefix='small-blobs-', dir=self._tempdir)
+            small_blob_ref = weakref.ref(self._cleanup.small_blobs)
+            # Even though we add it to _Cleanup it seems that the object can
+            # be destroyed 'too late' for cleanup to actually occur. Probably
+            # a combination of bzr's "die directly, don't clean up" and how
+            # exceptions close the running stack.
+            def exit_cleanup():
+                small_blob = small_blob_ref()
+                if small_blob is not None:
+                    small_blob.close()
+                shutil.rmtree(tempdir, ignore_errors=True)
+            atexit.register(exit_cleanup)
+        count = 0
+        bytes = 0
+        n_small_bytes = 0
+        while self._sticky_memory_bytes > self._sticky_flushed_size:
+            id = blobs.pop()
+            blob = self._sticky_blobs.pop(id)
+            n_bytes = len(blob)
+            self._sticky_memory_bytes -= n_bytes
+            if n_bytes < self._small_blob_threshold:
+                f = self._cleanup.small_blobs
+                f.seek(0, os.SEEK_END)
+                self._disk_blobs[id] = (f.tell(), n_bytes, None)
+                f.write(blob)
+                n_small_bytes += n_bytes
+            else:
+                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
+                os.write(fd, blob)
+                os.close(fd)
+                self._disk_blobs[id] = (0, n_bytes, name)
+            bytes += n_bytes
+            del blob
+            count += 1
+        trace.note('flushed %d/%d blobs w/ %.1fMB (%.1fMB small) to disk'
+                   % (count, total_blobs, bytes / 1024. / 1024,
+                      n_small_bytes / 1024. / 1024))
+
     def store_blob(self, id, data):
         """Store a blob of data."""
         # Note: If we're not reference counting, everything has to be sticky
         if not self._blob_ref_counts or id in self._blob_ref_counts:
             self._sticky_blobs[id] = data
+            self._sticky_memory_bytes += len(data)
+            if self._sticky_memory_bytes > self._sticky_cache_size:
+                self._flush_blobs_to_disk()
         elif data == '':
             # Empty data is always sticky
             self._sticky_blobs[id] = data
         else:
             self._blobs[id] = data
 
+    def _decref(self, id, cache, fn):
+        if not self._blob_ref_counts:
+            return False
+        count = self._blob_ref_counts.get(id, None)
+        if count is not None:
+            count -= 1
+            if count <= 0:
+                del cache[id]
+                if fn is not None:
+                    os.unlink(fn)
+                del self._blob_ref_counts[id]
+                return True
+            else:
+                self._blob_ref_counts[id] = count
+        return False
+
     def fetch_blob(self, id):
         """Fetch a blob of data."""
-        try:
-            b = self._sticky_blobs[id]
-            if self._blob_ref_counts and b != '':
-                self._blob_ref_counts[id] -= 1
-                if self._blob_ref_counts[id] == 0:
-                    del self._sticky_blobs[id]
-            return b
-        except KeyError:
+        if id in self._blobs:
             return self._blobs.pop(id)
+        if id in self._disk_blobs:
+            (offset, n_bytes, fn) = self._disk_blobs[id]
+            if fn is None:
+                f = self._cleanup.small_blobs
+                f.seek(offset)
+                content = f.read(n_bytes)
+            else:
+                fp = open(fn, 'rb')
+                try:
+                    content = fp.read()
+                finally:
+                    fp.close()
+            self._decref(id, self._disk_blobs, fn)
+            return content
+        content = self._sticky_blobs[id]
+        if self._decref(id, self._sticky_blobs, None):
+            self._sticky_memory_bytes -= len(content)
+        return content
 
     def track_heads(self, cmd):
         """Track the repository heads given a CommitCommand.
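Taken together, the change implements a two-level blob cache: sticky blobs accumulate in memory until they pass `_sticky_cache_size` (300MB), at which point the largest ones are spilled to disk until usage drops below `_sticky_flushed_size` (100MB), and blobs under `_small_blob_threshold` (25KB) are packed end-to-end into one shared temporary file instead of getting a file each. Below is a minimal, self-contained sketch of that spill-to-disk scheme; the class and method names are illustrative, not from the plugin, and the per-blob reference counting is omitted for clarity:

```python
import os
import tempfile


class SpillingBlobCache(object):
    """Two-level blob cache: RAM first, largest blobs spilled to disk."""

    SMALL_THRESHOLD = 25 * 1024        # below this, pack into a shared file
    HIGH_WATER = 300 * 1024 * 1024     # start spilling past this many bytes
    LOW_WATER = 100 * 1024 * 1024      # stop spilling once back under this

    def __init__(self):
        self._mem = {}        # id -> blob bytes held in memory
        self._mem_bytes = 0
        self._disk = {}       # id -> (offset, n_bytes, fname or None)
        self._tempdir = tempfile.mkdtemp(prefix='spill-cache-')
        self._small = tempfile.TemporaryFile(dir=self._tempdir)

    def store(self, id, data):
        self._mem[id] = data
        self._mem_bytes += len(data)
        if self._mem_bytes > self.HIGH_WATER:
            self._flush_to_disk()

    def _flush_to_disk(self):
        # Sort ids by blob size and evict from the large end: each
        # eviction then frees the most memory per file written.
        by_size = sorted(self._mem, key=lambda k: len(self._mem[k]))
        while self._mem_bytes > self.LOW_WATER:
            id = by_size.pop()
            blob = self._mem.pop(id)
            self._mem_bytes -= len(blob)
            if len(blob) < self.SMALL_THRESHOLD:
                # Append small blobs to one shared file and remember the
                # (offset, length) pair instead of creating tiny files.
                self._small.seek(0, os.SEEK_END)
                self._disk[id] = (self._small.tell(), len(blob), None)
                self._small.write(blob)
            else:
                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
                os.write(fd, blob)
                os.close(fd)
                self._disk[id] = (0, len(blob), name)

    def fetch(self, id):
        if id in self._mem:
            return self._mem[id]
        offset, n_bytes, fname = self._disk[id]
        if fname is None:
            self._small.seek(offset)
            return self._small.read(n_bytes)
        with open(fname, 'rb') as f:
            return f.read()
```

Evicting largest-first frees the most memory per eviction, while the many small blobs stay in RAM the longest and share a single file when they do spill, avoiding thousands of tiny temp files.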
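The `_Cleanup` helper plus the `atexit` hook is the other pattern worth noting: the on-disk state is owned by a separate object so `CacheManager` never sits in a reference cycle that would stop `__del__` from running, and because bzr can die without unwinding the stack cleanly, an `atexit` backstop is registered that reaches the temporary file only through a `weakref`, so the registration itself does not pin the cache alive. A sketch of the same pattern in isolation (apart from the `_Cleanup` name, identifiers here are illustrative, not from the plugin):

```python
import atexit
import shutil
import tempfile
import weakref


class _Cleanup(object):
    """Owns on-disk state; kept out of any refcycle so __del__ can run."""

    def __init__(self):
        self.tempdir = tempfile.mkdtemp(prefix='demo-blobs-')
        self.small_blobs = tempfile.TemporaryFile(dir=self.tempdir)

    def __del__(self):
        self.finalize()

    def finalize(self):
        if self.small_blobs is not None:
            self.small_blobs.close()
            self.small_blobs = None
        if self.tempdir is not None:
            shutil.rmtree(self.tempdir, ignore_errors=True)
            self.tempdir = None


cleanup = _Cleanup()

# __del__ may fire "too late" (or not at all) if the process dies from an
# unhandled exception, so register an atexit backstop as well.  Capturing
# the file only through a weakref keeps the atexit registration from
# holding the cleanup object alive for the whole run.
small_blob_ref = weakref.ref(cleanup.small_blobs)
tempdir = cleanup.tempdir


def exit_cleanup():
    small_blob = small_blob_ref()
    if small_blob is not None:
        small_blob.close()
    shutil.rmtree(tempdir, ignore_errors=True)

atexit.register(exit_cleanup)
```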