summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbescoto <bescoto@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109>2003-02-12 16:24:08 +0000
committerbescoto <bescoto@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109>2003-02-12 16:24:08 +0000
commit61ef4309f5246529e09db428c2bfe5c3bb29a3a8 (patch)
tree921ee55e6911307a98bbf85c71f31746473f4099
parentadc9d7a9a9cd90c8d078b1276cfdad98d0303d07 (diff)
downloadrdiff-backup-61ef4309f5246529e09db428c2bfe5c3bb29a3a8.tar.gz
Added CachedIndexableProcessor to rorpiter
git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@279 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
-rw-r--r--rdiff-backup/rdiff_backup/rorpiter.py50
-rw-r--r--rdiff-backup/testing/rorpitertest.py28
2 files changed, 75 insertions, 3 deletions
diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py
index f75d8e8..3849fdc 100644
--- a/rdiff-backup/rdiff_backup/rorpiter.py
+++ b/rdiff-backup/rdiff_backup/rorpiter.py
@@ -440,5 +440,53 @@ class CacheIndexable:
def get(self, index):
"""Return element with index index from cache"""
try: return self.cache_dict[index]
- except KeyError: return None
+ except KeyError:
+ assert index > self.cache_indicies[0], index
+ return None
+
+class CachedIndexableProcessor:
+ """Reorder indicies, then feed into some function in order
+
+ Use this class when you want to run some function on a stream of
+ objects in index order. However, the objects may be slightly out
+ of index order. This class will cache a certain number, and then
+ reorder them.
+
+ An error is signaled if the indicies arrive too out of order.
+
+ """
+ def __init__(self, function, cache_size):
+ """CIP initializer. function is called on every elem."""
+ self.function = function
+ self.cache_size = cache_size
+ self.cache_indicies = []
+ self.cache_dict = {}
+
+ def process(self, elem):
+ """Call CIP (and underlying self.function) on indexed elem"""
+ index = elem.index
+ self.cache_dict[index] = elem
+ if self.cache_indicies and index <= self.cache_indicies[-1]:
+ assert index > self.cache_indicies[0]
+ self.cache_indicies.append(index)
+ self.cache_indicies.sort() # Ack, n log n, should be log n!!!
+ else: self.cache_indicies.append(index)
+
+ if len(self.cache_indicies) > self.cache_size:
+ first_index = self.cache_indicies[0]
+ first_elem = self.cache_dict[first_index]
+ del self.cache_indicies[0]
+ del self.cache_dict[first_index]
+ self.function(first_elem)
+
+ __call__ = process
+
+ def close(self):
+ """Flush cache by running function on remaining elems"""
+ while self.cache_indicies:
+ index = self.cache_indicies[0]
+ elem = self.cache_dict[index]
+ del self.cache_indicies[0]
+ del self.cache_dict[index]
+ self.function(elem)
diff --git a/rdiff-backup/testing/rorpitertest.py b/rdiff-backup/testing/rorpitertest.py
index 35f5916..acde06c 100644
--- a/rdiff-backup/testing/rorpitertest.py
+++ b/rdiff-backup/testing/rorpitertest.py
@@ -280,7 +280,8 @@ class CacheIndexableTest(unittest.TestCase):
assert ci.get((3,)) == self.d[(3,)]
assert ci.get((4,)) == self.d[(4,)]
- assert ci.get((1,)) is None
+ assert ci.get((3,5)) is None
+ self.assertRaises(AssertionError, ci.get, (1,))
def testEqual(self):
"""Make sure CI doesn't alter properties of underlying iter"""
@@ -288,7 +289,30 @@ class CacheIndexableTest(unittest.TestCase):
l1 = list(self.get_iter())
l2 = list(rorpiter.CacheIndexable(iter(l1), 10))
assert l1 == l2, (l1, l2)
-
+
+
+class CacheIndexableProcessorTest(unittest.TestCase):
+ def function(self, elem):
+ """Called by CIP on each elem"""
+ self.l.append(elem)
+
+ def testReorder(self):
+ """Test the reordering abilities of CIP"""
+ CIP = rorpiter.CachedIndexableProcessor(self.function, 3)
+ in_list = [rorpiter.IndexedTuple((x,), (x,)) for x in range(6)]
+ self.l = []
+
+ CIP(in_list[0])
+ CIP(in_list[2])
+ CIP(in_list[1])
+ CIP(in_list[5])
+ CIP(in_list[3])
+ CIP(in_list[4])
+ self.assertRaises(AssertionError, CIP, in_list[0])
+
+ CIP.close()
+ assert self.l == in_list, (self.l, in_list)
+
if __name__ == "__main__": unittest.main()