summaryrefslogtreecommitdiff
path: root/rdiff-backup/rdiff_backup/backup.py
diff options
context:
space:
mode:
Diffstat (limited to 'rdiff-backup/rdiff_backup/backup.py')
-rw-r--r--rdiff-backup/rdiff_backup/backup.py210
1 files changed, 155 insertions, 55 deletions
diff --git a/rdiff-backup/rdiff_backup/backup.py b/rdiff-backup/rdiff_backup/backup.py
index 2f3d362..78bfbe9 100644
--- a/rdiff-backup/rdiff_backup/backup.py
+++ b/rdiff-backup/rdiff_backup/backup.py
@@ -28,24 +28,22 @@ def Mirror(src_rpath, dest_rpath):
SourceS = src_rpath.conn.backup.SourceStruct
DestS = dest_rpath.conn.backup.DestinationStruct
- DestS.init_statistics()
source_rpiter = SourceS.get_source_select()
- dest_sigiter = DestS.process_source_get_sigs(dest_rpath, source_rpiter, 0)
+ DestS.set_rorp_cache(dest_rpath, source_rpiter, 0)
+ dest_sigiter = DestS.get_sigs()
source_diffiter = SourceS.get_diffs(dest_sigiter)
DestS.patch(dest_rpath, source_diffiter)
- DestS.write_statistics()
def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath):
"""Mirror + put increments in tree based at inc_rpath"""
SourceS = src_rpath.conn.backup.SourceStruct
DestS = dest_rpath.conn.backup.DestinationStruct
- DestS.init_statistics()
source_rpiter = SourceS.get_source_select()
- dest_sigiter = DestS.process_source_get_sigs(dest_rpath, source_rpiter, 1)
+ DestS.set_rorp_cache(dest_rpath, source_rpiter, 1)
+ dest_sigiter = DestS.get_sigs()
source_diffiter = SourceS.get_diffs(dest_sigiter)
DestS.patch_and_increment(dest_rpath, source_diffiter, inc_rpath)
- DestS.write_statistics()
class SourceStruct:
@@ -99,14 +97,6 @@ static.MakeClass(SourceStruct)
class DestinationStruct:
"""Hold info used by destination side when backing up"""
- def init_statistics(cls):
- """Set cls.stats to StatFileObj object"""
- cls.statfileobj = statistics.init_statfileobj()
-
- def write_statistics(cls):
- """Write statistics file"""
- statistics.write_active_statfileobj()
-
def get_dest_select(cls, rpath, use_metadata = 1):
"""Return destination select rorpath iterator
@@ -125,66 +115,175 @@ class DestinationStruct:
sel.parse_rbdir_exclude()
return sel.set_iter()
- def dest_iter_filter(cls, dest_iter):
- """Destination rorps pass through this - record stats"""
- for dest_rorp in dest_iter:
- cls.statfileobj.add_dest_file(dest_rorp)
- Hardlink.add_rorp(dest_rorp, source = 0)
- yield dest_rorp
-
- def src_iter_filter(cls, source_iter):
- """Source rorps pass through this - record stats, write metadata"""
- metadata.OpenMetadata()
- for src_rorp in source_iter:
- cls.statfileobj.add_source_file(src_rorp)
- Hardlink.add_rorp(src_rorp, source = 1)
- metadata.WriteMetadata(src_rorp)
- yield src_rorp
- metadata.CloseMetadata()
+ def set_rorp_cache(cls, baserp, source_iter, for_increment):
+ """Initialize cls.CCPP, the destination rorp cache
- def process_source_get_sigs(cls, baserp, source_iter, for_increment):
- """Process the source rorpiter and return signatures of dest dir
-
- Write all metadata to file, then return signatures of any
- destination files that have changed. for_increment should be
- true if we are mirror+incrementing, and false if we are just
- mirroring.
+ for_increment should be true if we are mirror+incrementing,
+ false if we are just mirroring.
"""
- source_iter = cls.src_iter_filter(source_iter)
- dest_iter = cls.dest_iter_filter(cls.get_dest_select(baserp,
- for_increment))
- for index in rorpiter.get_dissimilar_indicies(source_iter, dest_iter,
- cls.statfileobj):
- dest_rp = baserp.new_index(index)
- dest_sig = dest_rp.getRORPath()
- if Globals.preserve_hardlinks and Hardlink.islinked(dest_rp):
- dest_sig.flaglinked(Hardlink.get_link_index(dest_rp))
- elif dest_rp.isreg():
- dest_sig.setfile(Rdiff.get_signature(dest_rp))
- yield dest_sig
+ dest_iter = cls.get_dest_select(baserp, for_increment)
+ collated = rorpiter.Collate2Iters(source_iter, dest_iter)
+ cls.CCPP = CacheCollatedPostProcess(collated,
+ Globals.pipeline_max_length*2)
+
+ def get_sigs(cls):
+ """Yield signatures of any changed destination files"""
+ for src_rorp, dest_rorp in cls.CCPP:
+ if (src_rorp and dest_rorp and src_rorp == dest_rorp and
+ (not Globals.preserve_hardlinks or
+ Hardlink.rorp_eq(src_rorp, dest_rorp))): continue
+ index = src_rorp and src_rorp.index or dest_rorp.index
+ cls.CCPP.flag_changed(index)
+ if (Globals.preserve_hardlinks and
+ Hardlink.islinked(src_rorp or dest_rorp)):
+ dest_sig = rpath.RORPath(index)
+ dest_sig.flaglinked(Hardlink.get_link_index(dest_sig))
+ elif dest_rorp:
+ dest_sig = dest_rorp.getRORPath()
+ if dest_rorp.isreg():
+ dest_sig.setfile(Rdiff.get_signature(dest_rorp))
+ else: dest_sig = rpath.RORPath(index)
+ yield dest_sig
def patch(cls, dest_rpath, source_diffiter, start_index = ()):
"""Patch dest_rpath with an rorpiter of diffs"""
- ITR = rorpiter.IterTreeReducer(PatchITRB, [dest_rpath])
+ ITR = rorpiter.IterTreeReducer(PatchITRB, [dest_rpath, cls.CCPP])
for diff in rorpiter.FillInIter(source_diffiter, dest_rpath):
log.Log("Processing changed file " + diff.get_indexpath(), 5)
ITR(diff.index, diff)
ITR.Finish()
+ cls.CCPP.close()
dest_rpath.setdata()
def patch_and_increment(cls, dest_rpath, source_diffiter, inc_rpath):
"""Patch dest_rpath with rorpiter of diffs and write increments"""
- ITR = rorpiter.IterTreeReducer(IncrementITRB, [dest_rpath, inc_rpath])
+ ITR = rorpiter.IterTreeReducer(IncrementITRB,
+ [dest_rpath, inc_rpath, cls.CCPP])
for diff in rorpiter.FillInIter(source_diffiter, dest_rpath):
log.Log("Processing changed file " + diff.get_indexpath(), 5)
ITR(diff.index, diff)
ITR.Finish()
+ cls.CCPP.close()
dest_rpath.setdata()
static.MakeClass(DestinationStruct)
+class CacheCollatedPostProcess:
+ """Cache a collated iter of (source_rorp, dest_rp) pairs
+
+ This is necessary for two reasons:
+
+ 1. The patch function may need the original source_rorp or
+ dest_rp information, which is not present in the diff it
+ receives.
+
+ 2. The metadata must match what is stored in the destination
+ directory. If there is an error we do not update the dest
+ directory for that file, and the old metadata is used. Thus
+ we cannot write any metadata until we know the file has been
+ procesed correctly.
+
+ The class caches older source_rorps and dest_rps so the patch
+ function can retrieve them if necessary. The patch function can
+ also update the processed correctly flag. When an item falls out
+ of the cache, we assume it has been processed, and write the
+ metadata for it.
+
+ """
+ def __init__(self, collated_iter, cache_size):
+ """Initialize new CCWP."""
+ self.iter = collated_iter # generates (source_rorp, dest_rorp) pairs
+ self.cache_size = cache_size
+ self.statfileobj = statistics.init_statfileobj()
+ metadata.OpenMetadata()
+
+ # the following should map indicies to lists [source_rorp,
+ # dest_rorp, changed_flag, success_flag] where changed_flag
+ # should be true if the rorps are different, and success_flag
+ # should be true if dest_rorp has been successfully updated to
+ # source_rorp. They both default to false.
+ self.cache_dict = {}
+ self.cache_indicies = []
+
+ def __iter__(self): return self
+
+ def next(self):
+ """Return next (source_rorp, dest_rorp) pair. StopIteration passed"""
+ source_rorp, dest_rorp = self.iter.next()
+ self.pre_process(source_rorp, dest_rorp)
+ index = source_rorp and source_rorp.index or dest_rorp.index
+ self.cache_dict[index] = [source_rorp, dest_rorp, 0, 0]
+ self.cache_indicies.append(index)
+
+ if len(self.cache_indicies) > self.cache_size: self.shorten_cache()
+ return source_rorp, dest_rorp
+
+ def pre_process(self, source_rorp, dest_rorp):
+ """Do initial processing on source_rorp and dest_rorp
+
+ It will not be clear whether source_rorp and dest_rorp have
+ errors at this point, so don't do anything which assumes they
+ will be backed up correctly.
+
+ """
+ if source_rorp: Hardlink.add_rorp(source_rorp, source = 1)
+ if dest_rorp: Hardlink.add_rorp(dest_rorp, source = 0)
+
+ def shorten_cache(self):
+ """Remove one element from cache, possibly adding it to metadata"""
+ first_index = self.cache_indicies[0]
+ del self.cache_indicies[0]
+ old_source_rorp, old_dest_rorp, changed_flag, success_flag = \
+ self.cache_dict[first_index]
+ del self.cache_dict[first_index]
+ self.post_process(old_source_rorp, old_dest_rorp,
+ changed_flag, success_flag)
+
+ def post_process(self, source_rorp, dest_rorp, changed, success):
+ """Post process source_rorp and dest_rorp.
+
+ changed will be true if the files have changed. success will
+ be true if the files have been successfully updated (this is
+ always false for un-changed files).
+
+ """
+ if not changed or success:
+ self.statfileobj.add_source_file(source_rorp)
+ self.statfileobj.add_dest_file(dest_rorp)
+ if success:
+ self.statfileobj.add_changed(source_rorp, dest_rorp)
+ metadata_rorp = source_rorp
+ else:
+ metadata_rorp = dest_rorp
+ if changed: self.statfileobj.add_error()
+ if metadata_rorp and metadata_rorp.lstat():
+ metadata.WriteMetadata(metadata_rorp)
+
+ def flag_success(self, index):
+ """Signal that the file with given index was updated successfully"""
+ self.cache_dict[index][3] = 1
+
+ def flag_changed(self, index):
+ """Signal that the file with given index has changed"""
+ self.cache_dict[index][2] = 1
+
+ def get_rorps(self, index):
+ """Retrieve (source_rorp, dest_rorp) from cache"""
+ return self.cache_dict[index][:2]
+
+ def get_source_rorp(self, index):
+ """Retrieve source_rorp with given index from cache"""
+ return self.cache_dict[index][0]
+
+ def close(self):
+ """Process the remaining elements in the cache"""
+ while self.cache_indicies: self.shorten_cache()
+ metadata.CloseMetadata()
+ statistics.write_active_statfileobj()
+
+
class PatchITRB(rorpiter.ITRBranch):
"""Patch an rpath with the given diff iters (use with IterTreeReducer)
@@ -195,7 +294,7 @@ class PatchITRB(rorpiter.ITRBranch):
contents.
"""
- def __init__(self, basis_root_rp):
+ def __init__(self, basis_root_rp, rorp_cache):
"""Set basis_root_rp, the base of the tree to be incremented"""
self.basis_root_rp = basis_root_rp
assert basis_root_rp.conn is Globals.local_connection
@@ -267,7 +366,8 @@ class PatchITRB(rorpiter.ITRBranch):
else:
assert self.dir_replacement
self.base_rp.rmdir()
- rpath.rename(self.dir_replacement, self.base_rp)
+ if self.dir_replacement.lstat():
+ rpath.rename(self.dir_replacement, self.base_rp)
class IncrementITRB(PatchITRB):
@@ -276,10 +376,10 @@ class IncrementITRB(PatchITRB):
Like PatchITRB, but this time also write increments.
"""
- def __init__(self, basis_root_rp, inc_root_rp):
+ def __init__(self, basis_root_rp, inc_root_rp, rorp_cache):
self.inc_root_rp = inc_root_rp
self.cached_incrp = None
- PatchITRB.__init__(self, basis_root_rp)
+ PatchITRB.__init__(self, basis_root_rp, rorp_cache)
def get_incrp(self, index):
"""Return inc RPath by adding index to self.basis_root_rp"""