diff options
author | bescoto <bescoto@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109> | 2002-12-23 06:53:18 +0000 |
---|---|---|
committer | bescoto <bescoto@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109> | 2002-12-23 06:53:18 +0000 |
commit | 9a0da726e2172321cdc1dcd21441f4ffc41e7931 (patch) | |
tree | 7f25f848386ca501b7f08c08c21af16f0d71330c /rdiff-backup/rdiff_backup/rorpiter.py | |
parent | e95a61773adb2f98499cf13ff543f4249ee38226 (diff) | |
download | rdiff-backup-9a0da726e2172321cdc1dcd21441f4ffc41e7931.tar.gz |
Major refactoring - avoid use of 'from XXX import *' in favor of the more
normal 'import XXX' syntax. The previous way was an artifact from
earlier versions where the whole program fit in one file.
git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@252 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
Diffstat (limited to 'rdiff-backup/rdiff_backup/rorpiter.py')
-rw-r--r-- | rdiff-backup/rdiff_backup/rorpiter.py | 747 |
1 file changed, 518 insertions, 229 deletions
diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py index 2e9bd06..875ab1e 100644 --- a/rdiff-backup/rdiff_backup/rorpiter.py +++ b/rdiff-backup/rdiff_backup/rorpiter.py @@ -17,248 +17,240 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA -"""Operations on Iterators of Read Only Remote Paths""" +"""Operations on Iterators of Read Only Remote Paths + +The main structure will be an iterator that yields RORPaths. +Every RORPath has a "raw" form that makes it more amenable to +being turned into a file. The raw form of the iterator yields +each RORPath in the form of the tuple (index, data_dictionary, +files), where files is the number of files attached (usually 1 or +0). After that, if a file is attached, it yields that file. + +""" from __future__ import generators -import tempfile, UserList, types, librsync -from static import * -from log import * -from rpath import * -from robust import * -from iterfile import * -import Globals, Rdiff, Hardlink +import tempfile, UserList, types, librsync, Globals, Rdiff, \ + Hardlink, robust, log, static, rpath, iterfile, TempFile + class RORPIterException(Exception): pass -class RORPIter: - """Functions relating to iterators of Read Only RPaths +def ToRaw(rorp_iter): + """Convert a rorp iterator to raw form""" + for rorp in rorp_iter: + if rorp.file: + yield (rorp.index, rorp.data, 1) + yield rorp.file + else: yield (rorp.index, rorp.data, 0) + +def FromRaw(raw_iter): + """Convert raw rorp iter back to standard form""" + for index, data, num_files in raw_iter: + rorp = rpath.RORPath(index, data) + if num_files: + assert num_files == 1, "Only one file accepted right now" + rorp.setfile(getnext(raw_iter)) + yield rorp + +def ToFile(rorp_iter): + """Return file version of iterator""" + return iterfile.FileWrappingIter(ToRaw(rorp_iter)) + +def FromFile(fileobj): + """Recover rorp iterator from file interface""" + return FromRaw(iterfile.IterWrappingFile(fileobj)) + 
+def IterateRPaths(base_rp): + """Return an iterator yielding RPaths with given base rp""" + yield base_rp + if base_rp.isdir(): + dirlisting = base_rp.listdir() + dirlisting.sort() + for filename in dirlisting: + for rp in IterateRPaths(base_rp.append(filename)): + yield rp + +def Signatures(rp_iter): + """Yield signatures of rpaths in given rp_iter""" + def error_handler(exc, rp): + log.Log("Error generating signature for %s" % rp.path) + return None + + for rp in rp_iter: + if rp.isplaceholder(): yield rp + else: + rorp = rp.getRORPath() + if rp.isreg(): + if rp.isflaglinked(): rorp.flaglinked() + else: + fp = robust.check_common_error( + error_handler, Rdiff.get_signature, (rp,)) + if fp: rorp.setfile(fp) + else: continue + yield rorp + +def GetSignatureIter(base_rp): + """Return a signature iterator recurring over the base_rp""" + return Signatures(IterateRPaths(base_rp)) + +def CollateIterators(*rorp_iters): + """Collate RORPath iterators by index - The main structure will be an iterator that yields RORPaths. - Every RORPath has a "raw" form that makes it more amenable to - being turned into a file. The raw form of the iterator yields - each RORPath in the form of the tuple (index, data_dictionary, - files), where files is the number of files attached (usually 1 or - 0). After that, if a file is attached, it yields that file. + So it takes two or more iterators of rorps and returns an + iterator yielding tuples like (rorp1, rorp2) with the same + index. 
If one or the other lacks that index, it will be None """ - def ToRaw(rorp_iter): - """Convert a rorp iterator to raw form""" - for rorp in rorp_iter: - if rorp.file: - yield (rorp.index, rorp.data, 1) - yield rorp.file - else: yield (rorp.index, rorp.data, 0) - - def FromRaw(raw_iter): - """Convert raw rorp iter back to standard form""" - for index, data, num_files in raw_iter: - rorp = RORPath(index, data) - if num_files: - assert num_files == 1, "Only one file accepted right now" - rorp.setfile(RORPIter.getnext(raw_iter)) - yield rorp + # overflow[i] means that iter[i] has been exhausted + # rorps[i] is None means that it is time to replenish it. + iter_num = len(rorp_iters) + if iter_num == 2: + return Collate2Iters(rorp_iters[0], rorp_iters[1]) + overflow = [None] * iter_num + rorps = overflow[:] + + def setrorps(overflow, rorps): + """Set the overflow and rorps list""" + for i in range(iter_num): + if not overflow[i] and rorps[i] is None: + try: rorps[i] = rorp_iters[i].next() + except StopIteration: + overflow[i] = 1 + rorps[i] = None - def ToFile(rorp_iter): - """Return file version of iterator""" - return FileWrappingIter(RORPIter.ToRaw(rorp_iter)) - - def FromFile(fileobj): - """Recover rorp iterator from file interface""" - return RORPIter.FromRaw(IterWrappingFile(fileobj)) - - def IterateRPaths(base_rp): - """Return an iterator yielding RPaths with given base rp""" - yield base_rp - if base_rp.isdir(): - dirlisting = base_rp.listdir() - dirlisting.sort() - for filename in dirlisting: - for rp in RORPIter.IterateRPaths(base_rp.append(filename)): - yield rp - - def Signatures(rp_iter): - """Yield signatures of rpaths in given rp_iter""" - def error_handler(exc, rp): - Log("Error generating signature for %s" % rp.path) - return None - - for rp in rp_iter: - if rp.isplaceholder(): yield rp - else: - rorp = rp.getRORPath() - if rp.isreg(): - if rp.isflaglinked(): rorp.flaglinked() - else: - fp = Robust.check_common_error( - error_handler, 
Rdiff.get_signature, (rp,)) - if fp: rorp.setfile(fp) - else: continue - yield rorp - - def GetSignatureIter(base_rp): - """Return a signature iterator recurring over the base_rp""" - return RORPIter.Signatures(RORPIter.IterateRPaths(base_rp)) - - def CollateIterators(*rorp_iters): - """Collate RORPath iterators by index - - So it takes two or more iterators of rorps and returns an - iterator yielding tuples like (rorp1, rorp2) with the same - index. If one or the other lacks that index, it will be None + def getleastindex(rorps): + """Return the first index in rorps, assuming rorps isn't empty""" + return min(map(lambda rorp: rorp.index, + filter(lambda x: x, rorps))) - """ - # overflow[i] means that iter[i] has been exhausted - # rorps[i] is None means that it is time to replenish it. - iter_num = len(rorp_iters) - if iter_num == 2: - return RORPIter.Collate2Iters(rorp_iters[0], rorp_iters[1]) - overflow = [None] * iter_num - rorps = overflow[:] - - def setrorps(overflow, rorps): - """Set the overflow and rorps list""" - for i in range(iter_num): - if not overflow[i] and rorps[i] is None: - try: rorps[i] = rorp_iters[i].next() - except StopIteration: - overflow[i] = 1 - rorps[i] = None - - def getleastindex(rorps): - """Return the first index in rorps, assuming rorps isn't empty""" - return min(map(lambda rorp: rorp.index, - filter(lambda x: x, rorps))) - - def yield_tuples(iter_num, overflow, rorps): - while 1: - setrorps(overflow, rorps) - if not None in overflow: break - - index = getleastindex(rorps) - yieldval = [] - for i in range(iter_num): - if rorps[i] and rorps[i].index == index: - yieldval.append(rorps[i]) - rorps[i] = None - else: yieldval.append(None) - yield IndexedTuple(index, yieldval) - return yield_tuples(iter_num, overflow, rorps) - - def Collate2Iters(riter1, riter2): - """Special case of CollateIterators with 2 arguments - - This does the same thing but is faster because it doesn't have - to consider the >2 iterator case. 
Profiler says speed is - important here. - - """ - relem1, relem2 = None, None + def yield_tuples(iter_num, overflow, rorps): while 1: - if not relem1: - try: relem1 = riter1.next() - except StopIteration: - if relem2: yield IndexedTuple(index2, (None, relem2)) - for relem2 in riter2: - yield IndexedTuple(relem2.index, (None, relem2)) - break - index1 = relem1.index - if not relem2: - try: relem2 = riter2.next() - except StopIteration: - if relem1: yield IndexedTuple(index1, (relem1, None)) - for relem1 in riter1: - yield IndexedTuple(relem1.index, (relem1, None)) - break - index2 = relem2.index - - if index1 < index2: - yield IndexedTuple(index1, (relem1, None)) - relem1 = None - elif index1 == index2: - yield IndexedTuple(index1, (relem1, relem2)) - relem1, relem2 = None, None - else: # index2 is less - yield IndexedTuple(index2, (None, relem2)) - relem2 = None - - def getnext(iter): - """Return the next element of an iterator, raising error if none""" - try: next = iter.next() - except StopIteration: raise RORPIterException("Unexpected end to iter") - return next - - def GetDiffIter(sig_iter, new_iter): - """Return delta iterator from sig_iter to new_iter - - The accompanying file for each will be a delta as produced by - rdiff, unless the destination file does not exist, in which - case it will be the file in its entirety. - - sig_iter may be composed of rorps, but new_iter should have - full RPaths. 
+ setrorps(overflow, rorps) + if not None in overflow: break - """ - collated_iter = RORPIter.CollateIterators(sig_iter, new_iter) - for rorp, rp in collated_iter: yield RORPIter.diffonce(rorp, rp) - - def diffonce(sig_rorp, new_rp): - """Return one diff rorp, based from signature rorp and orig rp""" - if sig_rorp and Globals.preserve_hardlinks and sig_rorp.isflaglinked(): - if new_rp: diff_rorp = new_rp.getRORPath() - else: diff_rorp = RORPath(sig_rorp.index) - diff_rorp.flaglinked() - return diff_rorp - elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg(): - diff_rorp = new_rp.getRORPath() - #fp = sig_rorp.open("rb") - #print "---------------------", fp - #tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig") - #tmp_sig_rp.delete() - #tmp_sig_rp.write_from_fileobj(fp) - #diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"), - # new_rp)) - diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"), - new_rp)) - diff_rorp.set_attached_filetype('diff') + index = getleastindex(rorps) + yieldval = [] + for i in range(iter_num): + if rorps[i] and rorps[i].index == index: + yieldval.append(rorps[i]) + rorps[i] = None + else: yieldval.append(None) + yield IndexedTuple(index, yieldval) + return yield_tuples(iter_num, overflow, rorps) + +def Collate2Iters(riter1, riter2): + """Special case of CollateIterators with 2 arguments + + This does the same thing but is faster because it doesn't have + to consider the >2 iterator case. Profiler says speed is + important here. 
+ + """ + relem1, relem2 = None, None + while 1: + if not relem1: + try: relem1 = riter1.next() + except StopIteration: + if relem2: yield IndexedTuple(index2, (None, relem2)) + for relem2 in riter2: + yield IndexedTuple(relem2.index, (None, relem2)) + break + index1 = relem1.index + if not relem2: + try: relem2 = riter2.next() + except StopIteration: + if relem1: yield IndexedTuple(index1, (relem1, None)) + for relem1 in riter1: + yield IndexedTuple(relem1.index, (relem1, None)) + break + index2 = relem2.index + + if index1 < index2: + yield IndexedTuple(index1, (relem1, None)) + relem1 = None + elif index1 == index2: + yield IndexedTuple(index1, (relem1, relem2)) + relem1, relem2 = None, None + else: # index2 is less + yield IndexedTuple(index2, (None, relem2)) + relem2 = None + +def getnext(iter): + """Return the next element of an iterator, raising error if none""" + try: next = iter.next() + except StopIteration: raise RORPIterException("Unexpected end to iter") + return next + +def GetDiffIter(sig_iter, new_iter): + """Return delta iterator from sig_iter to new_iter + + The accompanying file for each will be a delta as produced by + rdiff, unless the destination file does not exist, in which + case it will be the file in its entirety. + + sig_iter may be composed of rorps, but new_iter should have + full RPaths. 
+ + """ + collated_iter = CollateIterators(sig_iter, new_iter) + for rorp, rp in collated_iter: yield diffonce(rorp, rp) + +def diffonce(sig_rorp, new_rp): + """Return one diff rorp, based from signature rorp and orig rp""" + if sig_rorp and Globals.preserve_hardlinks and sig_rorp.isflaglinked(): + if new_rp: diff_rorp = new_rp.getRORPath() + else: diff_rorp = rpath.RORPath(sig_rorp.index) + diff_rorp.flaglinked() + return diff_rorp + elif sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg(): + diff_rorp = new_rp.getRORPath() + #fp = sig_rorp.open("rb") + #print "---------------------", fp + #tmp_sig_rp = RPath(Globals.local_connection, "/tmp/sig") + #tmp_sig_rp.delete() + #tmp_sig_rp.write_from_fileobj(fp) + #diff_rorp.setfile(Rdiff.get_delta_sigfileobj(tmp_sig_rp.open("rb"), + # new_rp)) + diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"), + new_rp)) + diff_rorp.set_attached_filetype('diff') + return diff_rorp + else: + # Just send over originial if diff isn't appropriate + if sig_rorp: sig_rorp.close_if_necessary() + if not new_rp: return rpath.RORPath(sig_rorp.index) + elif new_rp.isreg(): + diff_rorp = new_rp.getRORPath(1) + diff_rorp.set_attached_filetype('snapshot') return diff_rorp - else: - # Just send over originial if diff isn't appropriate - if sig_rorp: sig_rorp.close_if_necessary() - if not new_rp: return RORPath(sig_rorp.index) - elif new_rp.isreg(): - diff_rorp = new_rp.getRORPath(1) - diff_rorp.set_attached_filetype('snapshot') - return diff_rorp - else: return new_rp.getRORPath() - - def PatchIter(base_rp, diff_iter): - """Patch the appropriate rps in basis_iter using diff_iter""" - basis_iter = RORPIter.IterateRPaths(base_rp) - collated_iter = RORPIter.CollateIterators(basis_iter, diff_iter) - for basisrp, diff_rorp in collated_iter: - RORPIter.patchonce_action(base_rp, basisrp, diff_rorp).execute() - - def patchonce_action(base_rp, basisrp, diff_rorp): - """Return action patching basisrp using diff_rorp""" - assert 
diff_rorp, "Missing diff index %s" % basisrp.index - if not diff_rorp.lstat(): - return RobustAction(None, lambda init_val: basisrp.delete(), None) - - if Globals.preserve_hardlinks and diff_rorp.isflaglinked(): - if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) - tf = TempFileManager.new(basisrp) - def init(): Hardlink.link_rp(diff_rorp, tf, basisrp) - return Robust.make_tf_robustaction(init, tf, basisrp) - elif basisrp and basisrp.isreg() and diff_rorp.isreg(): - if diff_rorp.get_attached_filetype() != 'diff': - raise RPathException("File %s appears to have changed during" - " processing, skipping" % (basisrp.path,)) - return Rdiff.patch_with_attribs_action(basisrp, diff_rorp) - else: # Diff contains whole file, just copy it over - if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) - return Robust.copy_with_attribs_action(diff_rorp, basisrp) - -MakeStatic(RORPIter) + else: return new_rp.getRORPath() + +def PatchIter(base_rp, diff_iter): + """Patch the appropriate rps in basis_iter using diff_iter""" + basis_iter = IterateRPaths(base_rp) + collated_iter = CollateIterators(basis_iter, diff_iter) + for basisrp, diff_rorp in collated_iter: + patchonce_action(base_rp, basisrp, diff_rorp).execute() + +def patchonce_action(base_rp, basisrp, diff_rorp): + """Return action patching basisrp using diff_rorp""" + assert diff_rorp, "Missing diff index %s" % basisrp.index + if not diff_rorp.lstat(): + return robust.Action(None, lambda init_val: basisrp.delete(), None) + + if Globals.preserve_hardlinks and diff_rorp.isflaglinked(): + if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) + tf = TempFile.new(basisrp) + def init(): Hardlink.link_rp(diff_rorp, tf, basisrp) + return robust.make_tf_robustaction(init, tf, basisrp) + elif basisrp and basisrp.isreg() and diff_rorp.isreg(): + if diff_rorp.get_attached_filetype() != 'diff': + raise rpath.RPathException("File %s appears to have changed during" + " processing, skipping" % (basisrp.path,)) + 
return Rdiff.patch_with_attribs_action(basisrp, diff_rorp) + else: # Diff contains whole file, just copy it over + if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) + return robust.copy_with_attribs_action(diff_rorp, basisrp) class IndexedTuple(UserList.UserList): @@ -299,3 +291,300 @@ class IndexedTuple(UserList.UserList): def __str__(self): return "(%s).%s" % (", ".join(map(str, self.data)), self.index) + + +class DirHandler: + """Handle directories when entering and exiting in mirror + + The problem is that we may need to write to a directory that may + have only read and exec permissions. Also, when leaving a + directory tree, we may have modified the directory and thus + changed the mod and access times. These need to be updated when + leaving. + + """ + def __init__(self, rootrp): + """DirHandler initializer - call with root rpath of mirror dir""" + self.rootrp = rootrp + assert rootrp.index == () + self.cur_dir_index = None # Current directory we have descended into + self.last_index = None # last index processed + + # This dictionary maps indicies to (rpath, (atime, mtime), + # perms) triples. Either or both of the time pair and perms + # can be None, which means not to update the times or the + # perms when leaving. We don't have to update the perms if we + # didn't have to change them in the first place. If a + # directory is explicitly given, then we don't have to update + # anything because it will be done by the normal process. + self.index_dict = {} + + def process_old_directories(self, new_dir_index): + """Update times/permissions for directories we are leaving + + Returns greatest index of the current index that has been seen + before (i.e. no need to process up to then as new dir). 
+ + """ + if self.cur_dir_index is None: return -1 # no previous directory + + i = len(self.cur_dir_index) + while 1: + if new_dir_index[:i] == self.cur_dir_index[:i]: + return i + self.process_old_dir(self.cur_dir_index[:i]) + i-=1 + + def process_old_dir(self, dir_index): + """Process outstanding changes for given dir index""" + rpath, times, perms = self.index_dict[dir_index] + if times: apply(rpath.settime, times) + if perms: rpath.chmod(perms) + + def init_new_dirs(self, rpath, new_dir_index, common_dir_index): + """Initialize any new directories + + Record the time, and change permissions if no write access. + Use rpath if it is given to access permissions and times. + + """ + for i in range(common_dir_index, len(new_dir_index)): + process_index = new_dir_index[:i] + if rpath.index == process_index: + self.index_dict[process_index] = (None, None, None) + else: + new_rpath = self.rootrp.new_index(process_index) + if new_rpath.hasfullperms(): perms = None + else: perms = new_rpath.getperms() + times = (new_rpath.getatime(), new_rpath.getmtime()) + self.index_dict[process_index] = new_rpath, times, perms + + def __call__(self, rpath): + """Given rpath, process containing directories""" + if rpath.isdir(): new_dir_index = rpath.index + elif not rpath.index: return # no directory contains root + else: new_dir_index = rpath.index[:-1] + + common_dir_index = self.process_old_directories(new_dir_index) + self.init_new_dirs(rpath, new_dir_index, common_dir_index) + self.cur_dir_index = new_dir_index + + def Finish(self): + """Process any remaining directories""" + indicies = self.index_dict.keys() + indicies.sort() + assert len(indicies) >= 1, indicies + indicies.reverse() + map(self.process_old_dir, indicies) + + +def FillInIter(rpiter, rootrp): + """Given ordered rpiter and rootrp, fill in missing indicies with rpaths + + For instance, suppose rpiter contains rpaths with indicies (), + (1,2), (2,5). Then return iter with rpaths (), (1,), (1,2), (2,), + (2,5). 
This is used when we need to process directories before or + after processing a file in that directory. + + """ + # Handle first element as special case + first_rp = rpiter.next() # StopIteration gets passed upwards + cur_index = first_rp.index + for i in range(len(cur_index)): + yield rootrp.new_index(cur_index[:i]) + yield first_rp + del first_rp + old_index = cur_index + + # Now do the others (1,2,3) (1,4,5) + for rp in rpiter: + cur_index = rp.index + if not cur_index[:-1] == old_index[:-1]: # Handle special case quickly + for i in range(1, len(cur_index)): # i==0 case already handled + if cur_index[:i] != old_index[:i]: + yield rootrp.new_index(cur_index[:i]) + yield rp + old_index = cur_index + + +class IterTreeReducer: + """Tree style reducer object for iterator + + The indicies of a RORPIter form a tree type structure. This class + can be used on each element of an iter in sequence and the result + will be as if the corresponding tree was reduced. This tries to + bridge the gap between the tree nature of directories, and the + iterator nature of the connection between hosts and the temporal + order in which the files are processed. + + """ + def __init__(self, branch_class, branch_args): + """ITR initializer""" + self.branch_class = branch_class + self.branch_args = branch_args + self.index = None + self.root_branch = branch_class(*branch_args) + self.branches = [self.root_branch] + + def finish_branches(self, index): + """Run Finish() on all branches index has passed + + When we pass out of a branch, delete it and process it with + the parent. The innermost branches will be the last in the + list. Return None if we are out of the entire tree, and 1 + otherwise. 
+ + """ + branches = self.branches + while 1: + to_be_finished = branches[-1] + base_index = to_be_finished.base_index + if base_index != index[:len(base_index)]: + # out of the tree, finish with to_be_finished + to_be_finished.call_end_proc() + del branches[-1] + if not branches: return None + branches[-1].branch_process(to_be_finished) + else: return 1 + + def add_branch(self, index): + """Return branch of type self.branch_class, add to branch list""" + branch = self.branch_class(*self.branch_args) + branch.base_index = index + self.branches.append(branch) + return branch + + def process_w_branch(self, branch, args): + """Run start_process on latest branch""" + robust.check_common_error(branch.on_error, + branch.start_process, args) + if not branch.caught_exception: branch.start_successful = 1 + + def Finish(self): + """Call at end of sequence to tie everything up""" + while 1: + to_be_finished = self.branches.pop() + to_be_finished.call_end_proc() + if not self.branches: break + self.branches[-1].branch_process(to_be_finished) + + def __call__(self, *args): + """Process args, where args[0] is current position in iterator + + Returns true if args successfully processed, false if index is + not in the current tree and thus the final result is + available. + + Also note below we set self.index after doing the necessary + start processing, in case there is a crash in the middle. 
+ + """ + index = args[0] + if self.index is None: + self.root_branch.base_index = index + self.process_w_branch(self.root_branch, args) + self.index = index + return 1 + + if index <= self.index: + log.Log("Warning: oldindex %s >= newindex %s" % + (self.index, index), 2) + return 1 + + if self.finish_branches(index) is None: + return None # We are no longer in the main tree + last_branch = self.branches[-1] + if last_branch.start_successful: + if last_branch.can_fast_process(*args): + last_branch.fast_process(*args) + else: + branch = self.add_branch(index) + self.process_w_branch(branch, args) + else: last_branch.log_prev_error(index) + + self.index = index + return 1 + + +class ITRBranch: + """Helper class for IterTreeReducer below + + There are five stub functions below: start_process, end_process, + branch_process, can_fast_process, and fast_process. A class that + subclasses this one will probably fill in these functions to do + more. + + It is important that this class be pickable, so keep that in mind + when subclassing (this is used to resume failed sessions). 
+ + """ + base_index = index = None + finished = None + caught_exception = start_successful = None + + def call_end_proc(self): + """Runs the end_process on self, checking for errors""" + if self.finished or not self.start_successful: + self.caught_exception = 1 + if self.caught_exception: self.log_prev_error(self.base_index) + else: robust.check_common_error(self.on_error, self.end_process) + self.finished = 1 + + def start_process(self, *args): + """Do some initial processing (stub)""" + pass + + def end_process(self): + """Do any final processing before leaving branch (stub)""" + pass + + def branch_process(self, branch): + """Process a branch right after it is finished (stub)""" + assert branch.finished + pass + + def can_fast_process(self, *args): + """True if object can be processed without new branch (stub)""" + return None + + def fast_process(self, *args): + """Process args without new child branch (stub)""" + pass + + def on_error(self, exc, *args): + """This is run on any exception in start/end-process""" + self.caught_exception = 1 + if args and args[0] and isinstance(args[0], tuple): + filename = os.path.join(*args[0]) + elif self.index: filename = os.path.join(*self.index) + else: filename = "." + log.Log("Error '%s' processing %s" % (exc, filename), 2) + + def log_prev_error(self, index): + """Call function if no pending exception""" + log.Log("Skipping %s because of previous error" % + (os.path.join(*index),), 2) + + +class DestructiveSteppingFinalizer(ITRBranch): + """Finalizer that can work on an iterator of dsrpaths + + The reason we have to use an IterTreeReducer is that some files + should be updated immediately, but for directories we sometimes + need to update all the files in the directory before finally + coming back to it. 
+ + """ + dsrpath = None + def start_process(self, index, dsrpath): + self.dsrpath = dsrpath + + def end_process(self): + if self.dsrpath: self.dsrpath.write_changes() + + def can_fast_process(self, index, dsrpath): + return not self.dsrpath.isdir() + + def fast_process(self, index, dsrpath): + if self.dsrpath: self.dsrpath.write_changes() + |