diff options
Diffstat (limited to 'rdiff-backup/rdiff_backup/backup.py')
-rw-r--r-- | rdiff-backup/rdiff_backup/backup.py | 67 |
1 files changed, 46 insertions, 21 deletions
diff --git a/rdiff-backup/rdiff_backup/backup.py b/rdiff-backup/rdiff_backup/backup.py index 0ee782c..0ff5ace 100644 --- a/rdiff-backup/rdiff_backup/backup.py +++ b/rdiff-backup/rdiff_backup/backup.py @@ -22,7 +22,7 @@ from __future__ import generators import errno import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \ - rpath, static, log, selection, Time, Rdiff, statistics + rpath, static, log, selection, Time, Rdiff, statistics, iterfile def Mirror(src_rpath, dest_rpath): """Turn dest_rpath into a copy of src_rpath""" @@ -65,7 +65,7 @@ class SourceStruct: sel = selection.Select(rpath) sel.ParseArgs(tuplelist, filelists) sel.set_iter() - cache_size = Globals.pipeline_max_length * 2 # 2 because to and from + cache_size = Globals.pipeline_max_length * 3 # to and from+leeway cls.source_select = rorpiter.CacheIndexable(sel, cache_size) def get_source_select(cls): @@ -96,6 +96,9 @@ class SourceStruct: diff_rorp.set_attached_filetype('snapshot') for dest_sig in dest_sigiter: + if dest_sig is iterfile.RORPIterFlushRepeat: + yield iterfile.RORPIterFlush # Flush buffer when get_sigs does + continue src_rp = (source_rps.get(dest_sig.index) or rpath.RORPath(dest_sig.index)) diff_rorp = src_rp.getRORPath() @@ -139,29 +142,48 @@ class DestinationStruct: """ dest_iter = cls.get_dest_select(baserp, for_increment) collated = rorpiter.Collate2Iters(source_iter, dest_iter) - cls.CCPP = CacheCollatedPostProcess(collated, - Globals.pipeline_max_length*2) + cls.CCPP = CacheCollatedPostProcess( + collated, Globals.pipeline_max_length*4) + # pipeline len adds some leeway over just*3 (to and from and back) def get_sigs(cls, dest_base_rpath): - """Yield signatures of any changed destination files""" + """Yield signatures of any changed destination files + + If we are backing up across a pipe, we must flush the pipeline + every so often so it doesn't get congested on destination end. + + """ + flush_threshold = int(Globals.pipeline_max_length/2) + num_rorps_skipped = 0 for src_rorp, dest_rorp in cls.CCPP: if (src_rorp and dest_rorp and src_rorp == dest_rorp and (not Globals.preserve_hardlinks or - Hardlink.rorp_eq(src_rorp, dest_rorp))): continue - index = src_rorp and src_rorp.index or dest_rorp.index - cls.CCPP.flag_changed(index) - if (Globals.preserve_hardlinks and - Hardlink.islinked(src_rorp or dest_rorp)): - dest_sig = rpath.RORPath(index) - dest_sig.flaglinked(Hardlink.get_link_index(dest_sig)) - elif dest_rorp: - dest_sig = dest_rorp.getRORPath() - if dest_rorp.isreg(): - dest_rp = dest_base_rpath.new_index(index) - assert dest_rp.isreg() - dest_sig.setfile(Rdiff.get_signature(dest_rp)) - else: dest_sig = rpath.RORPath(index) - yield dest_sig + Hardlink.rorp_eq(src_rorp, dest_rorp))): + num_rorps_skipped += 1 + if (Globals.backup_reader is not Globals.backup_writer and + num_rorps_skipped > flush_threshold): + num_rorps_skipped = 0 + yield iterfile.RORPIterFlushRepeat + else: + index = src_rorp and src_rorp.index or dest_rorp.index + cls.CCPP.flag_changed(index) + yield cls.get_one_sig(dest_base_rpath, index, + src_rorp, dest_rorp) + + def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp): + """Return a signature given source and destination rorps""" + if (Globals.preserve_hardlinks and + Hardlink.islinked(src_rorp or dest_rorp)): + dest_sig = rpath.RORPath(index) + dest_sig.flaglinked(Hardlink.get_link_index(dest_sig)) + elif dest_rorp: + dest_sig = dest_rorp.getRORPath() + if dest_rorp.isreg(): + dest_rp = dest_base_rpath.new_index(index) + assert dest_rp.isreg() + dest_sig.setfile(Rdiff.get_signature(dest_rp)) + else: dest_sig = rpath.RORPath(index) + return dest_sig def patch(cls, dest_rpath, source_diffiter, start_index = ()): """Patch dest_rpath with an rorpiter of diffs""" @@ -301,6 +323,9 @@ class CacheCollatedPostProcess: def get_source_rorp(self, index): """Retrieve source_rorp with given index from cache""" + assert index >= self.cache_indicies[0], \ + ("CCPP index out of order: %s %s" % + (repr(index), repr(self.cache_indicies[0]))) return self.cache_dict[index][0] def get_mirror_rorp(self, index): @@ -388,7 +413,7 @@ class PatchITRB(rorpiter.ITRBranch): """ if not new_rp.isreg(): return 1 cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index) - if cached_rorp.equal_loose(new_rp): return 1 + if cached_rorp and cached_rorp.equal_loose(new_rp): return 1 log.ErrorLog.write_if_open("UpdateError", diff_rorp, "Updated mirror " "temp file %s does not match source" % (new_rp.path,)) return 0 |