summaryrefslogtreecommitdiff
path: root/rdiff-backup/rdiff_backup/rorpiter.py
diff options
context:
space:
mode:
Diffstat (limited to 'rdiff-backup/rdiff_backup/rorpiter.py')
-rw-r--r--rdiff-backup/rdiff_backup/rorpiter.py747
1 files changed, 518 insertions, 229 deletions
diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py
index 2e9bd06..875ab1e 100644
--- a/rdiff-backup/rdiff_backup/rorpiter.py
+++ b/rdiff-backup/rdiff_backup/rorpiter.py
@@ -17,248 +17,240 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
-"""Operations on Iterators of Read Only Remote Paths"""
+"""Operations on Iterators of Read Only Remote Paths
+
+The main structure will be an iterator that yields RORPaths.
+Every RORPath has a "raw" form that makes it more amenable to
+being turned into a file. The raw form of the iterator yields
+each RORPath in the form of the tuple (index, data_dictionary,
+files), where files is the number of files attached (usually 1 or
+0). After that, if a file is attached, it yields that file.
+
+"""
from __future__ import generators
-import tempfile, UserList, types, librsync
-from static import *
-from log import *
-from rpath import *
-from robust import *
-from iterfile import *
-import Globals, Rdiff, Hardlink
+import tempfile, UserList, types, librsync, Globals, Rdiff, \
+ Hardlink, robust, log, static, rpath, iterfile, TempFile
+
class RORPIterException(Exception): pass
-class RORPIter:
- """Functions relating to iterators of Read Only RPaths
+def ToRaw(rorp_iter):
+ """Convert a rorp iterator to raw form"""
+ for rorp in rorp_iter:
+ if rorp.file:
+ yield (rorp.index, rorp.data, 1)
+ yield rorp.file
+ else: yield (rorp.index, rorp.data, 0)
+
+def FromRaw(raw_iter):
+ """Convert raw rorp iter back to standard form"""
+ for index, data, num_files in raw_iter:
+ rorp = rpath.RORPath(index, data)
+ if num_files:
+ assert num_files == 1, "Only one file accepted right now"
+ rorp.setfile(getnext(raw_iter))
+ yield rorp
+
+def ToFile(rorp_iter):
+ """Return file version of iterator"""
+ return iterfile.FileWrappingIter(ToRaw(rorp_iter))
+
+def FromFile(fileobj):
+ """Recover rorp iterator from file interface"""
+ return FromRaw(iterfile.IterWrappingFile(fileobj))
+
+def IterateRPaths(base_rp):
+ """Return an iterator yielding RPaths with given base rp"""
+ yield base_rp
+ if base_rp.isdir():
+ dirlisting = base_rp.listdir()
+ dirlisting.sort()
+ for filename in dirlisting:
+ for rp in IterateRPaths(base_rp.append(filename)):
+ yield rp
+
+def Signatures(rp_iter):
+ """Yield signatures of rpaths in given rp_iter"""
+ def error_handler(exc, rp):
+ log.Log("Error generating signature for %s" % rp.path)
+ return None
+
+ for rp in rp_iter:
+ if rp.isplaceholder(): yield rp
+ else:
+ rorp = rp.getRORPath()
+ if rp.isreg():
+ if rp.isflaglinked(): rorp.flaglinked()
+ else:
+ fp = robust.check_common_error(
+ error_handler, Rdiff.get_signature, (rp,))
+ if fp: rorp.setfile(fp)
+ else: continue
+ yield rorp
+
+def GetSignatureIter(base_rp):
+ """Return a signature iterator recurring over the base_rp"""
+ return Signatures(IterateRPaths(base_rp))
+
+def CollateIterators(*rorp_iters):
+ """Collate RORPath iterators by index
- The main structure will be an iterator that yields RORPaths.
- Every RORPath has a "raw" form that makes it more amenable to
- being turned into a file. The raw form of the iterator yields
- each RORPath in the form of the tuple (index, data_dictionary,
- files), where files is the number of files attached (usually 1 or
- 0). After that, if a file is attached, it yields that file.
+ So it takes two or more iterators of rorps and returns an
+ iterator yielding tuples like (rorp1, rorp2) with the same
+ index. If one or the other lacks that index, it will be None
"""
- def ToRaw(rorp_iter):
- """Convert a rorp iterator to raw form"""
- for rorp in rorp_iter:
- if rorp.file:
- yield (rorp.index, rorp.data, 1)
- yield rorp.file
- else: yield (rorp.index, rorp.data, 0)
-
- def FromRaw(raw_iter):
- """Convert raw rorp iter back to standard form"""
- for index, data, num_files in raw_iter:
- rorp = RORPath(index, data)
- if num_files:
- assert num_files == 1, "Only one file accepted right now"
- rorp.setfile(RORPIter.getnext(raw_iter))
- yield rorp
+ # overflow[i] means that iter[i] has been exhausted
+ # rorps[i] is None means that it is time to replenish it.
+ iter_num = len(rorp_iters)
+ if iter_num == 2:
+ return Collate2Iters(rorp_iters[0], rorp_iters[1])
+ overflow = [None] * iter_num
+ rorps = overflow[:]
+
+ def setrorps(overflow, rorps):
+ """Set the overflow and rorps list"""
+ for i in range(iter_num):
+ if not overflow[i] and rorps[i] is None:
+ try: rorps[i] = rorp_iters[i].next()
+ except StopIteration:
+ overflow[i] = 1
+ rorps[i] = None
- def ToFile(rorp_iter):
- """Return file version of iterator"""
- return FileWrappingIter(RORPIter.ToRaw(rorp_iter))
-
- def FromFile(fileobj):
- """Recover rorp iterator from file interface"""
- return RORPIter.FromRaw(IterWrappingFile(fileobj))
-
- def IterateRPaths(base_rp):
- """Return an iterator yielding RPaths with given base rp"""
- yield base_rp
- if base_rp.isdir():
- dirlisting = base_rp.listdir()
- dirlisting.sort()
- for filename in dirlisting:
- for rp in RORPIter.IterateRPaths(base_rp.append(filename)):
- yield rp
-
- def Signatures(rp_iter):
- """Yield signatures of rpaths in given rp_iter"""
- def error_handler(exc, rp):
- Log("Error generating signature for %s" % rp.path)
- return None
-
- for rp in rp_iter:
- if rp.isplaceholder(): yield rp
- else:
- rorp = rp.getRORPath()
- if rp.isreg():
- if rp.isflaglinked(): rorp.flaglinked()
- else:
- fp = Robust.check_common_error(
- error_handler, Rdiff.get_signature, (rp,))
- if fp: rorp.setfile(fp)
- else: continue
- yield rorp
-
- def GetSignatureIter(base_rp):
- """Return a signature iterator recurring over the base_rp"""
- return RORPIter.Signatures(RORPIter.IterateRPaths(base_rp))
-
- def CollateIterators(*rorp_iters):
- """Collate RORPath iterators by index
-
- So it takes two or more iterators of rorps and returns an
- iterator yielding tuples like (rorp1, rorp2) with the same
- index. If one or the other lacks that index, it will be None
+ def getleastindex(rorps):
+ """Return the first index in rorps, assuming rorps isn't empty"""
+ return min(map(lambda rorp: rorp.index,
+ filter(lambda x: x, rorps)))
- """
- # overflow[i] means that iter[i] has been exhausted
- # rorps[i] is None means that it is time to replenish it.
- iter_num = len(rorp_iters)
- if iter_num == 2:
- return RORPIter.Collate2Iters(rorp_iters[0], rorp_iters[1])
- overflow = [None] * iter_num
- rorps = overflow[:]
-
- def setrorps(overflow, rorps):
- """Set the overflow and rorps list"""
- for i in range(iter_num):
- if not overflow[i] and rorps[i] is None:
- try: rorps[i] = rorp_iters[i].next()
- except StopIteration:
- overflow[i] = 1
- rorps[i] = None
-
- def getleastindex(rorps):
- """Return the first index in rorps, assuming rorps isn't empty"""
- return min(map(lambda rorp: rorp.index,
- filter(lambda x: x, rorps)))
-
- def yield_tuples(iter_num, overflow, rorps):
- while 1:
- setrorps(overflow, rorps)
- if not None in overflow: break
-
- index = getleastindex(rorps)
- yieldval = []
- for i in range(iter_num):
- if rorps[i] and rorps[i].index == index:
- yieldval.append(rorps[i])
- rorps[i] = None
- else: yieldval.append(None)
- yield IndexedTuple(index, yieldval)
- return yield_tuples(iter_num, overflow, rorps)
-
- def Collate2Iters(riter1, riter2):
- """Special case of CollateIterators with 2 arguments
-
- This does the same thing but is faster because it doesn't have
- to consider the >2 iterator case. Profiler says speed is
- important here.
-
- """
- relem1, relem2 = None, None
+ def yield_tuples(iter_num, overflow, rorps):
while 1:
- if not relem1:
- try: relem1 = riter1.next()
- except StopIteration:
- if relem2: yield IndexedTuple(index2, (None, relem2))
- for relem2 in riter2:
- yield IndexedTuple(relem2.index, (None, relem2))
- break
- index1 = relem1.index
- if not relem2:
- try: relem2 = riter2.next()
- except StopIteration:
- if relem1: yield IndexedTuple(index1, (relem1, None))
- for relem1 in riter1:
- yield IndexedTuple(relem1.index, (relem1, None))
- break
- index2 = relem2.index
-
- if index1 < index2:
- yield IndexedTuple(index1, (relem1, None))
- relem1 = None
- elif index1 == index2:
- yield IndexedTuple(index1, (relem1, relem2))
- relem1, relem2 = None, None
- else: # index2 is less
- yield IndexedTuple(index2, (None, relem2))
- relem2 = None
-
- def getnext(iter):
- """Return the next element of an iterator, raising error if none"""
- try: next = iter.next()
- except StopIteration: raise RORPIterException("Unexpected end to iter")
- return next
-
- def GetDiffIter(sig_iter, new_iter):
- """Return delta iterator from sig_iter to new_iter
-
- The accompanying file for each will be a delta as produced by
- rdiff, unless the destination file does not exist, in which
- case it will be the file in its entirety.
-
- sig_iter may be composed of rorps, but new_iter should have
- full RPaths.
+ setrorps(overflow, rorps)
+ if not None in overflow: break
- """
- collated_iter = RORPIter.CollateIterators(sig_iter, new_iter)
- for rorp, rp in collated_iter: yield RORPIter.diffonce(rorp, rp)
-
- def diffonce(sig_rorp, new_rp):
- """Return one diff rorp, based from signature rorp and orig rp"""
- if sig_rorp and Globals.preserve_hardlinks and sig_rorp.isflaglinked():
- if new_rp: diff_rorp = new_rp.getRORPath()
- else: diff_rorp = RORPath(sig_rorp.index)
- diff_rorp.flaglinked()
- return diff_rorp
- elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg():
- diff_rorp = new_rp.getRORPath()
- #fp = sig_rorp.open("rb")
- #print "---------------------", fp
- #tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig")
- #tmp_sig_rp.delete()
- #tmp_sig_rp.write_from_fileobj(fp)
- #diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"),
- # new_rp))
- diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"),
- new_rp))
- diff_rorp.set_attached_filetype('diff')
+ index = getleastindex(rorps)
+ yieldval = []
+ for i in range(iter_num):
+ if rorps[i] and rorps[i].index == index:
+ yieldval.append(rorps[i])
+ rorps[i] = None
+ else: yieldval.append(None)
+ yield IndexedTuple(index, yieldval)
+ return yield_tuples(iter_num, overflow, rorps)
+
+def Collate2Iters(riter1, riter2):
+ """Special case of CollateIterators with 2 arguments
+
+ This does the same thing but is faster because it doesn't have
+ to consider the >2 iterator case. Profiler says speed is
+ important here.
+
+ """
+ relem1, relem2 = None, None
+ while 1:
+ if not relem1:
+ try: relem1 = riter1.next()
+ except StopIteration:
+ if relem2: yield IndexedTuple(index2, (None, relem2))
+ for relem2 in riter2:
+ yield IndexedTuple(relem2.index, (None, relem2))
+ break
+ index1 = relem1.index
+ if not relem2:
+ try: relem2 = riter2.next()
+ except StopIteration:
+ if relem1: yield IndexedTuple(index1, (relem1, None))
+ for relem1 in riter1:
+ yield IndexedTuple(relem1.index, (relem1, None))
+ break
+ index2 = relem2.index
+
+ if index1 < index2:
+ yield IndexedTuple(index1, (relem1, None))
+ relem1 = None
+ elif index1 == index2:
+ yield IndexedTuple(index1, (relem1, relem2))
+ relem1, relem2 = None, None
+ else: # index2 is less
+ yield IndexedTuple(index2, (None, relem2))
+ relem2 = None
+
+def getnext(iter):
+ """Return the next element of an iterator, raising error if none"""
+ try: next = iter.next()
+ except StopIteration: raise RORPIterException("Unexpected end to iter")
+ return next
+
+def GetDiffIter(sig_iter, new_iter):
+ """Return delta iterator from sig_iter to new_iter
+
+ The accompanying file for each will be a delta as produced by
+ rdiff, unless the destination file does not exist, in which
+ case it will be the file in its entirety.
+
+ sig_iter may be composed of rorps, but new_iter should have
+ full RPaths.
+
+ """
+ collated_iter = CollateIterators(sig_iter, new_iter)
+ for rorp, rp in collated_iter: yield diffonce(rorp, rp)
+
+def diffonce(sig_rorp, new_rp):
+ """Return one diff rorp, based from signature rorp and orig rp"""
+ if sig_rorp and Globals.preserve_hardlinks and sig_rorp.isflaglinked():
+ if new_rp: diff_rorp = new_rp.getRORPath()
+ else: diff_rorp = rpath.RORPath(sig_rorp.index)
+ diff_rorp.flaglinked()
+ return diff_rorp
+ elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg():
+ diff_rorp = new_rp.getRORPath()
+ #fp = sig_rorp.open("rb")
+ #print "---------------------", fp
+ #tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig")
+ #tmp_sig_rp.delete()
+ #tmp_sig_rp.write_from_fileobj(fp)
+ #diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"),
+ # new_rp))
+ diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"),
+ new_rp))
+ diff_rorp.set_attached_filetype('diff')
+ return diff_rorp
+ else:
+ # Just send over originial if diff isn't appropriate
+ if sig_rorp: sig_rorp.close_if_necessary()
+ if not new_rp: return rpath.RORPath(sig_rorp.index)
+ elif new_rp.isreg():
+ diff_rorp = new_rp.getRORPath(1)
+ diff_rorp.set_attached_filetype('snapshot')
return diff_rorp
- else:
- # Just send over originial if diff isn't appropriate
- if sig_rorp: sig_rorp.close_if_necessary()
- if not new_rp: return RORPath(sig_rorp.index)
- elif new_rp.isreg():
- diff_rorp = new_rp.getRORPath(1)
- diff_rorp.set_attached_filetype('snapshot')
- return diff_rorp
- else: return new_rp.getRORPath()
-
- def PatchIter(base_rp, diff_iter):
- """Patch the appropriate rps in basis_iter using diff_iter"""
- basis_iter = RORPIter.IterateRPaths(base_rp)
- collated_iter = RORPIter.CollateIterators(basis_iter, diff_iter)
- for basisrp, diff_rorp in collated_iter:
- RORPIter.patchonce_action(base_rp, basisrp, diff_rorp).execute()
-
- def patchonce_action(base_rp, basisrp, diff_rorp):
- """Return action patching basisrp using diff_rorp"""
- assert diff_rorp, "Missing diff index %s" % basisrp.index
- if not diff_rorp.lstat():
- return RobustAction(None, lambda init_val: basisrp.delete(), None)
-
- if Globals.preserve_hardlinks and diff_rorp.isflaglinked():
- if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
- tf = TempFileManager.new(basisrp)
- def init(): Hardlink.link_rp(diff_rorp, tf, basisrp)
- return Robust.make_tf_robustaction(init, tf, basisrp)
- elif basisrp and basisrp.isreg() and diff_rorp.isreg():
- if diff_rorp.get_attached_filetype() != 'diff':
- raise RPathException("File %s appears to have changed during"
- " processing, skipping" % (basisrp.path,))
- return Rdiff.patch_with_attribs_action(basisrp, diff_rorp)
- else: # Diff contains whole file, just copy it over
- if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
- return Robust.copy_with_attribs_action(diff_rorp, basisrp)
-
-MakeStatic(RORPIter)
+ else: return new_rp.getRORPath()
+
+def PatchIter(base_rp, diff_iter):
+ """Patch the appropriate rps in basis_iter using diff_iter"""
+ basis_iter = IterateRPaths(base_rp)
+ collated_iter = CollateIterators(basis_iter, diff_iter)
+ for basisrp, diff_rorp in collated_iter:
+ patchonce_action(base_rp, basisrp, diff_rorp).execute()
+
+def patchonce_action(base_rp, basisrp, diff_rorp):
+ """Return action patching basisrp using diff_rorp"""
+ assert diff_rorp, "Missing diff index %s" % basisrp.index
+ if not diff_rorp.lstat():
+ return robust.Action(None, lambda init_val: basisrp.delete(), None)
+
+ if Globals.preserve_hardlinks and diff_rorp.isflaglinked():
+ if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
+ tf = TempFile.new(basisrp)
+ def init(): Hardlink.link_rp(diff_rorp, tf, basisrp)
+ return robust.make_tf_robustaction(init, tf, basisrp)
+ elif basisrp and basisrp.isreg() and diff_rorp.isreg():
+ if diff_rorp.get_attached_filetype() != 'diff':
+ raise rpath.RPathException("File %s appears to have changed during"
+ " processing, skipping" % (basisrp.path,))
+ return Rdiff.patch_with_attribs_action(basisrp, diff_rorp)
+ else: # Diff contains whole file, just copy it over
+ if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
+ return robust.copy_with_attribs_action(diff_rorp, basisrp)
class IndexedTuple(UserList.UserList):
@@ -299,3 +291,300 @@ class IndexedTuple(UserList.UserList):
def __str__(self):
return "(%s).%s" % (", ".join(map(str, self.data)), self.index)
+
+
+class DirHandler:
+ """Handle directories when entering and exiting in mirror
+
+ The problem is that we may need to write to a directory that may
+ have only read and exec permissions. Also, when leaving a
+ directory tree, we may have modified the directory and thus
+ changed the mod and access times. These need to be updated when
+ leaving.
+
+ """
+ def __init__(self, rootrp):
+ """DirHandler initializer - call with root rpath of mirror dir"""
+ self.rootrp = rootrp
+ assert rootrp.index == ()
+ self.cur_dir_index = None # Current directory we have descended into
+ self.last_index = None # last index processed
+
+ # This dictionary maps indicies to (rpath, (atime, mtime),
+ # perms) triples. Either or both of the time pair and perms
+ # can be None, which means not to update the times or the
+ # perms when leaving. We don't have to update the perms if we
+ # didn't have to change them in the first place. If a
+ # directory is explicitly given, then we don't have to update
+ # anything because it will be done by the normal process.
+ self.index_dict = {}
+
+ def process_old_directories(self, new_dir_index):
+ """Update times/permissions for directories we are leaving
+
+ Returns greatest index of the current index that has been seen
+ before (i.e. no need to process up to then as new dir).
+
+ """
+ if self.cur_dir_index is None: return -1 # no previous directory
+
+ i = len(self.cur_dir_index)
+ while 1:
+ if new_dir_index[:i] == self.cur_dir_index[:i]:
+ return i
+ self.process_old_dir(self.cur_dir_index[:i])
+ i-=1
+
+ def process_old_dir(self, dir_index):
+ """Process outstanding changes for given dir index"""
+ rpath, times, perms = self.index_dict[dir_index]
+ if times: apply(rpath.settime, times)
+ if perms: rpath.chmod(perms)
+
+ def init_new_dirs(self, rpath, new_dir_index, common_dir_index):
+ """Initialize any new directories
+
+ Record the time, and change permissions if no write access.
+ Use rpath if it is given to access permissions and times.
+
+ """
+ for i in range(common_dir_index, len(new_dir_index)):
+ process_index = new_dir_index[:i]
+ if rpath.index == process_index:
+ self.index_dict[process_index] = (None, None, None)
+ else:
+ new_rpath = self.rootrp.new_index(process_index)
+ if new_rpath.hasfullperms(): perms = None
+ else: perms = new_rpath.getperms()
+ times = (new_rpath.getatime(), new_rpath.getmtime())
+ self.index_dict[process_index] = new_rpath, times, perms
+
+ def __call__(self, rpath):
+ """Given rpath, process containing directories"""
+ if rpath.isdir(): new_dir_index = rpath.index
+ elif not rpath.index: return # no directory contains root
+ else: new_dir_index = rpath.index[:-1]
+
+ common_dir_index = self.process_old_directories(new_dir_index)
+ self.init_new_dirs(rpath, new_dir_index, common_dir_index)
+ self.cur_dir_index = new_dir_index
+
+ def Finish(self):
+ """Process any remaining directories"""
+ indicies = self.index_dict.keys()
+ indicies.sort()
+ assert len(indicies) >= 1, indicies
+ indicies.reverse()
+ map(self.process_old_dir, indicies)
+
+
+def FillInIter(rpiter, rootrp):
+ """Given ordered rpiter and rootrp, fill in missing indicies with rpaths
+
+ For instance, suppose rpiter contains rpaths with indicies (),
+ (1,2), (2,5). Then return iter with rpaths (), (1,), (1,2), (2,),
+ (2,5). This is used when we need to process directories before or
+ after processing a file in that directory.
+
+ """
+ # Handle first element as special case
+ first_rp = rpiter.next() # StopIteration gets passed upwards
+ cur_index = first_rp.index
+ for i in range(len(cur_index)):
+ yield rootrp.new_index(cur_index[:i])
+ yield first_rp
+ del first_rp
+ old_index = cur_index
+
+ # Now do the others (1,2,3) (1,4,5)
+ for rp in rpiter:
+ cur_index = rp.index
+ if not cur_index[:-1] == old_index[:-1]: # Handle special case quickly
+ for i in range(1, len(cur_index)): # i==0 case already handled
+ if cur_index[:i] != old_index[:i]:
+ yield rootrp.new_index(cur_index[:i])
+ yield rp
+ old_index = cur_index
+
+
+class IterTreeReducer:
+ """Tree style reducer object for iterator
+
+ The indicies of a RORPIter form a tree type structure. This class
+ can be used on each element of an iter in sequence and the result
+ will be as if the corresponding tree was reduced. This tries to
+ bridge the gap between the tree nature of directories, and the
+ iterator nature of the connection between hosts and the temporal
+ order in which the files are processed.
+
+ """
+ def __init__(self, branch_class, branch_args):
+ """ITR initializer"""
+ self.branch_class = branch_class
+ self.branch_args = branch_args
+ self.index = None
+ self.root_branch = branch_class(*branch_args)
+ self.branches = [self.root_branch]
+
+ def finish_branches(self, index):
+ """Run Finish() on all branches index has passed
+
+ When we pass out of a branch, delete it and process it with
+ the parent. The innermost branches will be the last in the
+ list. Return None if we are out of the entire tree, and 1
+ otherwise.
+
+ """
+ branches = self.branches
+ while 1:
+ to_be_finished = branches[-1]
+ base_index = to_be_finished.base_index
+ if base_index != index[:len(base_index)]:
+ # out of the tree, finish with to_be_finished
+ to_be_finished.call_end_proc()
+ del branches[-1]
+ if not branches: return None
+ branches[-1].branch_process(to_be_finished)
+ else: return 1
+
+ def add_branch(self, index):
+ """Return branch of type self.branch_class, add to branch list"""
+ branch = self.branch_class(*self.branch_args)
+ branch.base_index = index
+ self.branches.append(branch)
+ return branch
+
+ def process_w_branch(self, branch, args):
+ """Run start_process on latest branch"""
+ robust.check_common_error(branch.on_error,
+ branch.start_process, args)
+ if not branch.caught_exception: branch.start_successful = 1
+
+ def Finish(self):
+ """Call at end of sequence to tie everything up"""
+ while 1:
+ to_be_finished = self.branches.pop()
+ to_be_finished.call_end_proc()
+ if not self.branches: break
+ self.branches[-1].branch_process(to_be_finished)
+
+ def __call__(self, *args):
+ """Process args, where args[0] is current position in iterator
+
+ Returns true if args successfully processed, false if index is
+ not in the current tree and thus the final result is
+ available.
+
+ Also note below we set self.index after doing the necessary
+ start processing, in case there is a crash in the middle.
+
+ """
+ index = args[0]
+ if self.index is None:
+ self.root_branch.base_index = index
+ self.process_w_branch(self.root_branch, args)
+ self.index = index
+ return 1
+
+ if index <= self.index:
+ log.Log("Warning: oldindex %s >= newindex %s" %
+ (self.index, index), 2)
+ return 1
+
+ if self.finish_branches(index) is None:
+ return None # We are no longer in the main tree
+ last_branch = self.branches[-1]
+ if last_branch.start_successful:
+ if last_branch.can_fast_process(*args):
+ last_branch.fast_process(*args)
+ else:
+ branch = self.add_branch(index)
+ self.process_w_branch(branch, args)
+ else: last_branch.log_prev_error(index)
+
+ self.index = index
+ return 1
+
+
+class ITRBranch:
+ """Helper class for IterTreeReducer below
+
+ There are five stub functions below: start_process, end_process,
+ branch_process, can_fast_process, and fast_process. A class that
+ subclasses this one will probably fill in these functions to do
+ more.
+
+ It is important that this class be pickable, so keep that in mind
+ when subclassing (this is used to resume failed sessions).
+
+ """
+ base_index = index = None
+ finished = None
+ caught_exception = start_successful = None
+
+ def call_end_proc(self):
+ """Runs the end_process on self, checking for errors"""
+ if self.finished or not self.start_successful:
+ self.caught_exception = 1
+ if self.caught_exception: self.log_prev_error(self.base_index)
+ else: robust.check_common_error(self.on_error, self.end_process)
+ self.finished = 1
+
+ def start_process(self, *args):
+ """Do some initial processing (stub)"""
+ pass
+
+ def end_process(self):
+ """Do any final processing before leaving branch (stub)"""
+ pass
+
+ def branch_process(self, branch):
+ """Process a branch right after it is finished (stub)"""
+ assert branch.finished
+ pass
+
+ def can_fast_process(self, *args):
+ """True if object can be processed without new branch (stub)"""
+ return None
+
+ def fast_process(self, *args):
+ """Process args without new child branch (stub)"""
+ pass
+
+ def on_error(self, exc, *args):
+ """This is run on any exception in start/end-process"""
+ self.caught_exception = 1
+ if args and args[0] and isinstance(args[0], tuple):
+ filename = os.path.join(*args[0])
+ elif self.index: filename = os.path.join(*self.index)
+ else: filename = "."
+ log.Log("Error '%s' processing %s" % (exc, filename), 2)
+
+ def log_prev_error(self, index):
+ """Call function if no pending exception"""
+ log.Log("Skipping %s because of previous error" %
+ (os.path.join(*index),), 2)
+
+
+class DestructiveSteppingFinalizer(ITRBranch):
+ """Finalizer that can work on an iterator of dsrpaths
+
+ The reason we have to use an IterTreeReducer is that some files
+ should be updated immediately, but for directories we sometimes
+ need to update all the files in the directory before finally
+ coming back to it.
+
+ """
+ dsrpath = None
+ def start_process(self, index, dsrpath):
+ self.dsrpath = dsrpath
+
+ def end_process(self):
+ if self.dsrpath: self.dsrpath.write_changes()
+
+ def can_fast_process(self, index, dsrpath):
+ return not self.dsrpath.isdir()
+
+ def fast_process(self, index, dsrpath):
+ if self.dsrpath: self.dsrpath.write_changes()
+