summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Arbash Meinel <john@arbash-meinel.com>2009-11-13 02:00:16 -0600
committerJohn Arbash Meinel <john@arbash-meinel.com>2009-11-13 02:00:16 -0600
commite7503b47b93ca6efe91bdd3d7fff0c7ecbff1786 (patch)
tree349b5a0c628c2bafe162b6ff03dfb77788a1ccbb
parentaa2c5cc815f2399449af2e5937dc727376424e48 (diff)
downloadpython-fastimport-e7503b47b93ca6efe91bdd3d7fff0c7ecbff1786.tar.gz
Dump sticky blobs to disk when memory pressure gets high.
-rw-r--r--cache_manager.py115
1 files changed, 106 insertions, 9 deletions
diff --git a/cache_manager.py b/cache_manager.py
index 3ecfddc..4227554 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -16,11 +16,46 @@
"""A manager of caches."""
+import os
+import shutil
+import tempfile
+import time
from bzrlib import lru_cache, trace
from bzrlib.plugins.fastimport import helpers
+
+class _Cleanup(object):
+ """This class makes sure we clean up when CacheManager goes away.
+
+ We use a helper class to ensure that we are never in a refcycle.
+ """
+
+ def __init__(self, disk_blobs):
+ self.disk_blobs = disk_blobs
+ self.tempdir = None
+ self.small_blobs = None
+
+ def __del__(self):
+ self.finalize()
+
+ def finalize(self):
+ if self.disk_blobs is not None:
+ for info in self.disk_blobs.itervalues():
+ info[-1].close()
+ self.disk_blobs = None
+ if self.small_blobs is not None:
+ self.small_blobs.close()
+ self.small_blobs = None
+ if self.tempdir is not None:
+ shutils.rmtree(self.tempdir)
+
+
class CacheManager(object):
+
+ _small_blob_threshold = 100*1024
+ _sticky_cache_size = 200*1024*1024
+ _sticky_flushed_size = 100*1024*1024
def __init__(self, info=None, verbose=False, inventory_cache_size=10):
"""Create a manager of caches.
@@ -31,9 +66,17 @@ class CacheManager(object):
self.verbose = verbose
# dataref -> data. datref is either :mark or the sha-1.
- # Sticky blobs aren't removed after being referenced.
+ # Sticky blobs are referenced more than once, and are saved until their
+ # refcount goes to 0
self._blobs = {}
self._sticky_blobs = {}
+ self._sticky_memory_bytes = 0
+ # if we overflow our memory cache, then we will dump large blobs to
+ # disk in this directory
+ self._tempdir = None
+ # id => TemporaryFile
+ self._disk_blobs = {}
+ self._cleanup = _Cleanup(self._disk_blobs)
# revision-id -> Inventory cache
# these are large and we probably don't need too many as
@@ -108,28 +151,82 @@ class CacheManager(object):
self.heads.clear()
self.inventories.clear()
+ def _flush_blobs_to_disk(self):
+ blobs = self._sticky_blobs.keys()
+ sticky_blobs = self._sticky_blobs
+ blobs.sort(key=lambda k:len(sticky_blobs[k]))
+ if self._tempdir is None:
+ self._tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
+ self._cleanup.tempdir = self._tempdir
+ self._cleanup.small_blobs = tempfile.TemporaryFile(
+ prefix='small-blobs-')
+ count = 0
+ bytes = 0
+ n_small_bytes = 0
+ while self._sticky_memory_bytes > self._sticky_flushed_size:
+ id = blobs.pop()
+ blob = self._sticky_blobs.pop(id)
+ n_bytes = len(blob)
+ self._sticky_memory_bytes -= n_bytes
+ if n_bytes < self._small_blob_threshold:
+ f = self._cleanup.small_blobs
+ f.seek(0, os.SEEK_END)
+ self._disk_blobs[id] = (True, f.tell(), n_bytes, f)
+ n_small_bytes += n_bytes
+ else:
+ f = tempfile.TemporaryFile(prefix='blob-', dir=self._tempdir)
+ self._disk_blobs[id] = (False, 0, n_bytes, f)
+ f.write(blob)
+ bytes += n_bytes
+ del blob
+ count += 1
+ trace.note('flushed %d blobs w/ %.1fMB (%.1fMB small) to disk'
+ % (count, bytes / 1024. / 1024,
+ n_small_bytes / 1024. / 1024))
+
+
def store_blob(self, id, data):
"""Store a blob of data."""
# Note: If we're not reference counting, everything has to be sticky
if not self._blob_ref_counts or id in self._blob_ref_counts:
self._sticky_blobs[id] = data
+ self._sticky_memory_bytes += len(data)
+ if self._sticky_memory_bytes > self._sticky_cache_size:
+ self._flush_blobs_to_disk()
elif data == '':
# Empty data is always sticky
self._sticky_blobs[id] = data
else:
self._blobs[id] = data
+ def _decref(self, id, cache, f):
+ if not self._blob_ref_counts:
+ return
+ count = self._blob_ref_counts.get(id, None)
+ if count is not None:
+ count -= 1
+ if count <= 0:
+ del cache[id]
+ if f is not None:
+ f.close()
+ del self._blob_ref_counts[id]
+ else:
+ self._blob_ref_counts[id] = count
+
def fetch_blob(self, id):
"""Fetch a blob of data."""
- try:
- b = self._sticky_blobs[id]
- if self._blob_ref_counts and b != '':
- self._blob_ref_counts[id] -= 1
- if self._blob_ref_counts[id] == 0:
- del self._sticky_blobs[id]
- return b
- except KeyError:
+ if id in self._blobs:
return self._blobs.pop(id)
+ if id in self._disk_blobs:
+ (is_small, offset, n_bytes, f) = self._disk_blobs[id]
+ f.seek(offset)
+ content = f.read(n_bytes)
+ self._decref(id, self._disk_blobs, f)
+ return content
+ content = self._sticky_blobs[id]
+ self._sticky_memory_bytes -= len(content)
+ self._decref(id, self._sticky_blobs, None)
+ return content
def track_heads(self, cmd):
"""Track the repository heads given a CommitCommand.