summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbescoto <bescoto@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109>2003-02-21 05:00:21 +0000
committerbescoto <bescoto@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109>2003-02-21 05:00:21 +0000
commit39f64abf52669a32d2d58a7a056b89e6aa5feae7 (patch)
treeb1764494595d05777b7345bb01b1d3e6fbed76f4
parent241a8cb9d9df719d5703005557c5fb23ffeae98f (diff)
downloadrdiff-backup-39f64abf52669a32d2d58a7a056b89e6aa5feae7.tar.gz
Iterfiles and iterrorps now can contain exceptions.
git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@283 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
-rw-r--r--rdiff-backup/rdiff_backup/iterfile.py59
-rw-r--r--rdiff-backup/rdiff_backup/rorpiter.py76
-rw-r--r--rdiff-backup/testing/iterfiletest.py38
-rw-r--r--rdiff-backup/testing/rorpitertest.py23
4 files changed, 97 insertions, 99 deletions
diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py
index 84be7cc..9d52d2f 100644
--- a/rdiff-backup/rdiff_backup/iterfile.py
+++ b/rdiff-backup/rdiff_backup/iterfile.py
@@ -40,10 +40,15 @@ class UnwrapFile:
def _get(self):
"""Return pair (type, data) next in line on the file
- type is a single character which is either "o" for object, "f"
- for file, "c" for a continution of a file, or None if no more
- data can be read. Data is either the file's data, if type is
- "c" or "f", or the actual object if the type is "o".
+ type is a single character which is either
+ "o" for object,
+ "f" for file,
+ "c" for a continution of a file,
+ "e" for an exception, or
+ None if no more data can be read.
+
+ Data is either the file's data, if type is "c" or "f", or the
+ actual object if the type is "o" or "e".
"""
header = self.file.read(8)
@@ -52,7 +57,7 @@ class UnwrapFile:
assert None, "Header %s is only %d bytes" % (header, len(header))
type, length = header[0], C.str2long(header[1:])
buf = self.file.read(length)
- if type == "o": return type, cPickle.loads(buf)
+ if type == "o" or type == "e": return type, cPickle.loads(buf)
else: return type, buf
@@ -74,7 +79,7 @@ class IterWrappingFile(UnwrapFile):
self.currently_in_file.close() # no error checking by this point
type, data = self._get()
if not type: raise StopIteration
- if type == "o": return data
+ if type == "o" or type == "e": return data
elif type == "f":
file = IterVirtualFile(self, data)
if data: self.currently_in_file = file
@@ -125,6 +130,9 @@ class IterVirtualFile(UnwrapFile):
"""Read a chunk from the file and add it to the buffer"""
assert self.iwf.currently_in_file
type, data = self._get()
+ if type == "e":
+ self.iwf.currently_in_file = None
+ raise data
assert type == "c", "Type is %s instead of c" % type
if data:
self.buffer += data
@@ -179,39 +187,44 @@ class FileWrappingIter:
otherwise return true.
"""
- array_buf = self.array_buf
- if self.currently_in_file:
- array_buf.fromstring("c")
- array_buf.fromstring(self.addfromfile())
+ if self.currently_in_file: self.addfromfile("c")
else:
try: currentobj = self.iter.next()
except StopIteration: return None
if hasattr(currentobj, "read") and hasattr(currentobj, "close"):
self.currently_in_file = currentobj
- array_buf.fromstring("f")
- array_buf.fromstring(self.addfromfile())
+ self.addfromfile("f")
else:
pickle = cPickle.dumps(currentobj, 1)
- array_buf.fromstring("o")
- array_buf.fromstring(C.long2str(long(len(pickle))))
- array_buf.fromstring(pickle)
+ self.array_buf.fromstring("o")
+ self.array_buf.fromstring(C.long2str(long(len(pickle))))
+ self.array_buf.fromstring(pickle)
return 1
- def addfromfile(self):
- """Read a chunk from the current file and return it"""
- # Check file read for errors, buf = "" if find one
+ def addfromfile(self, prefix_letter):
+ """Read a chunk from the current file and add to array_buf
+
+ prefix_letter and the length will be prepended to the file
+ data. If there is an exception while reading the file, the
+ exception will be added to array_buf instead.
+
+ """
buf = robust.check_common_error(self.read_error_handler,
self.currently_in_file.read,
[Globals.blocksize])
- if not buf:
+ if buf == "" or buf is None:
assert not self.currently_in_file.close()
- self.currently_in_file = None
- return C.long2str(long(len(buf))) + buf
+ self.currently_in_file = None
+ if buf is None: # error occurred above, encode exception
+ prefix_letter = "e"
+ buf = cPickle.dumps(self.last_exception, 1)
+ total = "".join((prefix_letter, C.long2str(long(len(buf))), buf))
+ self.array_buf.fromstring(total)
def read_error_handler(self, exc, blocksize):
"""Log error when reading from file"""
- log.Log("Error '%s' reading from fileobj, truncating" % (str(exc),), 2)
- return ""
+ self.last_exception = exc
+ return None
def _l2s_old(self, l):
"""Convert long int to string of 7 characters"""
diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py
index aebb6fc..ee4a7ed 100644
--- a/rdiff-backup/rdiff_backup/rorpiter.py
+++ b/rdiff-backup/rdiff_backup/rorpiter.py
@@ -30,8 +30,7 @@ files), where files is the number of files attached (usually 1 or
from __future__ import generators
import os, tempfile, UserList, types
-import librsync, Globals, Rdiff, Hardlink, robust, log, static, \
- rpath, iterfile
+import Globals, rpath, iterfile
class RORPIterException(Exception): pass
@@ -50,13 +49,22 @@ def FromRaw(raw_iter):
rorp = rpath.RORPath(index, data)
if num_files:
assert num_files == 1, "Only one file accepted right now"
- rorp.setfile(getnext(raw_iter))
+ rorp.setfile(get_next_file(raw_iter))
yield rorp
-def getnext(iter):
+class ErrorFile:
+ """Used by get_next_file below, file-like that just raises error"""
+ def __init__(self, exc):
+ """Initialize new ErrorFile. exc is the exception to raise on read"""
+ self.exc = exc
+ def read(self, l=-1): raise self.exc
+ def close(self): return None
+
+def get_next_file(iter):
"""Return the next element of an iterator, raising error if none"""
try: next = iter.next()
except StopIteration: raise RORPIterException("Unexpected end to iter")
+ if isinstance(next, Exception): return ErrorFile(next)
return next
def ToFile(rorp_iter):
@@ -258,7 +266,7 @@ class IterTreeReducer:
base_index = to_be_finished.base_index
if base_index != index[:len(base_index)]:
# out of the tree, finish with to_be_finished
- to_be_finished.call_end_proc()
+ to_be_finished.end_process()
del branches[-1]
if not branches: return None
branches[-1].branch_process(to_be_finished)
@@ -271,18 +279,12 @@ class IterTreeReducer:
self.branches.append(branch)
return branch
- def process_w_branch(self, branch, args):
- """Run start_process on latest branch"""
- robust.check_common_error(branch.on_error,
- branch.start_process, args)
- if not branch.caught_exception: branch.start_successful = 1
-
def Finish(self):
"""Call at end of sequence to tie everything up"""
if self.index is None or self.root_fast_processed: return
while 1:
to_be_finished = self.branches.pop()
- to_be_finished.call_end_proc()
+ to_be_finished.end_process()
if not self.branches: break
self.branches[-1].branch_process(to_be_finished)
@@ -303,26 +305,19 @@ class IterTreeReducer:
if self.root_branch.can_fast_process(*args):
self.root_branch.fast_process(*args)
self.root_fast_processed = 1
- else: self.process_w_branch(self.root_branch, args)
+ else: self.root_branch.start_process(*args)
self.index = index
return 1
-
- if index <= self.index:
- log.Log("Warning: oldindex %s >= newindex %s" %
- (self.index, index), 2)
- return 1
+ assert index > self.index, "Index out of order"
if self.finish_branches(index) is None:
return None # We are no longer in the main tree
last_branch = self.branches[-1]
- if last_branch.start_successful:
- if last_branch.can_fast_process(*args):
- robust.check_common_error(last_branch.on_error,
- last_branch.fast_process, args)
- else:
- branch = self.add_branch(index)
- self.process_w_branch(branch, args)
- else: last_branch.log_prev_error(index)
+ if last_branch.can_fast_process(*args):
+ last_branch.fast_process(*args)
+ else:
+ branch = self.add_branch(index)
+ branch.start_process(*args)
self.index = index
return 1
@@ -338,17 +333,6 @@ class ITRBranch:
"""
base_index = index = None
- finished = None
- caught_exception = start_successful = None
-
- def call_end_proc(self):
- """Runs the end_process on self, checking for errors"""
- if self.finished or not self.start_successful:
- self.caught_exception = 1
- if self.caught_exception: self.log_prev_error(self.base_index)
- else: robust.check_common_error(self.on_error, self.end_process)
- self.finished = 1
-
def start_process(self, *args):
"""Do some initial processing (stub)"""
pass
@@ -359,7 +343,6 @@ class ITRBranch:
def branch_process(self, branch):
"""Process a branch right after it is finished (stub)"""
- assert branch.finished
pass
def can_fast_process(self, *args):
@@ -370,20 +353,6 @@ class ITRBranch:
"""Process args without new child branch (stub)"""
pass
- def on_error(self, exc, *args):
- """This is run on any exception in start/end-process"""
- self.caught_exception = 1
- if args and args[0] and isinstance(args[0], tuple):
- filename = os.path.join(*args[0])
- elif self.index: filename = os.path.join(*self.index)
- else: filename = "."
- log.Log("Error '%s' processing %s" % (exc, filename), 2)
-
- def log_prev_error(self, index):
- """Call function if no pending exception"""
- log.Log("Skipping %s because of previous error" % \
- (index and os.path.join(*index) or '()',), 2)
-
class CacheIndexable:
"""Cache last few indexed elements in iterator
@@ -423,5 +392,6 @@ class CacheIndexable:
"""Return element with index index from cache"""
try: return self.cache_dict[index]
except KeyError:
- assert index > self.cache_indicies[0], index
+ assert index >= self.cache_indicies[0], \
+ repr((index, self.cache_indicies[0]))
return None
diff --git a/rdiff-backup/testing/iterfiletest.py b/rdiff-backup/testing/iterfiletest.py
index 63975d0..c8c77c6 100644
--- a/rdiff-backup/testing/iterfiletest.py
+++ b/rdiff-backup/testing/iterfiletest.py
@@ -3,6 +3,17 @@ from commontest import *
from rdiff_backup.iterfile import *
from rdiff_backup import lazy
+class FileException:
+ """Like a file, but raise exception after certain # bytes read"""
+ def __init__(self, max):
+ self.count = 0
+ self.max = max
+ def read(self, l):
+ self.count += l
+ if self.count > self.max: raise IOError(13, "Permission Denied")
+ return "a"*l
+ def close(self): return None
+
class testIterFile(unittest.TestCase):
def setUp(self):
@@ -15,6 +26,33 @@ class testIterFile(unittest.TestCase):
assert lazy.Iter.equal(itm(),
IterWrappingFile(FileWrappingIter(itm())))
+ def testFile(self):
+ """Test sending files through iters"""
+ buf1 = "hello"*10000
+ file1 = StringIO.StringIO(buf1)
+ buf2 = "goodbye"*10000
+ file2 = StringIO.StringIO(buf2)
+ file_iter = FileWrappingIter(iter([file1, file2]))
+
+ new_iter = IterWrappingFile(file_iter)
+ assert new_iter.next().read() == buf1
+ assert new_iter.next().read() == buf2
+ self.assertRaises(StopIteration, new_iter.next)
+
+ def testFileException(self):
+ """Test encoding a file which raises an exception"""
+ f = FileException(100*1024)
+ new_iter = IterWrappingFile(FileWrappingIter(iter([f, "foo"])))
+ f_out = new_iter.next()
+ assert f_out.read(10000) == "a"*10000
+ try: buf = f_out.read(100*1024)
+ except IOError: pass
+ else: assert 0, len(buf)
+
+ assert new_iter.next() == "foo"
+ self.assertRaises(StopIteration, new_iter.next)
+
+
class testBufferedRead(unittest.TestCase):
def testBuffering(self):
"""Test buffering a StringIO"""
diff --git a/rdiff-backup/testing/rorpitertest.py b/rdiff-backup/testing/rorpitertest.py
index acde06c..2d33dc5 100644
--- a/rdiff-backup/testing/rorpitertest.py
+++ b/rdiff-backup/testing/rorpitertest.py
@@ -291,28 +291,5 @@ class CacheIndexableTest(unittest.TestCase):
assert l1 == l2, (l1, l2)
-class CacheIndexableProcessorTest(unittest.TestCase):
- def function(self, elem):
- """Called by CIP on each elem"""
- self.l.append(elem)
-
- def testReorder(self):
- """Test the reordering abilities of CIP"""
- CIP = rorpiter.CachedIndexableProcessor(self.function, 3)
- in_list = [rorpiter.IndexedTuple((x,), (x,)) for x in range(6)]
- self.l = []
-
- CIP(in_list[0])
- CIP(in_list[2])
- CIP(in_list[1])
- CIP(in_list[5])
- CIP(in_list[3])
- CIP(in_list[4])
- self.assertRaises(AssertionError, CIP, in_list[0])
-
- CIP.close()
- assert self.l == in_list, (self.l, in_list)
-
-
if __name__ == "__main__": unittest.main()