From e7503b47b93ca6efe91bdd3d7fff0c7ecbff1786 Mon Sep 17 00:00:00 2001
From: John Arbash Meinel
Date: Fri, 13 Nov 2009 02:00:16 -0600
Subject: Dump sticky blobs to disk when memory pressure gets high.

---
 cache_manager.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 106 insertions(+), 9 deletions(-)

diff --git a/cache_manager.py b/cache_manager.py
index 3ecfddc..4227554 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -16,11 +16,46 @@
 
 """A manager of caches."""
 
+import os
+import shutil
+import tempfile
+import time
 
 from bzrlib import lru_cache, trace
 from bzrlib.plugins.fastimport import helpers
 
+
+class _Cleanup(object):
+    """This class makes sure we clean up when CacheManager goes away.
+
+    We use a helper class to ensure that we are never in a refcycle.
+    """
+
+    def __init__(self, disk_blobs):
+        self.disk_blobs = disk_blobs
+        self.tempdir = None
+        self.small_blobs = None
+
+    def __del__(self):
+        self.finalize()
+
+    def finalize(self):
+        if self.disk_blobs is not None:
+            for info in self.disk_blobs.itervalues():
+                info[-1].close()
+            self.disk_blobs = None
+        if self.small_blobs is not None:
+            self.small_blobs.close()
+            self.small_blobs = None
+        if self.tempdir is not None:
+            shutil.rmtree(self.tempdir)
+
+
 class CacheManager(object):
+
+    _small_blob_threshold = 100*1024
+    _sticky_cache_size = 200*1024*1024
+    _sticky_flushed_size = 100*1024*1024
 
     def __init__(self, info=None, verbose=False, inventory_cache_size=10):
         """Create a manager of caches.
@@ -31,9 +66,17 @@ class CacheManager(object):
         self.verbose = verbose
 
         # dataref -> data. dataref is either :mark or the sha-1.
-        # Sticky blobs aren't removed after being referenced.
+        # Sticky blobs are referenced more than once, and are saved until their
+        # refcount goes to 0.
         self._blobs = {}
         self._sticky_blobs = {}
+        self._sticky_memory_bytes = 0
+        # if we overflow our memory cache, then we will dump large blobs to
+        # disk in this directory
+        self._tempdir = None
+        # id => TemporaryFile
+        self._disk_blobs = {}
+        self._cleanup = _Cleanup(self._disk_blobs)
 
         # revision-id -> Inventory cache
         # these are large and we probably don't need too many as
@@ -108,28 +151,82 @@ class CacheManager(object):
         self.heads.clear()
         self.inventories.clear()
 
+    def _flush_blobs_to_disk(self):
+        blobs = self._sticky_blobs.keys()
+        sticky_blobs = self._sticky_blobs
+        blobs.sort(key=lambda k:len(sticky_blobs[k]))
+        if self._tempdir is None:
+            self._tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
+            self._cleanup.tempdir = self._tempdir
+            self._cleanup.small_blobs = tempfile.TemporaryFile(
+                prefix='small-blobs-')
+        count = 0
+        bytes = 0
+        n_small_bytes = 0
+        while self._sticky_memory_bytes > self._sticky_flushed_size:
+            id = blobs.pop()
+            blob = self._sticky_blobs.pop(id)
+            n_bytes = len(blob)
+            self._sticky_memory_bytes -= n_bytes
+            if n_bytes < self._small_blob_threshold:
+                f = self._cleanup.small_blobs
+                f.seek(0, os.SEEK_END)
+                self._disk_blobs[id] = (True, f.tell(), n_bytes, f)
+                n_small_bytes += n_bytes
+            else:
+                f = tempfile.TemporaryFile(prefix='blob-', dir=self._tempdir)
+                self._disk_blobs[id] = (False, 0, n_bytes, f)
+            f.write(blob)
+            bytes += n_bytes
+            del blob
+            count += 1
+        trace.note('flushed %d blobs w/ %.1fMB (%.1fMB small) to disk'
+                   % (count, bytes / 1024. / 1024,
+                      n_small_bytes / 1024. / 1024))
+
     def store_blob(self, id, data):
         """Store a blob of data."""
         # Note: If we're not reference counting, everything has to be sticky
         if not self._blob_ref_counts or id in self._blob_ref_counts:
             self._sticky_blobs[id] = data
+            self._sticky_memory_bytes += len(data)
+            if self._sticky_memory_bytes > self._sticky_cache_size:
+                self._flush_blobs_to_disk()
         elif data == '':
             # Empty data is always sticky
             self._sticky_blobs[id] = data
         else:
             self._blobs[id] = data
 
+    def _decref(self, id, cache, f):
+        if not self._blob_ref_counts:
+            return
+        count = self._blob_ref_counts.get(id, None)
+        if count is not None:
+            count -= 1
+            if count <= 0:
+                del cache[id]
+                if f is not None:
+                    f.close()
+                del self._blob_ref_counts[id]
+            else:
+                self._blob_ref_counts[id] = count
+
     def fetch_blob(self, id):
         """Fetch a blob of data."""
-        try:
-            b = self._sticky_blobs[id]
-            if self._blob_ref_counts and b != '':
-                self._blob_ref_counts[id] -= 1
-                if self._blob_ref_counts[id] == 0:
-                    del self._sticky_blobs[id]
-            return b
-        except KeyError:
+        if id in self._blobs:
             return self._blobs.pop(id)
+        if id in self._disk_blobs:
+            (is_small, offset, n_bytes, f) = self._disk_blobs[id]
+            f.seek(offset)
+            content = f.read(n_bytes)
+            self._decref(id, self._disk_blobs, f)
+            return content
+        content = self._sticky_blobs[id]
+        self._sticky_memory_bytes -= len(content)
+        self._decref(id, self._sticky_blobs, None)
+        return content
 
     def track_heads(self, cmd):
         """Track the repository heads given a CommitCommand.
--
cgit v1.2.1


From 9db9f8e95c23e13390e3cdec19b55f9efaadf781 Mon Sep 17 00:00:00 2001
From: John Arbash Meinel
Date: Fri, 13 Nov 2009 13:16:20 -0600
Subject: Switch to closing the large-content blobs that we store to disk.

During the qt import, we end up with >2000 large blobs, and a few hundred MB
of small blobs. We don't want to create more small blobs, because their disk
space is not reclaimed, but >2000 is too many open file handles (at least on
Windows). Using filenames works, but gives weaker guarantees that things will
be cleaned up nicely.
---
 cache_manager.py | 61 +++++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 40 insertions(+), 21 deletions(-)

diff --git a/cache_manager.py b/cache_manager.py
index 4227554..f35134f 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -42,7 +42,8 @@ class _Cleanup(object):
     def finalize(self):
         if self.disk_blobs is not None:
             for info in self.disk_blobs.itervalues():
-                info[-1].close()
+                if info[-1] is not None:
+                    os.unlink(info[-1])
             self.disk_blobs = None
         if self.small_blobs is not None:
             self.small_blobs.close()
@@ -53,8 +54,8 @@ class _Cleanup(object):
 
 class CacheManager(object):
 
-    _small_blob_threshold = 100*1024
-    _sticky_cache_size = 200*1024*1024
+    _small_blob_threshold = 75*1024
+    _sticky_cache_size = 300*1024*1024
     _sticky_flushed_size = 100*1024*1024
 
     def __init__(self, info=None, verbose=False, inventory_cache_size=10):
@@ -74,9 +75,14 @@ class CacheManager(object):
         # if we overflow our memory cache, then we will dump large blobs to
         # disk in this directory
         self._tempdir = None
-        # id => TemporaryFile
+        # id => (offset, n_bytes, fname)
+        # if fname is None, then the content is stored in the small file
        self._disk_blobs = {}
         self._cleanup = _Cleanup(self._disk_blobs)
+        # atexit.register(self._cleanup.finalize)
+        # The main problem is that it won't let cleanup go away 'normally', so
+        # we really need a weakref callback...
+        # Perhaps just registering the shutil.rmtree?
 
         # revision-id -> Inventory cache
         # these are large and we probably don't need too many as
@@ -154,12 +160,13 @@ class CacheManager(object):
     def _flush_blobs_to_disk(self):
         blobs = self._sticky_blobs.keys()
         sticky_blobs = self._sticky_blobs
+        total_blobs = len(sticky_blobs)
         blobs.sort(key=lambda k:len(sticky_blobs[k]))
         if self._tempdir is None:
             self._tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
             self._cleanup.tempdir = self._tempdir
             self._cleanup.small_blobs = tempfile.TemporaryFile(
-                prefix='small-blobs-')
+                prefix='small-blobs-', dir=self._tempdir)
         count = 0
         bytes = 0
         n_small_bytes = 0
@@ -171,17 +178,19 @@ class CacheManager(object):
             if n_bytes < self._small_blob_threshold:
                 f = self._cleanup.small_blobs
                 f.seek(0, os.SEEK_END)
-                self._disk_blobs[id] = (True, f.tell(), n_bytes, f)
+                self._disk_blobs[id] = (f.tell(), n_bytes, None)
+                f.write(blob)
                 n_small_bytes += n_bytes
             else:
-                f = tempfile.TemporaryFile(prefix='blob-', dir=self._tempdir)
-                self._disk_blobs[id] = (False, 0, n_bytes, f)
-            f.write(blob)
+                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
+                os.write(fd, blob)
+                os.close(fd)
+                self._disk_blobs[id] = (0, n_bytes, name)
             bytes += n_bytes
             del blob
             count += 1
-        trace.note('flushed %d blobs w/ %.1fMB (%.1fMB small) to disk'
-                   % (count, bytes / 1024. / 1024,
+        trace.note('flushed %d/%d blobs w/ %.1fMB (%.1fMB small) to disk'
+                   % (count, total_blobs, bytes / 1024. / 1024,
                       n_small_bytes / 1024. / 1024))
@@ -199,33 +208,43 @@ class CacheManager(object):
         else:
             self._blobs[id] = data
 
-    def _decref(self, id, cache, f):
+    def _decref(self, id, cache, fn):
         if not self._blob_ref_counts:
-            return
+            return False
         count = self._blob_ref_counts.get(id, None)
         if count is not None:
             count -= 1
             if count <= 0:
                 del cache[id]
-                if f is not None:
-                    f.close()
+                if fn is not None:
+                    os.unlink(fn)
                 del self._blob_ref_counts[id]
+                return True
             else:
                 self._blob_ref_counts[id] = count
+        return False
 
     def fetch_blob(self, id):
         """Fetch a blob of data."""
         if id in self._blobs:
             return self._blobs.pop(id)
         if id in self._disk_blobs:
-            (is_small, offset, n_bytes, f) = self._disk_blobs[id]
-            f.seek(offset)
-            content = f.read(n_bytes)
-            self._decref(id, self._disk_blobs, f)
+            (offset, n_bytes, fn) = self._disk_blobs[id]
+            if fn is None:
+                f = self._cleanup.small_blobs
+                f.seek(offset)
+                content = f.read(n_bytes)
+            else:
+                fp = open(fn, 'rb')
+                try:
+                    content = fp.read()
+                finally:
+                    fp.close()
+            self._decref(id, self._disk_blobs, fn)
             return content
         content = self._sticky_blobs[id]
-        self._sticky_memory_bytes -= len(content)
-        self._decref(id, self._sticky_blobs, None)
+        if self._decref(id, self._sticky_blobs, None):
+            self._sticky_memory_bytes -= len(content)
         return content
 
     def track_heads(self, cmd):
--
cgit v1.2.1


From 77704bab48d530910746349e33e2e2d851f3a3a3 Mon Sep 17 00:00:00 2001
From: John Arbash Meinel
Date: Sun, 15 Nov 2009 14:42:36 -0600
Subject: bzr *does* run atexit functions when exiting, but doesn't run destructors.

Also, shrink the 'small blob' size a bit to allow data to be reclaimed, though
it did show up as *lots* of small files in the qt import: something like 1-2k
files in the first 2 dumps.
---
 cache_manager.py | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/cache_manager.py b/cache_manager.py
index f35134f..6c9600f 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -16,10 +16,12 @@
 
 """A manager of caches."""
 
+import atexit
 import os
 import shutil
 import tempfile
 import time
+import weakref
 
 from bzrlib import lru_cache, trace
 from bzrlib.plugins.fastimport import helpers
@@ -54,7 +56,7 @@ class _Cleanup(object):
 
 class CacheManager(object):
 
-    _small_blob_threshold = 75*1024
+    _small_blob_threshold = 25*1024
     _sticky_cache_size = 300*1024*1024
     _sticky_flushed_size = 100*1024*1024
 
@@ -79,10 +81,6 @@ class CacheManager(object):
         # if fname is None, then the content is stored in the small file
         self._disk_blobs = {}
         self._cleanup = _Cleanup(self._disk_blobs)
-        # atexit.register(self._cleanup.finalize)
-        # The main problem is that it won't let cleanup go away 'normally', so
-        # we really need a weakref callback...
-        # Perhaps just registering the shutil.rmtree?
 
         # revision-id -> Inventory cache
         # these are large and we probably don't need too many as
@@ -163,10 +161,22 @@ class CacheManager(object):
         total_blobs = len(sticky_blobs)
         blobs.sort(key=lambda k:len(sticky_blobs[k]))
         if self._tempdir is None:
-            self._tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
+            tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
+            self._tempdir = tempdir
             self._cleanup.tempdir = self._tempdir
             self._cleanup.small_blobs = tempfile.TemporaryFile(
                 prefix='small-blobs-', dir=self._tempdir)
+            small_blob_ref = weakref.ref(self._cleanup.small_blobs)
+            # Even though we add it to _Cleanup, it seems that the object can be
+            # destroyed 'too late' for cleanup to actually occur. Probably a
+            # combination of bzr's "die directly, don't clean up" and how
+            # exceptions close the running stack.
+            def exit_cleanup():
+                small_blob = small_blob_ref()
+                if small_blob is not None:
+                    small_blob.close()
+                shutil.rmtree(tempdir, ignore_errors=True)
+            atexit.register(exit_cleanup)
         count = 0
         bytes = 0
         n_small_bytes = 0
--
cgit v1.2.1
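
Taken together, the three patches settle on one spill-to-disk design: blob data
(bytes) accumulates in memory until a high-water mark, the largest blobs are
flushed until usage drops below a low-water mark, small blobs are packed as
(offset, length) records into one shared temporary file, and each large blob
gets its own named file that is unlinked as soon as it is fetched. The sketch
below restates that design outside bzrlib as a minimal, self-contained Python
class; the name SpillCache, the constants, and the simplification that every
fetch evicts (the real code reference-counts sticky blobs) are illustrative
assumptions, not bzr-fastimport API.

import os
import shutil
import tempfile

class SpillCache(object):
    """Sketch: keep blobs in memory, spill the biggest ones to disk."""

    SMALL_BLOB = 25 * 1024            # pack blobs under this into one file
    HIGH_WATER = 300 * 1024 * 1024    # start flushing above this many bytes
    LOW_WATER = 100 * 1024 * 1024     # flush until memory use drops to this

    def __init__(self):
        self._mem = {}                # blob id -> bytes held in memory
        self._mem_bytes = 0
        self._disk = {}               # blob id -> (offset, n_bytes, fname)
        self._tempdir = tempfile.mkdtemp(prefix='spill-')
        # One shared append-only file for all small blobs; its space is
        # only reclaimed when the whole cache is closed.
        self._small = open(os.path.join(self._tempdir, 'small-blobs'), 'w+b')

    def store(self, blob_id, data):
        self._mem[blob_id] = data
        self._mem_bytes += len(data)
        if self._mem_bytes > self.HIGH_WATER:
            self._flush()

    def _flush(self):
        # Spill the largest blobs first: the fewest writes (and files)
        # get us back under the low-water mark.
        by_size = sorted(self._mem, key=lambda k: len(self._mem[k]))
        while self._mem_bytes > self.LOW_WATER and by_size:
            blob_id = by_size.pop()
            blob = self._mem.pop(blob_id)
            self._mem_bytes -= len(blob)
            if len(blob) < self.SMALL_BLOB:
                self._small.seek(0, os.SEEK_END)
                self._disk[blob_id] = (self._small.tell(), len(blob), None)
                self._small.write(blob)
            else:
                # A named file per large blob, unlinked on fetch, so its
                # disk space is reclaimed immediately.
                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
                os.write(fd, blob)
                os.close(fd)
                self._disk[blob_id] = (0, len(blob), name)

    def fetch(self, blob_id):
        # Simplification: every fetch evicts; the real code decrefs and
        # only evicts when the reference count reaches zero.
        if blob_id in self._mem:
            blob = self._mem.pop(blob_id)
            self._mem_bytes -= len(blob)
            return blob
        offset, n_bytes, fname = self._disk.pop(blob_id)
        if fname is None:
            self._small.seek(offset)
            return self._small.read(n_bytes)
        with open(fname, 'rb') as f:
            content = f.read()
        os.unlink(fname)
        return content

    def close(self):
        self._small.close()
        shutil.rmtree(self._tempdir, ignore_errors=True)

The gap between the two thresholds matters: a single store() can trigger a
large batch flush, which amortizes the size sort over many subsequent stores.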
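The last patch also pins down a cleanup pattern worth noting. Registering
self._cleanup.finalize directly with atexit would keep _Cleanup, and through
it the whole blob dictionary, alive until interpreter exit, which is what the
removed comment was worrying about; a closure that captures only a weak
reference and the directory name avoids that. Below is a standalone sketch of
the pattern, under the same assumption the commit message states (atexit
handlers run on exit, destructors may not); the class name is hypothetical.

import atexit
import shutil
import tempfile
import weakref

class DiskBackedCache(object):
    def __init__(self):
        self._tempdir = tempfile.mkdtemp(prefix='cache-')
        self._small_blobs = tempfile.TemporaryFile(dir=self._tempdir)
        # The hook captures a weakref and a plain string only, so it does
        # not keep this object (or anything it references) alive.
        small_ref = weakref.ref(self._small_blobs)
        tempdir = self._tempdir

        def exit_cleanup():
            small = small_ref()
            if small is not None:
                small.close()
            # Harmless if normal cleanup already removed the directory.
            shutil.rmtree(tempdir, ignore_errors=True)

        atexit.register(exit_cleanup)

If the cache is torn down normally first, the weakref goes dead and the hook
degrades to an rmtree of a directory that is already gone, which
ignore_errors swallows.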