From 39f64abf52669a32d2d58a7a056b89e6aa5feae7 Mon Sep 17 00:00:00 2001 From: bescoto Date: Fri, 21 Feb 2003 05:00:21 +0000 Subject: Iterfiles and iterrorps now can contain exceptions. git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@283 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/rdiff_backup/iterfile.py | 59 ++++++++++++++++----------- rdiff-backup/rdiff_backup/rorpiter.py | 76 +++++++++++------------------------ rdiff-backup/testing/iterfiletest.py | 38 ++++++++++++++++++ rdiff-backup/testing/rorpitertest.py | 23 ----------- 4 files changed, 97 insertions(+), 99 deletions(-) diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py index 84be7cc..9d52d2f 100644 --- a/rdiff-backup/rdiff_backup/iterfile.py +++ b/rdiff-backup/rdiff_backup/iterfile.py @@ -40,10 +40,15 @@ class UnwrapFile: def _get(self): """Return pair (type, data) next in line on the file - type is a single character which is either "o" for object, "f" - for file, "c" for a continution of a file, or None if no more - data can be read. Data is either the file's data, if type is - "c" or "f", or the actual object if the type is "o". + type is a single character which is either + "o" for object, + "f" for file, + "c" for a continution of a file, + "e" for an exception, or + None if no more data can be read. + + Data is either the file's data, if type is "c" or "f", or the + actual object if the type is "o" or "e". """ header = self.file.read(8) @@ -52,7 +57,7 @@ class UnwrapFile: assert None, "Header %s is only %d bytes" % (header, len(header)) type, length = header[0], C.str2long(header[1:]) buf = self.file.read(length) - if type == "o": return type, cPickle.loads(buf) + if type == "o" or type == "e": return type, cPickle.loads(buf) else: return type, buf @@ -74,7 +79,7 @@ class IterWrappingFile(UnwrapFile): self.currently_in_file.close() # no error checking by this point type, data = self._get() if not type: raise StopIteration - if type == "o": return data + if type == "o" or type == "e": return data elif type == "f": file = IterVirtualFile(self, data) if data: self.currently_in_file = file @@ -125,6 +130,9 @@ class IterVirtualFile(UnwrapFile): """Read a chunk from the file and add it to the buffer""" assert self.iwf.currently_in_file type, data = self._get() + if type == "e": + self.iwf.currently_in_file = None + raise data assert type == "c", "Type is %s instead of c" % type if data: self.buffer += data @@ -179,39 +187,44 @@ class FileWrappingIter: otherwise return true. """ - array_buf = self.array_buf - if self.currently_in_file: - array_buf.fromstring("c") - array_buf.fromstring(self.addfromfile()) + if self.currently_in_file: self.addfromfile("c") else: try: currentobj = self.iter.next() except StopIteration: return None if hasattr(currentobj, "read") and hasattr(currentobj, "close"): self.currently_in_file = currentobj - array_buf.fromstring("f") - array_buf.fromstring(self.addfromfile()) + self.addfromfile("f") else: pickle = cPickle.dumps(currentobj, 1) - array_buf.fromstring("o") - array_buf.fromstring(C.long2str(long(len(pickle)))) - array_buf.fromstring(pickle) + self.array_buf.fromstring("o") + self.array_buf.fromstring(C.long2str(long(len(pickle)))) + self.array_buf.fromstring(pickle) return 1 - def addfromfile(self): - """Read a chunk from the current file and return it""" - # Check file read for errors, buf = "" if find one + def addfromfile(self, prefix_letter): + """Read a chunk from the current file and add to array_buf + + prefix_letter and the length will be prepended to the file + data. If there is an exception while reading the file, the + exception will be added to array_buf instead. + + """ buf = robust.check_common_error(self.read_error_handler, self.currently_in_file.read, [Globals.blocksize]) - if not buf: + if buf == "" or buf is None: assert not self.currently_in_file.close() - self.currently_in_file = None - return C.long2str(long(len(buf))) + buf + self.currently_in_file = None + if buf is None: # error occurred above, encode exception + prefix_letter = "e" + buf = cPickle.dumps(self.last_exception, 1) + total = "".join((prefix_letter, C.long2str(long(len(buf))), buf)) + self.array_buf.fromstring(total) def read_error_handler(self, exc, blocksize): """Log error when reading from file""" - log.Log("Error '%s' reading from fileobj, truncating" % (str(exc),), 2) - return "" + self.last_exception = exc + return None def _l2s_old(self, l): """Convert long int to string of 7 characters""" diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py index aebb6fc..ee4a7ed 100644 --- a/rdiff-backup/rdiff_backup/rorpiter.py +++ b/rdiff-backup/rdiff_backup/rorpiter.py @@ -30,8 +30,7 @@ files), where files is the number of files attached (usually 1 or from __future__ import generators import os, tempfile, UserList, types -import librsync, Globals, Rdiff, Hardlink, robust, log, static, \ - rpath, iterfile +import Globals, rpath, iterfile class RORPIterException(Exception): pass @@ -50,13 +49,22 @@ def FromRaw(raw_iter): rorp = rpath.RORPath(index, data) if num_files: assert num_files == 1, "Only one file accepted right now" - rorp.setfile(getnext(raw_iter)) + rorp.setfile(get_next_file(raw_iter)) yield rorp -def getnext(iter): +class ErrorFile: + """Used by get_next_file below, file-like that just raises error""" + def __init__(self, exc): + """Initialize new ErrorFile. exc is the exception to raise on read""" + self.exc = exc + def read(self, l=-1): raise self.exc + def close(self): return None + +def get_next_file(iter): """Return the next element of an iterator, raising error if none""" try: next = iter.next() except StopIteration: raise RORPIterException("Unexpected end to iter") + if isinstance(next, Exception): return ErrorFile(next) return next def ToFile(rorp_iter): @@ -258,7 +266,7 @@ class IterTreeReducer: base_index = to_be_finished.base_index if base_index != index[:len(base_index)]: # out of the tree, finish with to_be_finished - to_be_finished.call_end_proc() + to_be_finished.end_process() del branches[-1] if not branches: return None branches[-1].branch_process(to_be_finished) @@ -271,18 +279,12 @@ class IterTreeReducer: self.branches.append(branch) return branch - def process_w_branch(self, branch, args): - """Run start_process on latest branch""" - robust.check_common_error(branch.on_error, - branch.start_process, args) - if not branch.caught_exception: branch.start_successful = 1 - def Finish(self): """Call at end of sequence to tie everything up""" if self.index is None or self.root_fast_processed: return while 1: to_be_finished = self.branches.pop() - to_be_finished.call_end_proc() + to_be_finished.end_process() if not self.branches: break self.branches[-1].branch_process(to_be_finished) @@ -303,26 +305,19 @@ class IterTreeReducer: if self.root_branch.can_fast_process(*args): self.root_branch.fast_process(*args) self.root_fast_processed = 1 - else: self.process_w_branch(self.root_branch, args) + else: self.root_branch.start_process(*args) self.index = index return 1 - - if index <= self.index: - log.Log("Warning: oldindex %s >= newindex %s" % - (self.index, index), 2) - return 1 + assert index > self.index, "Index out of order" if self.finish_branches(index) is None: return None # We are no longer in the main tree last_branch = self.branches[-1] - if last_branch.start_successful: - if last_branch.can_fast_process(*args): - robust.check_common_error(last_branch.on_error, - last_branch.fast_process, args) - else: - branch = self.add_branch(index) - self.process_w_branch(branch, args) - else: last_branch.log_prev_error(index) + if last_branch.can_fast_process(*args): + last_branch.fast_process(*args) + else: + branch = self.add_branch(index) + branch.start_process(*args) self.index = index return 1 @@ -338,17 +333,6 @@ class ITRBranch: """ base_index = index = None - finished = None - caught_exception = start_successful = None - - def call_end_proc(self): - """Runs the end_process on self, checking for errors""" - if self.finished or not self.start_successful: - self.caught_exception = 1 - if self.caught_exception: self.log_prev_error(self.base_index) - else: robust.check_common_error(self.on_error, self.end_process) - self.finished = 1 - def start_process(self, *args): """Do some initial processing (stub)""" pass @@ -359,7 +343,6 @@ class ITRBranch: def branch_process(self, branch): """Process a branch right after it is finished (stub)""" - assert branch.finished pass def can_fast_process(self, *args): @@ -370,20 +353,6 @@ class ITRBranch: """Process args without new child branch (stub)""" pass - def on_error(self, exc, *args): - """This is run on any exception in start/end-process""" - self.caught_exception = 1 - if args and args[0] and isinstance(args[0], tuple): - filename = os.path.join(*args[0]) - elif self.index: filename = os.path.join(*self.index) - else: filename = "." - log.Log("Error '%s' processing %s" % (exc, filename), 2) - - def log_prev_error(self, index): - """Call function if no pending exception""" - log.Log("Skipping %s because of previous error" % \ - (index and os.path.join(*index) or '()',), 2) - class CacheIndexable: """Cache last few indexed elements in iterator @@ -423,5 +392,6 @@ class CacheIndexable: """Return element with index index from cache""" try: return self.cache_dict[index] except KeyError: - assert index > self.cache_indicies[0], index + assert index >= self.cache_indicies[0], \ + repr((index, self.cache_indicies[0])) return None diff --git a/rdiff-backup/testing/iterfiletest.py b/rdiff-backup/testing/iterfiletest.py index 63975d0..c8c77c6 100644 --- a/rdiff-backup/testing/iterfiletest.py +++ b/rdiff-backup/testing/iterfiletest.py @@ -3,6 +3,17 @@ from commontest import * from rdiff_backup.iterfile import * from rdiff_backup import lazy +class FileException: + """Like a file, but raise exception after certain # bytes read""" + def __init__(self, max): + self.count = 0 + self.max = max + def read(self, l): + self.count += l + if self.count > self.max: raise IOError(13, "Permission Denied") + return "a"*l + def close(self): return None + class testIterFile(unittest.TestCase): def setUp(self): @@ -15,6 +26,33 @@ class testIterFile(unittest.TestCase): assert lazy.Iter.equal(itm(), IterWrappingFile(FileWrappingIter(itm()))) + def testFile(self): + """Test sending files through iters""" + buf1 = "hello"*10000 + file1 = StringIO.StringIO(buf1) + buf2 = "goodbye"*10000 + file2 = StringIO.StringIO(buf2) + file_iter = FileWrappingIter(iter([file1, file2])) + + new_iter = IterWrappingFile(file_iter) + assert new_iter.next().read() == buf1 + assert new_iter.next().read() == buf2 + self.assertRaises(StopIteration, new_iter.next) + + def testFileException(self): + """Test encoding a file which raises an exception""" + f = FileException(100*1024) + new_iter = IterWrappingFile(FileWrappingIter(iter([f, "foo"]))) + f_out = new_iter.next() + assert f_out.read(10000) == "a"*10000 + try: buf = f_out.read(100*1024) + except IOError: pass + else: assert 0, len(buf) + + assert new_iter.next() == "foo" + self.assertRaises(StopIteration, new_iter.next) + + class testBufferedRead(unittest.TestCase): def testBuffering(self): """Test buffering a StringIO""" diff --git a/rdiff-backup/testing/rorpitertest.py b/rdiff-backup/testing/rorpitertest.py index acde06c..2d33dc5 100644 --- a/rdiff-backup/testing/rorpitertest.py +++ b/rdiff-backup/testing/rorpitertest.py @@ -291,28 +291,5 @@ class CacheIndexableTest(unittest.TestCase): assert l1 == l2, (l1, l2) -class CacheIndexableProcessorTest(unittest.TestCase): - def function(self, elem): - """Called by CIP on each elem""" - self.l.append(elem) - - def testReorder(self): - """Test the reordering abilities of CIP""" - CIP = rorpiter.CachedIndexableProcessor(self.function, 3) - in_list = [rorpiter.IndexedTuple((x,), (x,)) for x in range(6)] - self.l = [] - - CIP(in_list[0]) - CIP(in_list[2]) - CIP(in_list[1]) - CIP(in_list[5]) - CIP(in_list[3]) - CIP(in_list[4]) - self.assertRaises(AssertionError, CIP, in_list[0]) - - CIP.close() - assert self.l == in_list, (self.l, in_list) - - if __name__ == "__main__": unittest.main() -- cgit v1.2.1