# Provenance: rdiff-backup commit 61ef4309f5246529e09db428c2bfe5c3bb29a3a8
# by bescoto, Wed, 12 Feb 2003 16:24:08 +0000 (git-svn trunk@279):
# "Added CachedIndexableProcessor to rorpiter".
# Target file: rdiff-backup/rdiff_backup/rorpiter.py
#
# The same commit also:
#   * changed CacheIndexable.get() so that a cache miss asserts
#     `index > self.cache_indicies[0]` before returning None;
#   * added CacheIndexableProcessorTest.testReorder to
#     rdiff-backup/testing/rorpitertest.py, which pushes indices
#     0, 2, 1, 5, 3, 4 through a cache of size 3, expects that
#     re-submitting index 0 raises AssertionError, and expects the
#     wrapped function to have seen all six elements in index order
#     after close().

import bisect


class CachedIndexableProcessor:
    """Reorder indices, then feed elements into some function in order.

    Use this class when you want to run some function on a stream of
    objects in index order, but the objects may arrive slightly out of
    index order.  Up to ``cache_size`` elements are held back and kept
    sorted by index; whenever the cache overflows, the element with the
    smallest index is handed to the function.

    An AssertionError is raised if an element arrives too far out of
    order, i.e. its index is not greater than the smallest index still
    cached, or if its index duplicates one that is already cached.

    """
    def __init__(self, function, cache_size):
        """CIP initializer.

        function -- callable invoked once per element, in index order.
        cache_size -- maximum number of elements held back for
            reordering before the oldest one is released.

        """
        self.function = function
        self.cache_size = cache_size
        # Sorted list of the indices currently cached.  The attribute
        # name keeps its historical spelling so existing users of the
        # attribute keep working.
        self.cache_indicies = []
        # Maps each cached index to its element.
        self.cache_dict = {}

    def process(self, elem):
        """Accept indexed elem; release the oldest elem if cache is full.

        elem must expose an ``index`` attribute that is hashable and
        orderable against the other indices.  Raises AssertionError,
        leaving the cache untouched, when elem cannot be placed.

        """
        index = elem.index
        indicies = self.cache_indicies

        # Validate before mutating anything: a rejected element must not
        # overwrite or pollute the cache.  AssertionError is raised
        # explicitly (not via `assert`) so the check survives `python -O`
        # while callers that expect AssertionError are unaffected.
        if indicies and index <= indicies[0]:
            raise AssertionError(
                "index %r arrived too far out of order; oldest cached "
                "index is %r" % (index, indicies[0]))
        if index in self.cache_dict:
            raise AssertionError("index %r is already cached" % (index,))

        # Binary-search insert keeps the list sorted without re-sorting.
        bisect.insort(indicies, index)
        self.cache_dict[index] = elem

        if len(indicies) > self.cache_size:
            self._emit_oldest()

    __call__ = process

    def close(self):
        """Flush cache by running function on remaining elems in order."""
        while self.cache_indicies:
            self._emit_oldest()

    def _emit_oldest(self):
        """Remove the smallest cached index and run function on its elem."""
        index = self.cache_indicies.pop(0)
        elem = self.cache_dict.pop(index)
        # The element is removed from the cache before the call, so an
        # exception in function does not leave it queued for a retry.
        self.function(elem)