summaryrefslogtreecommitdiff
path: root/rdiff-backup/rdiff_backup/backup.py
diff options
context:
space:
mode:
Diffstat (limited to 'rdiff-backup/rdiff_backup/backup.py')
-rw-r--r--rdiff-backup/rdiff_backup/backup.py67
1 files changed, 46 insertions, 21 deletions
diff --git a/rdiff-backup/rdiff_backup/backup.py b/rdiff-backup/rdiff_backup/backup.py
index 0ee782c..0ff5ace 100644
--- a/rdiff-backup/rdiff_backup/backup.py
+++ b/rdiff-backup/rdiff_backup/backup.py
@@ -22,7 +22,7 @@
from __future__ import generators
import errno
import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \
- rpath, static, log, selection, Time, Rdiff, statistics
+ rpath, static, log, selection, Time, Rdiff, statistics, iterfile
def Mirror(src_rpath, dest_rpath):
"""Turn dest_rpath into a copy of src_rpath"""
@@ -65,7 +65,7 @@ class SourceStruct:
sel = selection.Select(rpath)
sel.ParseArgs(tuplelist, filelists)
sel.set_iter()
- cache_size = Globals.pipeline_max_length * 2 # 2 because to and from
+ cache_size = Globals.pipeline_max_length * 3 # to and from+leeway
cls.source_select = rorpiter.CacheIndexable(sel, cache_size)
def get_source_select(cls):
@@ -96,6 +96,9 @@ class SourceStruct:
diff_rorp.set_attached_filetype('snapshot')
for dest_sig in dest_sigiter:
+ if dest_sig is iterfile.RORPIterFlushRepeat:
+ yield iterfile.RORPIterFlush # Flush buffer when get_sigs does
+ continue
src_rp = (source_rps.get(dest_sig.index) or
rpath.RORPath(dest_sig.index))
diff_rorp = src_rp.getRORPath()
@@ -139,29 +142,48 @@ class DestinationStruct:
"""
dest_iter = cls.get_dest_select(baserp, for_increment)
collated = rorpiter.Collate2Iters(source_iter, dest_iter)
- cls.CCPP = CacheCollatedPostProcess(collated,
- Globals.pipeline_max_length*2)
+ cls.CCPP = CacheCollatedPostProcess(
+ collated, Globals.pipeline_max_length*4)
+ # pipeline len adds some leeway over just*3 (to and from and back)
def get_sigs(cls, dest_base_rpath):
- """Yield signatures of any changed destination files"""
+ """Yield signatures of any changed destination files
+
+ If we are backing up across a pipe, we must flush the pipeline
+ every so often so it doesn't get congested on destination end.
+
+ """
+ flush_threshold = int(Globals.pipeline_max_length/2)
+ num_rorps_skipped = 0
for src_rorp, dest_rorp in cls.CCPP:
if (src_rorp and dest_rorp and src_rorp == dest_rorp and
(not Globals.preserve_hardlinks or
- Hardlink.rorp_eq(src_rorp, dest_rorp))): continue
- index = src_rorp and src_rorp.index or dest_rorp.index
- cls.CCPP.flag_changed(index)
- if (Globals.preserve_hardlinks and
- Hardlink.islinked(src_rorp or dest_rorp)):
- dest_sig = rpath.RORPath(index)
- dest_sig.flaglinked(Hardlink.get_link_index(dest_sig))
- elif dest_rorp:
- dest_sig = dest_rorp.getRORPath()
- if dest_rorp.isreg():
- dest_rp = dest_base_rpath.new_index(index)
- assert dest_rp.isreg()
- dest_sig.setfile(Rdiff.get_signature(dest_rp))
- else: dest_sig = rpath.RORPath(index)
- yield dest_sig
+ Hardlink.rorp_eq(src_rorp, dest_rorp))):
+ num_rorps_skipped += 1
+ if (Globals.backup_reader is not Globals.backup_writer and
+ num_rorps_skipped > flush_threshold):
+ num_rorps_skipped = 0
+ yield iterfile.RORPIterFlushRepeat
+ else:
+ index = src_rorp and src_rorp.index or dest_rorp.index
+ cls.CCPP.flag_changed(index)
+ yield cls.get_one_sig(dest_base_rpath, index,
+ src_rorp, dest_rorp)
+
+ def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp):
+ """Return a signature given source and destination rorps"""
+ if (Globals.preserve_hardlinks and
+ Hardlink.islinked(src_rorp or dest_rorp)):
+ dest_sig = rpath.RORPath(index)
+ dest_sig.flaglinked(Hardlink.get_link_index(dest_sig))
+ elif dest_rorp:
+ dest_sig = dest_rorp.getRORPath()
+ if dest_rorp.isreg():
+ dest_rp = dest_base_rpath.new_index(index)
+ assert dest_rp.isreg()
+ dest_sig.setfile(Rdiff.get_signature(dest_rp))
+ else: dest_sig = rpath.RORPath(index)
+ return dest_sig
def patch(cls, dest_rpath, source_diffiter, start_index = ()):
"""Patch dest_rpath with an rorpiter of diffs"""
@@ -301,6 +323,9 @@ class CacheCollatedPostProcess:
def get_source_rorp(self, index):
"""Retrieve source_rorp with given index from cache"""
+ assert index >= self.cache_indicies[0], \
+ ("CCPP index out of order: %s %s" %
+ (repr(index), repr(self.cache_indicies[0])))
return self.cache_dict[index][0]
def get_mirror_rorp(self, index):
@@ -388,7 +413,7 @@ class PatchITRB(rorpiter.ITRBranch):
"""
if not new_rp.isreg(): return 1
cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index)
- if cached_rorp.equal_loose(new_rp): return 1
+ if cached_rorp and cached_rorp.equal_loose(new_rp): return 1
log.ErrorLog.write_if_open("UpdateError", diff_rorp, "Updated mirror "
"temp file %s does not match source" % (new_rp.path,))
return 0