diff options
author | John Arbash Meinel <john@arbash-meinel.com> | 2009-11-13 02:00:16 -0600 |
---|---|---|
committer | John Arbash Meinel <john@arbash-meinel.com> | 2009-11-13 02:00:16 -0600 |
commit | e7503b47b93ca6efe91bdd3d7fff0c7ecbff1786 (patch) | |
tree | 349b5a0c628c2bafe162b6ff03dfb77788a1ccbb | |
parent | aa2c5cc815f2399449af2e5937dc727376424e48 (diff) | |
download | python-fastimport-e7503b47b93ca6efe91bdd3d7fff0c7ecbff1786.tar.gz |
Dump sticky blobs to disk when memory pressure gets high.
-rw-r--r-- | cache_manager.py | 115 |
1 files changed, 106 insertions, 9 deletions
diff --git a/cache_manager.py b/cache_manager.py index 3ecfddc..4227554 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -16,11 +16,46 @@ """A manager of caches.""" +import os +import shutil +import tempfile +import time from bzrlib import lru_cache, trace from bzrlib.plugins.fastimport import helpers + +class _Cleanup(object): + """This class makes sure we clean up when CacheManager goes away. + + We use a helper class to ensure that we are never in a refcycle. + """ + + def __init__(self, disk_blobs): + self.disk_blobs = disk_blobs + self.tempdir = None + self.small_blobs = None + + def __del__(self): + self.finalize() + + def finalize(self): + if self.disk_blobs is not None: + for info in self.disk_blobs.itervalues(): + info[-1].close() + self.disk_blobs = None + if self.small_blobs is not None: + self.small_blobs.close() + self.small_blobs = None + if self.tempdir is not None: + shutils.rmtree(self.tempdir) + + class CacheManager(object): + + _small_blob_threshold = 100*1024 + _sticky_cache_size = 200*1024*1024 + _sticky_flushed_size = 100*1024*1024 def __init__(self, info=None, verbose=False, inventory_cache_size=10): """Create a manager of caches. @@ -31,9 +66,17 @@ class CacheManager(object): self.verbose = verbose # dataref -> data. datref is either :mark or the sha-1. - # Sticky blobs aren't removed after being referenced. + # Sticky blobs are referenced more than once, and are saved until their + # refcount goes to 0 self._blobs = {} self._sticky_blobs = {} + self._sticky_memory_bytes = 0 + # if we overflow our memory cache, then we will dump large blobs to + # disk in this directory + self._tempdir = None + # id => TemporaryFile + self._disk_blobs = {} + self._cleanup = _Cleanup(self._disk_blobs) # revision-id -> Inventory cache # these are large and we probably don't need too many as @@ -108,28 +151,82 @@ class CacheManager(object): self.heads.clear() self.inventories.clear() + def _flush_blobs_to_disk(self): + blobs = self._sticky_blobs.keys() + sticky_blobs = self._sticky_blobs + blobs.sort(key=lambda k:len(sticky_blobs[k])) + if self._tempdir is None: + self._tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-') + self._cleanup.tempdir = self._tempdir + self._cleanup.small_blobs = tempfile.TemporaryFile( + prefix='small-blobs-') + count = 0 + bytes = 0 + n_small_bytes = 0 + while self._sticky_memory_bytes > self._sticky_flushed_size: + id = blobs.pop() + blob = self._sticky_blobs.pop(id) + n_bytes = len(blob) + self._sticky_memory_bytes -= n_bytes + if n_bytes < self._small_blob_threshold: + f = self._cleanup.small_blobs + f.seek(0, os.SEEK_END) + self._disk_blobs[id] = (True, f.tell(), n_bytes, f) + n_small_bytes += n_bytes + else: + f = tempfile.TemporaryFile(prefix='blob-', dir=self._tempdir) + self._disk_blobs[id] = (False, 0, n_bytes, f) + f.write(blob) + bytes += n_bytes + del blob + count += 1 + trace.note('flushed %d blobs w/ %.1fMB (%.1fMB small) to disk' + % (count, bytes / 1024. / 1024, + n_small_bytes / 1024. / 1024)) + + def store_blob(self, id, data): """Store a blob of data.""" # Note: If we're not reference counting, everything has to be sticky if not self._blob_ref_counts or id in self._blob_ref_counts: self._sticky_blobs[id] = data + self._sticky_memory_bytes += len(data) + if self._sticky_memory_bytes > self._sticky_cache_size: + self._flush_blobs_to_disk() elif data == '': # Empty data is always sticky self._sticky_blobs[id] = data else: self._blobs[id] = data + def _decref(self, id, cache, f): + if not self._blob_ref_counts: + return + count = self._blob_ref_counts.get(id, None) + if count is not None: + count -= 1 + if count <= 0: + del cache[id] + if f is not None: + f.close() + del self._blob_ref_counts[id] + else: + self._blob_ref_counts[id] = count + def fetch_blob(self, id): """Fetch a blob of data.""" - try: - b = self._sticky_blobs[id] - if self._blob_ref_counts and b != '': - self._blob_ref_counts[id] -= 1 - if self._blob_ref_counts[id] == 0: - del self._sticky_blobs[id] - return b - except KeyError: + if id in self._blobs: return self._blobs.pop(id) + if id in self._disk_blobs: + (is_small, offset, n_bytes, f) = self._disk_blobs[id] + f.seek(offset) + content = f.read(n_bytes) + self._decref(id, self._disk_blobs, f) + return content + content = self._sticky_blobs[id] + self._sticky_memory_bytes -= len(content) + self._decref(id, self._sticky_blobs, None) + return content def track_heads(self, cmd): """Track the repository heads given a CommitCommand. |