diff options
Diffstat (limited to 'rdiff-backup/rdiff_backup/backup.py')
-rw-r--r-- | rdiff-backup/rdiff_backup/backup.py | 210 |
1 files changed, 155 insertions, 55 deletions
diff --git a/rdiff-backup/rdiff_backup/backup.py b/rdiff-backup/rdiff_backup/backup.py index 2f3d362..78bfbe9 100644 --- a/rdiff-backup/rdiff_backup/backup.py +++ b/rdiff-backup/rdiff_backup/backup.py @@ -28,24 +28,22 @@ def Mirror(src_rpath, dest_rpath): SourceS = src_rpath.conn.backup.SourceStruct DestS = dest_rpath.conn.backup.DestinationStruct - DestS.init_statistics() source_rpiter = SourceS.get_source_select() - dest_sigiter = DestS.process_source_get_sigs(dest_rpath, source_rpiter, 0) + DestS.set_rorp_cache(dest_rpath, source_rpiter, 0) + dest_sigiter = DestS.get_sigs() source_diffiter = SourceS.get_diffs(dest_sigiter) DestS.patch(dest_rpath, source_diffiter) - DestS.write_statistics() def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath): """Mirror + put increments in tree based at inc_rpath""" SourceS = src_rpath.conn.backup.SourceStruct DestS = dest_rpath.conn.backup.DestinationStruct - DestS.init_statistics() source_rpiter = SourceS.get_source_select() - dest_sigiter = DestS.process_source_get_sigs(dest_rpath, source_rpiter, 1) + DestS.set_rorp_cache(dest_rpath, source_rpiter, 1) + dest_sigiter = DestS.get_sigs() source_diffiter = SourceS.get_diffs(dest_sigiter) DestS.patch_and_increment(dest_rpath, source_diffiter, inc_rpath) - DestS.write_statistics() class SourceStruct: @@ -99,14 +97,6 @@ static.MakeClass(SourceStruct) class DestinationStruct: """Hold info used by destination side when backing up""" - def init_statistics(cls): - """Set cls.stats to StatFileObj object""" - cls.statfileobj = statistics.init_statfileobj() - - def write_statistics(cls): - """Write statistics file""" - statistics.write_active_statfileobj() - def get_dest_select(cls, rpath, use_metadata = 1): """Return destination select rorpath iterator @@ -125,66 +115,175 @@ class DestinationStruct: sel.parse_rbdir_exclude() return sel.set_iter() - def dest_iter_filter(cls, dest_iter): - """Destination rorps pass through this - record stats""" - for dest_rorp in dest_iter: - cls.statfileobj.add_dest_file(dest_rorp) - Hardlink.add_rorp(dest_rorp, source = 0) - yield dest_rorp - - def src_iter_filter(cls, source_iter): - """Source rorps pass through this - record stats, write metadata""" - metadata.OpenMetadata() - for src_rorp in source_iter: - cls.statfileobj.add_source_file(src_rorp) - Hardlink.add_rorp(src_rorp, source = 1) - metadata.WriteMetadata(src_rorp) - yield src_rorp - metadata.CloseMetadata() + def set_rorp_cache(cls, baserp, source_iter, for_increment): + """Initialize cls.CCPP, the destination rorp cache - def process_source_get_sigs(cls, baserp, source_iter, for_increment): - """Process the source rorpiter and return signatures of dest dir - - Write all metadata to file, then return signatures of any - destination files that have changed. for_increment should be - true if we are mirror+incrementing, and false if we are just - mirroring. + for_increment should be true if we are mirror+incrementing, + false if we are just mirroring. """ - source_iter = cls.src_iter_filter(source_iter) - dest_iter = cls.dest_iter_filter(cls.get_dest_select(baserp, - for_increment)) - for index in rorpiter.get_dissimilar_indicies(source_iter, dest_iter, - cls.statfileobj): - dest_rp = baserp.new_index(index) - dest_sig = dest_rp.getRORPath() - if Globals.preserve_hardlinks and Hardlink.islinked(dest_rp): - dest_sig.flaglinked(Hardlink.get_link_index(dest_rp)) - elif dest_rp.isreg(): - dest_sig.setfile(Rdiff.get_signature(dest_rp)) - yield dest_sig + dest_iter = cls.get_dest_select(baserp, for_increment) + collated = rorpiter.Collate2Iters(source_iter, dest_iter) + cls.CCPP = CacheCollatedPostProcess(collated, + Globals.pipeline_max_length*2) + + def get_sigs(cls): + """Yield signatures of any changed destination files""" + for src_rorp, dest_rorp in cls.CCPP: + if (src_rorp and dest_rorp and src_rorp == dest_rorp and + (not Globals.preserve_hardlinks or + Hardlink.rorp_eq(src_rorp, dest_rorp))): continue + index = src_rorp and src_rorp.index or dest_rorp.index + cls.CCPP.flag_changed(index) + if (Globals.preserve_hardlinks and + Hardlink.islinked(src_rorp or dest_rorp)): + dest_sig = rpath.RORPath(index) + dest_sig.flaglinked(Hardlink.get_link_index(dest_sig)) + elif dest_rorp: + dest_sig = dest_rorp.getRORPath() + if dest_rorp.isreg(): + dest_sig.setfile(Rdiff.get_signature(dest_rorp)) + else: dest_sig = rpath.RORPath(index) + yield dest_sig def patch(cls, dest_rpath, source_diffiter, start_index = ()): """Patch dest_rpath with an rorpiter of diffs""" - ITR = rorpiter.IterTreeReducer(PatchITRB, [dest_rpath]) + ITR = rorpiter.IterTreeReducer(PatchITRB, [dest_rpath, cls.CCPP]) for diff in rorpiter.FillInIter(source_diffiter, dest_rpath): log.Log("Processing changed file " + diff.get_indexpath(), 5) ITR(diff.index, diff) ITR.Finish() + cls.CCPP.close() dest_rpath.setdata() def patch_and_increment(cls, dest_rpath, source_diffiter, inc_rpath): """Patch dest_rpath with rorpiter of diffs and write increments""" - ITR = rorpiter.IterTreeReducer(IncrementITRB, [dest_rpath, inc_rpath]) + ITR = rorpiter.IterTreeReducer(IncrementITRB, + [dest_rpath, inc_rpath, cls.CCPP]) for diff in rorpiter.FillInIter(source_diffiter, dest_rpath): log.Log("Processing changed file " + diff.get_indexpath(), 5) ITR(diff.index, diff) ITR.Finish() + cls.CCPP.close() dest_rpath.setdata() static.MakeClass(DestinationStruct) +class CacheCollatedPostProcess: + """Cache a collated iter of (source_rorp, dest_rp) pairs + + This is necessary for two reasons: + + 1. The patch function may need the original source_rorp or + dest_rp information, which is not present in the diff it + receives. + + 2. The metadata must match what is stored in the destination + directory. If there is an error we do not update the dest + directory for that file, and the old metadata is used. Thus + we cannot write any metadata until we know the file has been + procesed correctly. + + The class caches older source_rorps and dest_rps so the patch + function can retrieve them if necessary. The patch function can + also update the processed correctly flag. When an item falls out + of the cache, we assume it has been processed, and write the + metadata for it. + + """ + def __init__(self, collated_iter, cache_size): + """Initialize new CCWP.""" + self.iter = collated_iter # generates (source_rorp, dest_rorp) pairs + self.cache_size = cache_size + self.statfileobj = statistics.init_statfileobj() + metadata.OpenMetadata() + + # the following should map indicies to lists [source_rorp, + # dest_rorp, changed_flag, success_flag] where changed_flag + # should be true if the rorps are different, and success_flag + # should be true if dest_rorp has been successfully updated to + # source_rorp. They both default to false. + self.cache_dict = {} + self.cache_indicies = [] + + def __iter__(self): return self + + def next(self): + """Return next (source_rorp, dest_rorp) pair. StopIteration passed""" + source_rorp, dest_rorp = self.iter.next() + self.pre_process(source_rorp, dest_rorp) + index = source_rorp and source_rorp.index or dest_rorp.index + self.cache_dict[index] = [source_rorp, dest_rorp, 0, 0] + self.cache_indicies.append(index) + + if len(self.cache_indicies) > self.cache_size: self.shorten_cache() + return source_rorp, dest_rorp + + def pre_process(self, source_rorp, dest_rorp): + """Do initial processing on source_rorp and dest_rorp + + It will not be clear whether source_rorp and dest_rorp have + errors at this point, so don't do anything which assumes they + will be backed up correctly. + + """ + if source_rorp: Hardlink.add_rorp(source_rorp, source = 1) + if dest_rorp: Hardlink.add_rorp(dest_rorp, source = 0) + + def shorten_cache(self): + """Remove one element from cache, possibly adding it to metadata""" + first_index = self.cache_indicies[0] + del self.cache_indicies[0] + old_source_rorp, old_dest_rorp, changed_flag, success_flag = \ + self.cache_dict[first_index] + del self.cache_dict[first_index] + self.post_process(old_source_rorp, old_dest_rorp, + changed_flag, success_flag) + + def post_process(self, source_rorp, dest_rorp, changed, success): + """Post process source_rorp and dest_rorp. + + changed will be true if the files have changed. success will + be true if the files have been successfully updated (this is + always false for un-changed files). + + """ + if not changed or success: + self.statfileobj.add_source_file(source_rorp) + self.statfileobj.add_dest_file(dest_rorp) + if success: + self.statfileobj.add_changed(source_rorp, dest_rorp) + metadata_rorp = source_rorp + else: + metadata_rorp = dest_rorp + if changed: self.statfileobj.add_error() + if metadata_rorp and metadata_rorp.lstat(): + metadata.WriteMetadata(metadata_rorp) + + def flag_success(self, index): + """Signal that the file with given index was updated successfully""" + self.cache_dict[index][3] = 1 + + def flag_changed(self, index): + """Signal that the file with given index has changed""" + self.cache_dict[index][2] = 1 + + def get_rorps(self, index): + """Retrieve (source_rorp, dest_rorp) from cache""" + return self.cache_dict[index][:2] + + def get_source_rorp(self, index): + """Retrieve source_rorp with given index from cache""" + return self.cache_dict[index][0] + + def close(self): + """Process the remaining elements in the cache""" + while self.cache_indicies: self.shorten_cache() + metadata.CloseMetadata() + statistics.write_active_statfileobj() + + class PatchITRB(rorpiter.ITRBranch): """Patch an rpath with the given diff iters (use with IterTreeReducer) @@ -195,7 +294,7 @@ class PatchITRB(rorpiter.ITRBranch): contents. """ - def __init__(self, basis_root_rp): + def __init__(self, basis_root_rp, rorp_cache): """Set basis_root_rp, the base of the tree to be incremented""" self.basis_root_rp = basis_root_rp assert basis_root_rp.conn is Globals.local_connection @@ -267,7 +366,8 @@ class PatchITRB(rorpiter.ITRBranch): else: assert self.dir_replacement self.base_rp.rmdir() - rpath.rename(self.dir_replacement, self.base_rp) + if self.dir_replacement.lstat(): + rpath.rename(self.dir_replacement, self.base_rp) class IncrementITRB(PatchITRB): @@ -276,10 +376,10 @@ class IncrementITRB(PatchITRB): Like PatchITRB, but this time also write increments. """ - def __init__(self, basis_root_rp, inc_root_rp): + def __init__(self, basis_root_rp, inc_root_rp, rorp_cache): self.inc_root_rp = inc_root_rp self.cached_incrp = None - PatchITRB.__init__(self, basis_root_rp) + PatchITRB.__init__(self, basis_root_rp, rorp_cache) def get_incrp(self, index): """Return inc RPath by adding index to self.basis_root_rp""" |