From 85792f3b028aebac6e2681a0ce5ab60f9d91f1ed Mon Sep 17 00:00:00 2001 From: ben Date: Wed, 29 May 2002 07:09:49 +0000 Subject: Reexamined robust writing and statistics git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@110 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/rdiff_backup/header.py | 2 +- rdiff-backup/rdiff_backup/highlevel.py | 23 ++-- rdiff-backup/rdiff_backup/increment.py | 110 +++++++++++-------- rdiff-backup/rdiff_backup/log.py | 1 - rdiff-backup/rdiff_backup/robust.py | 186 +++++++++++++++++++------------- rdiff-backup/rdiff_backup/rorpiter.py | 8 +- rdiff-backup/rdiff_backup/statistics.py | 182 +++++++++++++++++++++++++++---- rdiff-backup/src/globals.py | 2 +- rdiff-backup/src/header.py | 2 +- rdiff-backup/src/highlevel.py | 23 ++-- rdiff-backup/src/increment.py | 110 +++++++++++-------- rdiff-backup/src/log.py | 1 - rdiff-backup/src/main.py | 3 +- rdiff-backup/src/rdiff.py | 34 +++--- rdiff-backup/src/robust.py | 186 +++++++++++++++++++------------- rdiff-backup/src/rorpiter.py | 8 +- rdiff-backup/src/statistics.py | 182 +++++++++++++++++++++++++++---- rdiff-backup/src/ttime.py | 4 +- 18 files changed, 736 insertions(+), 331 deletions(-) diff --git a/rdiff-backup/rdiff_backup/header.py b/rdiff-backup/rdiff_backup/header.py index 9ca40d2..00c801f 100644 --- a/rdiff-backup/rdiff_backup/header.py +++ b/rdiff-backup/rdiff_backup/header.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # # rdiff-backup -- Mirror files while keeping incremental changes -# Version 0.7.5.1 released May 25, 2002 +# Version 0.7.5.3 released May 25, 2002 # Copyright (C) 2001, 2002 Ben Escoto # # This program is licensed under the GNU General Public License (GPL). diff --git a/rdiff-backup/rdiff_backup/highlevel.py b/rdiff-backup/rdiff_backup/highlevel.py index a9e5ce2..bd34746 100644 --- a/rdiff-backup/rdiff_backup/highlevel.py +++ b/rdiff-backup/rdiff_backup/highlevel.py @@ -240,6 +240,7 @@ class HLDestinationStruct: """Apply diffs and finalize, with checkpointing and statistics""" collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2) finalizer, ITR = cls.get_finalizer(), cls.get_MirrorITR(inc_rpath) + Stats.open_dir_stats_file() dsrp = None def error_checked(): @@ -262,13 +263,15 @@ class HLDestinationStruct: cls.check_skip_error(finalizer.Finish, dsrp) except: cls.handle_last_error(dsrp, finalizer, ITR) if Globals.preserve_hardlinks: Hardlink.final_writedata() - cls.write_statistics(ITR) + Stats.close_dir_stats_file() + Stats.write_session_statistics(ITR) SaveState.checkpoint_remove() def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath): """Apply diffs, write increment if necessary, and finalize""" collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2) finalizer, ITR = cls.get_finalizer(), cls.get_ITR(inc_rpath) + Stats.open_dir_stats_file() dsrp = None def error_checked(): @@ -292,7 +295,8 @@ class HLDestinationStruct: cls.check_skip_error(finalizer.Finish, dsrp) except: cls.handle_last_error(dsrp, finalizer, ITR) if Globals.preserve_hardlinks: Hardlink.final_writedata() - cls.write_statistics(ITR) + Stats.close_dir_stats_file() + Stats.write_session_statistics(ITR) SaveState.checkpoint_remove() def check_skip_error(cls, thunk, dsrp): @@ -323,19 +327,4 @@ class HLDestinationStruct: SaveState.touch_last_file_definitive() raise - def write_statistics(cls, ITR): - """Write session statistics to file, log""" - stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"), - Time.curtime, "data") - ITR.StartTime = Time.curtime - ITR.EndTime = time.time() - if Globals.preserve_hardlinks and Hardlink.final_inc: - # include hardlink data in size of increments - ITR.IncrementFileSize += Hardlink.final_inc.getsize() - ITR.write_stats_to_rp(stat_inc) - if Globals.print_statistics: - message = ITR.get_stats_logstring("Session statistics") - Log.log_to_file(message) - Globals.client_conn.sys.stdout.write(message) - MakeClass(HLDestinationStruct) diff --git a/rdiff-backup/rdiff_backup/increment.py b/rdiff-backup/rdiff_backup/increment.py index 18b137f..2456b28 100644 --- a/rdiff-backup/rdiff_backup/increment.py +++ b/rdiff-backup/rdiff_backup/increment.py @@ -22,7 +22,11 @@ class Inc: mirror is the mirrored file from the last backup, incpref is the prefix of the increment file. - This function basically moves mirror -> incpref. + This function basically moves the information about the mirror + file to incpref. + + The returned RobustAction when executed should return the name + of the incfile, or None if none was created. """ if not (new and new.lstat() or mirror.lstat()): @@ -39,13 +43,15 @@ class Inc: else: return Inc.makesnapshot_action(mirror, incpref) def Increment(new, mirror, incpref): - Inc.Increment_action(new, mirror, incpref).execute() + return Inc.Increment_action(new, mirror, incpref).execute() def makemissing_action(incpref): """Signify that mirror file was missing""" - return RobustAction(lambda: None, - Inc.get_inc_ext(incpref, "missing").touch, - lambda exp: None) + def final(init_val): + incrp = Inc.get_inc_ext(incpref, "missing") + incrp.touch() + return incrp + return RobustAction(None, final, None) def makesnapshot_action(mirror, incpref): """Copy mirror to incfile, since new is quite different""" @@ -62,22 +68,29 @@ class Inc: if (Globals.compression and not Globals.no_compression_regexp.match(mirror.path)): diff = Inc.get_inc_ext(incpref, "diff.gz") - return Robust.chain([Rdiff.write_delta_action(new, mirror, - diff, 1), - Robust.copy_attribs_action(mirror, diff)]) + compress = 1 else: diff = Inc.get_inc_ext(incpref, "diff") - return Robust.chain([Rdiff.write_delta_action(new, mirror, - diff, None), - Robust.copy_attribs_action(mirror, diff)]) + compress = None + + diff_tf = TempFileManager.new(diff) + sig_tf = TempFileManager.new(mirror, None) + def init(): + Rdiff.write_delta(new, mirror, diff_tf, compress, sig_tf) + RPath.copy_attribs(mirror, diff_tf) + return diff + return Robust.make_tf_robustaction(init, (diff_tf, sig_tf), + (diff, None)) def makedir_action(mirrordir, incpref): """Make file indicating directory mirrordir has changed""" dirsign = Inc.get_inc_ext(incpref, "dir") - def final(): - dirsign.touch() - RPath.copy_attribs(mirrordir, dirsign) - return RobustAction(lambda: None, final, dirsign.delete) + tf = TempFileManager.new(dirsign) + def init(): + tf.touch() + RPath.copy_attribs(mirrordir, tf) + return dirsign + return Robust.make_tf_robustaction(init, tf, dirsign) def get_inc(rp, time, typestr): """Return increment like rp but with time and typestr suffixes""" @@ -127,8 +140,15 @@ class IncrementITR(StatsITR): Remember this object needs to be pickable. """ - mirror_isdirectory, directory_replacement = None, None + # Iff true, mirror file was a directory + mirror_isdirectory = None + # If set, what the directory on the mirror side will be replaced with + directory_replacement = None + # True iff there has been some change at this level or lower (used + # for marking directories to be flagged) changed = None + # Holds the RPath of the created increment file, if any + incrp = None def __init__(self, inc_rpath): """Set inc_rpath, an rpath of the base of the tree""" @@ -184,26 +204,34 @@ class IncrementITR(StatsITR): if not (incpref.lstat() and incpref.isdir()): incpref.mkdir() if diff_rorp and diff_rorp.isreg() and diff_rorp.file: tf = TempFileManager.new(dsrp) - RPathStatic.copy_with_attribs(diff_rorp, tf) - tf.set_attached_filetype(diff_rorp.get_attached_filetype()) + def init(): + RPathStatic.copy_with_attribs(diff_rorp, tf) + tf.set_attached_filetype(diff_rorp.get_attached_filetype()) + def error(exc, ran_init, init_val): tf.delete() + RobustAction(init, None, error).execute() self.directory_replacement = tf def init_non_dir(self, dsrp, diff_rorp, incpref): """Process a non directory file (initial pass)""" if not diff_rorp: return # no diff, so no change necessary if diff_rorp.isreg() and (dsrp.isreg() or diff_rorp.isflaglinked()): - tf = TempFileManager.new(dsrp) + # Write updated mirror to temp file so we can compute + # reverse diff locally + mirror_tf = TempFileManager.new(dsrp) def init_thunk(): if diff_rorp.isflaglinked(): - Hardlink.link_rp(diff_rorp, tf, dsrp) + Hardlink.link_rp(diff_rorp, mirror_tf, dsrp) else: Rdiff.patch_with_attribs_action(dsrp, diff_rorp, - tf).execute() - Inc.Increment_action(tf, dsrp, incpref).execute() - Robust.make_tf_robustaction(init_thunk, (tf,), (dsrp,)).execute() - else: - Robust.chain([Inc.Increment_action(diff_rorp, dsrp, incpref), - RORPIter.patchonce_action(None, dsrp, diff_rorp)] - ).execute() + mirror_tf).execute() + self.incrp = Inc.Increment_action(mirror_tf, dsrp, + incpref).execute() + def final(init_val): mirror_tf.rename(dsrp) + def error(exc, ran_init, init_val): mirror_tf.delete() + RobustAction(init_thunk, final, error).execute() + else: self.incrp = Robust.chain( + Inc.Increment_action(diff_rorp, dsrp, incpref), + RORPIter.patchonce_action(None, dsrp, diff_rorp)).execute()[0] + self.changed = 1 def end_process(self): @@ -217,19 +245,18 @@ class IncrementITR(StatsITR): if self.directory_replacement: tf = self.directory_replacement - Inc.Increment(tf, dsrp, incpref) - RORPIter.patchonce_action(None, dsrp, tf).execute() + self.incrp = Robust.chain( + Inc.Increment_action(tf, dsrp, incpref), + RORPIter.patchonce_action(None, dsrp, tf)).execute()[0] tf.delete() else: - Inc.Increment(diff_rorp, dsrp, incpref) + self.incrp = Inc.Increment(diff_rorp, dsrp, incpref) if diff_rorp: RORPIter.patchonce_action(None, dsrp, diff_rorp).execute() - self.end_stats(diff_rorp, dsrp, Inc._inc_file) - if self.incpref.isdir() and (self.mirror_isdirectory or dsrp.isdir()): - self.write_stats_to_rp(Inc.get_inc( - self.incpref.append("directory_statistics"), - Time.curtime, "data")) + self.end_stats(diff_rorp, dsrp, self.incrp) + if self.mirror_isdirectory or dsrp.isdir(): + Stats.write_dir_stats_line(self, dsrp.index) def branch_process(self, subinstance): """Update statistics, and the has_changed flag if change in branch""" @@ -239,6 +266,8 @@ class IncrementITR(StatsITR): class MirrorITR(StatsITR): """Like IncrementITR, but only patch mirror directory, don't increment""" + # This is always None since no increments will be created + incrp = None def __init__(self, inc_rpath): """Set inc_rpath, an rpath of the base of the inc tree""" self.inc_rpath = inc_rpath @@ -251,9 +280,6 @@ class MirrorITR(StatsITR): RORPIter.patchonce_action(None, mirror_dsrp, diff_rorp).execute() self.incpref = self.inc_rpath.new_index(index) - if mirror_dsrp.isdir() and not self.incpref.lstat(): - self.incpref.mkdir() # holds the statistics files - self.diff_rorp, self.mirror_dsrp = diff_rorp, mirror_dsrp def end_process(self): @@ -262,11 +288,9 @@ class MirrorITR(StatsITR): except AttributeError: # Some error above prevented these being set return - self.end_stats(self.diff_rorp, self.mirror_dsrp) - if self.incpref.isdir(): - self.write_stats_to_rp(Inc.get_inc( - self.incpref.append("directory_statistics"), - Time.curtime, "data")) + self.end_stats(diff_rorp, mirror_dsrp) + if mirror_dsrp.isdir(): + Stats.write_dir_stats_line(self, mirror_dsrp.index) def branch_process(self, subinstance): """Update statistics with subdirectory results""" diff --git a/rdiff-backup/rdiff_backup/log.py b/rdiff-backup/rdiff_backup/log.py index 60bf75b..f7e4a89 100644 --- a/rdiff-backup/rdiff_backup/log.py +++ b/rdiff-backup/rdiff_backup/log.py @@ -139,6 +139,5 @@ class Logger: logging_func("Exception %s raised of class %s" % (exc_info[1], exc_info[0]), verbosity) logging_func("".join(traceback.format_tb(exc_info[2])), verbosity+1) - Log = Logger() diff --git a/rdiff-backup/rdiff_backup/robust.py b/rdiff-backup/rdiff_backup/robust.py index 3795bd1..74e0d12 100644 --- a/rdiff-backup/rdiff_backup/robust.py +++ b/rdiff-backup/rdiff_backup/robust.py @@ -33,69 +33,98 @@ execfile("hardlink.py") class RobustAction: """Represents a file operation to be accomplished later""" - def __init__(self, init_thunk, final_thunk, error_thunk): + def __init__(self, init_thunk, final_func, error_handler): """RobustAction initializer All the thunks are functions whose return value will be ignored. init_thunk should not make any irreversible changes - but prepare for the writing of the important data. final_thunk + but prepare for the writing of the important data. final_func should be as short as possible and do the real work. - error_thunk is run if there is an error in init_thunk or - final_thunk. Errors in init_thunk should be corrected by - error_thunk as if nothing had been run in the first place. - The functions take no arguments except for error_thunk, which - receives the exception as its only argument. + error_handler is run if there is an error in init_thunk or + final_func. Errors in init_thunk should be corrected by + error_handler as if nothing had been run in the first place. + + init_thunk takes no arguments. + + final_thunk takes the return value of init_thunk as its + argument, and its return value is returned by execute(). + + error_handler takes three arguments: the exception, a value + which is true just in case self.init_thunk ran correctly, and + a value which will be the return value of init_thunk if it ran + correctly. """ - self.init_thunk = init_thunk - self.final_thunk = final_thunk - self.error_thunk = error_thunk + self.init_thunk = init_thunk or self.default_init_thunk + self.final_func = final_func or self.default_final_func + self.error_handler = error_handler or self.default_error_handler def execute(self): """Actually run the operation""" + ran_init_thunk = None try: - self.init_thunk() - self.final_thunk() + init_val = self.init_thunk() + ran_init_thunk = 1 + return self.final_func(init_val) except Exception, exc: # Catch all errors Log.exception() - self.error_thunk(exc) + if ran_init_thunk: self.error_handler(exc, 1, init_val) + else: self.error_handler(exc, None, None) raise exc + def default_init_thunk(self): return None + def default_final_func(self, init_val): return init_val + def default_error_handler(self, exc, ran_init, init_val): pass + class Robust: """Contains various file operations made safer using tempfiles""" - null_action = RobustAction(lambda: None, lambda: None, lambda e: None) - def chain(robust_action_list): + null_action = RobustAction(None, None, None) + def chain(*robust_action_list): """Return chain tying together a number of robust actions The whole chain will be aborted if some error occurs in initialization stage of any of the component actions. """ - ras_with_completed_inits = [] + ras_with_started_inits, init_return_vals = [], [] def init(): for ra in robust_action_list: - ras_with_completed_inits.append(ra) - ra.init_thunk() - def final(): - for ra in robust_action_list: ra.final_thunk() - def error(exc): - for ra in ras_with_completed_inits: ra.error_thunk(exc) + ras_with_started_inits.append(ra) + init_return_vals.append(ra.init_thunk()) + return init_return_vals + def final(init_return_vals): + final_vals = [] + for ra, init_val in zip(robust_action_list, init_return_vals): + final_vals.append(ra.final_func(init_val)) + return final_vals + def error(exc, ran_init, init_val): + for ra, init_val in zip(ras_with_started_inits, init_return_vals): + ra.error_handler(exc, 1, init_val) + for ra in ras_with_started_inits[len(init_return_vals):]: + ra.error_handler(exc, None, None) return RobustAction(init, final, error) - def chain_nested(robust_action_list): + def chain_nested(*robust_action_list): """Like chain but final actions performed in reverse order""" - ras_with_completed_inits = [] + ras_with_started_inits, init_vals = [], [] def init(): for ra in robust_action_list: - ras_with_completed_inits.append(ra) - ra.init_thunk() - def final(): - ralist_copy = robust_action_list[:] - ralist_copy.reverse() - for ra in ralist_copy: ra.final_thunk() - def error(exc): - for ra in ras_with_completed_inits: ra.error_thunk(exc) + ras_with_started_inits.append(ra) + init_vals.append(ra.init_thunk()) + return init_vals + def final(init_vals): + ras_and_inits = zip(robust_action_list, init_vals) + ras_and_inits.reverse() + final_vals = [] + for ra, init_val in ras_and_inits: + final_vals.append(ra.final_func(init_val)) + return final_vals + def error(exc, ran_init, init_val): + for ra, init_val in zip(ras_with_started_inits, init_return_vals): + ra.error_handler(exc, 1, init_val) + for ra in ras_with_started_inits[len(init_return_vals):]: + ra.error_handler(exc, None, None) return RobustAction(init, final, error) def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): @@ -107,18 +136,19 @@ class Robust: create RobustActions of that type. """ - assert type(tempfiles) is types.TupleType, tempfiles - if final_renames is None: final = lambda: None - else: - assert len(tempfiles) == len(final_renames) - def final(): # rename tempfiles to final positions - for i in range(len(tempfiles)): - final_name = final_renames[i] - if final_name: - if final_name.isdir(): # Cannot rename over directory - final_name.delete() - tempfiles[i].rename(final_name) - def error(exc): + if isinstance(tempfiles, TempFile): tempfiles = (tempfiles,) + if isinstance(final_renames, RPath): final_renames = (final_renames,) + if final_renames is None: final_renames = [None] * len(tempfiles) + assert len(tempfiles) == len(final_renames) + + def final(init_val): # rename tempfiles to final positions + for tempfile, destination in zip(tempfiles, final_renames): + if destination: + if destination.isdir(): # Cannot rename over directory + destination.delete() + tempfile.rename(destination) + return init_val + def error(exc, ran_init, init_val): for tf in tempfiles: tf.delete() return RobustAction(init_thunk, final, error) @@ -130,36 +160,46 @@ class Robust: overwritten). """ - tfl = [None] # Need mutable object that init and final can access + tfl = [None] # Need some mutable state to hold tf value def init(): if not (rorpin.isdir() and rpout.isdir()): # already a dir - tfl[0] = TempFileManager.new(rpout) - if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb")) + tfl[0] = tf = TempFileManager.new(rpout) + if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb")) else: RPath.copy(rorpin, tf) - def final(): - if tfl[0] and tfl[0].lstat(): + return tf + else: return None + def final(tf): + if tf and tf.lstat(): if rpout.isdir(): rpout.delete() - tfl[0].rename(rpout) - return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) + tf.rename(rpout) + return rpout + def error(exc, ran_init, init_val): + if tfl[0]: tfl[0].delete() + return RobustAction(init, final, error) def copy_with_attribs_action(rorpin, rpout, compress = None): """Like copy_action but also copy attributes""" - tfl = [None] # Need mutable object that init and final can access + tfl = [None] # Need some mutable state for error handler def init(): if not (rorpin.isdir() and rpout.isdir()): # already a dir - tfl[0] = TempFileManager.new(rpout) + tfl[0] = tf = TempFileManager.new(rpout) if rorpin.isreg(): - tfl[0].write_from_fileobj(rorpin.open("rb"), compress) - else: RPath.copy(rorpin, tfl[0]) - if tfl[0].lstat(): # Some files, like sockets, won't be created - RPathStatic.copy_attribs(rorpin, tfl[0]) - def final(): + tf.write_from_fileobj(rorpin.open("rb"), compress) + else: RPath.copy(rorpin, tf) + if tf.lstat(): # Some files, like sockets, won't be created + RPathStatic.copy_attribs(rorpin, tf) + return tf + else: return None + def final(tf): if rorpin.isdir() and rpout.isdir(): RPath.copy_attribs(rorpin, rpout) - elif tfl[0] and tfl[0].lstat(): - if rpout.isdir(): rpout.delete() - tfl[0].rename(rpout) - return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) + elif tf and tf.lstat(): + if rpout.isdir(): rpout.delete() # can't rename over dir + tf.rename(rpout) + return rpout + def error(exc, ran_init, init_val): + if tfl[0]: tfl[0].delete() + return RobustAction(init, final, error) def copy_attribs_action(rorpin, rpout): """Return action which just copies attributes @@ -168,14 +208,16 @@ class Robust: normal sequence. """ - def final(): RPath.copy_attribs(rorpin, rpout) - return RobustAction(lambda: None, final, lambda e: None) + def final(init_val): + RPath.copy_attribs(rorpin, rpout) + return rpout + return RobustAction(None, final, None) def symlink_action(rpath, linktext): """Return symlink action by moving one file over another""" tf = TempFileManager.new(rpath) def init(): tf.symlink(linktext) - return Robust.make_tf_robustaction(init, (tf,), (rpath,)) + return Robust.make_tf_robustaction(init, tf, rpath) def destructive_write_action(rp, s): """Return action writing string s to rpath rp in robust way @@ -187,9 +229,9 @@ class Robust: def init(): fp = tf.open("wb") fp.write(s) - assert not fp.close() + fp.close() tf.setdata() - return Robust.make_tf_robustaction(init, (tf,), (rp,)) + return Robust.make_tf_robustaction(init, tf, rp) def check_common_error(init_thunk, error_thunk = lambda exc: None): """Execute init_thunk, if error, run error_thunk on exception @@ -357,8 +399,8 @@ class SaveState: symtext = apply(os.path.join, ('increments',) + last_file_rorp.index) return Robust.symlink_action(cls._last_file_sym, symtext) - else: return RobustAction(lambda: None, cls.touch_last_file, - lambda exc: None) + else: return RobustAction(None, lambda init_val: cls.touch_last_file(), + None) def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None): """Save states of tree reducer and finalizer during inc backup @@ -372,9 +414,9 @@ class SaveState: cls._last_checkpoint_time = time.time() Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) state_string = cPickle.dumps((ITR, finalizer)) - Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp, - state_string), - cls.record_last_file_action(last_file_rorp)]).execute() + Robust.chain(Robust.destructive_write_action(cls._checkpoint_rp, + state_string), + cls.record_last_file_action(last_file_rorp)).execute() def checkpoint_needed(cls): """Returns true if another checkpoint is called for""" diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py index fb30426..1ff0724 100644 --- a/rdiff-backup/rdiff_backup/rorpiter.py +++ b/rdiff-backup/rdiff_backup/rorpiter.py @@ -212,13 +212,13 @@ class RORPIter: """Return action patching basisrp using diff_rorp""" assert diff_rorp, "Missing diff index %s" % basisrp.index if not diff_rorp.lstat(): - return RobustAction(lambda: None, basisrp.delete, lambda e: None) + return RobustAction(None, lambda init_val: basisrp.delete(), None) if Globals.preserve_hardlinks and diff_rorp.isflaglinked(): if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) - return RobustAction(lambda: None, - lambda: Hardlink.link_rp(diff_rorp, basisrp), - lambda e: None) + tf = TempFileManager.new(basisrp) + def init(): Hardlink.link_rp(diff_rorp, tf, basisrp) + return Robust.make_tf_robustaction(init, tf, basisrp) elif basisrp and basisrp.isreg() and diff_rorp.isreg(): assert diff_rorp.get_attached_filetype() == 'diff' return Rdiff.patch_with_attribs_action(basisrp, diff_rorp) diff --git a/rdiff-backup/rdiff_backup/statistics.py b/rdiff-backup/rdiff_backup/statistics.py index 8269456..c18f34a 100644 --- a/rdiff-backup/rdiff_backup/statistics.py +++ b/rdiff-backup/rdiff_backup/statistics.py @@ -16,13 +16,29 @@ class StatsObj: 'DeletedFiles', 'DeletedFileSize', 'ChangedFiles', 'ChangedSourceSize', 'ChangedMirrorSize', - 'IncrementFileSize') + 'IncrementFiles', 'IncrementFileSize') stat_time_attrs = ('StartTime', 'EndTime', 'ElapsedTime') - stat_attrs = stat_time_attrs + stat_file_attrs + stat_attrs = ('Filename',) + stat_time_attrs + stat_file_attrs + + # Below, the second value in each pair is true iff the value + # indicates a number of bytes + stat_file_pairs = (('SourceFiles', None), ('SourceFileSize', 1), + ('MirrorFiles', None), ('MirrorFileSize', 1), + ('NewFiles', None), ('NewFileSize', 1), + ('DeletedFiles', None), ('DeletedFileSize', 1), + ('ChangedFiles', None), + ('ChangedSourceSize', 1), ('ChangedMirrorSize', 1), + ('IncrementFiles', None), ('IncrementFileSize', 1)) # Set all stats to None, indicating info not available for attr in stat_attrs: locals()[attr] = None + # This is used in get_byte_summary_string below + byte_abbrev_list = ((1024*1024*1024*1024, "TB"), + (1024*1024*1024, "GB"), + (1024*1024, "MB"), + (1024, "KB")) + def get_stat(self, attribute): """Get a statistic""" try: return self.__dict__[attribute] @@ -34,33 +50,89 @@ class StatsObj: """Set attribute to given value""" self.__dict__[attr] = value + def get_stats_line(self, index): + """Return one line abbreviated version of full stats string""" + file_attrs = map(lambda attr: str(self.get_stat(attr)), + self.stat_file_attrs) + if not index: filename = "." + else: + # use repr to quote newlines in relative filename, then + # take of leading and trailing quote. + filename = repr(apply(os.path.join, index))[1:-1] + return " ".join([filename,] + file_attrs) + + def set_stats_from_line(self, line): + """Set statistics from given line""" + def error(): raise StatsException("Bad line '%s'" % line) + if line[-1] == "\n": line = line[:-1] + lineparts = line.split(" ") + if len(lineparts) < len(stat_file_attrs): error() + for attr, val_string in zip(stat_file_attrs, + lineparts[-len(stat_file_attrs):]): + try: val = long(val_string) + except ValueError: + try: val = float(val_string) + except ValueError: error() + self.set_stat(attr, val) + return self + def get_stats_string(self): - """Return string printing out statistics""" + """Return extended string printing out statistics""" + return self.get_timestats_string() + self.get_filestats_string() + + def get_timestats_string(self): + """Return portion of statistics string dealing with time""" timelist = [] if self.StartTime is not None: - timelist.append("StartTime %s (%s)\n" % + timelist.append("StartTime %.2f (%s)\n" % (self.StartTime, Time.timetopretty(self.StartTime))) if self.EndTime is not None: - timelist.append("EndTime %s (%s)\n" % + timelist.append("EndTime %.2f (%s)\n" % (self.EndTime, Time.timetopretty(self.EndTime))) - if self.StartTime is not None and self.EndTime is not None: + if self.ElapsedTime or (self.StartTime is not None and + self.EndTime is not None): if self.ElapsedTime is None: self.ElapsedTime = self.EndTime - self.StartTime - timelist.append("ElapsedTime %s (%s)\n" % + timelist.append("ElapsedTime %.2f (%s)\n" % (self.ElapsedTime, Time.inttopretty(self.ElapsedTime))) + return "".join(timelist) + + def get_filestats_string(self): + """Return portion of statistics string about files and bytes""" + def fileline(stat_file_pair): + """Return zero or one line of the string""" + attr, in_bytes = stat_file_pair + val = self.get_stat(attr) + if val is None: return "" + if in_bytes: + return "%s %s (%s)\n" % (attr, val, + self.get_byte_summary_string(val)) + else: return "%s %s\n" % (attr, val) + + return "".join(map(fileline, self.stat_file_pairs)) - filelist = ["%s %s\n" % (attr, self.get_stat(attr)) - for attr in self.stat_file_attrs - if self.get_stat(attr) is not None] - return "".join(timelist + filelist) + def get_byte_summary_string(self, byte_count): + """Turn byte count into human readable string like "7.23GB" """ + for abbrev_bytes, abbrev_string in self.byte_abbrev_list: + if byte_count >= abbrev_bytes: + # Now get 3 significant figures + abbrev_count = float(byte_count)/abbrev_bytes + if abbrev_count >= 100: precision = 0 + elif abbrev_count >= 10: precision = 1 + else: precision = 2 + return "%%.%df %s" % (precision, abbrev_string) \ + % (abbrev_count,) + byte_count = round(byte_count) + if byte_count == 1: return "1 byte" + else: return "%d bytes" % (byte_count,) def get_stats_logstring(self, title): """Like get_stats_string, but add header and footer""" - header = "-------------[ %s ]-------------" % title + header = "--------------[ %s ]--------------" % title footer = "-" * len(header) return "%s\n%s%s\n" % (header, self.get_stats_string(), footer) - def init_stats_from_string(self, s): + def set_stats_from_string(self, s): """Initialize attributes from string, return self for convenience""" def error(line): raise StatsException("Bad line '%s'" % line) @@ -91,7 +163,7 @@ class StatsObj: def read_stats_from_rp(self, rp): """Set statistics from rpath, return self for convenience""" fp = rp.open("r") - self.init_stats_from_string(fp.read()) + self.set_stats_from_string(fp.read()) fp.close() return self @@ -162,22 +234,96 @@ class StatsITR(IterTreeReducer, StatsObj): self.ChangedFiles += 1 self.ChangedSourceSize += mirror_dsrp.getsize() self.ChangedMirrorSize += self.mirror_base_size - self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 + if inc_rp: + self.IncrementFiles += 1 + self.IncrementFileSize += inc_rp.getsize() else: # new file was created self.NewFiles += 1 self.NewFileSize += mirror_dsrp.getsize() - self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 + if inc_rp: + self.IncrementFiles += 1 + self.IncrementFileSize += inc_rp.getsize() else: if self.mirror_base_exists: # file was deleted from mirror self.MirrorFiles += 1 self.MirrorFileSize += self.mirror_base_size self.DeletedFiles += 1 self.DeletedFileSize += self.mirror_base_size - self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 - + if inc_rp: + self.IncrementFiles += 1 + self.IncrementFileSize += inc_rp.getsize() def add_file_stats(self, subinstance): """Add all file statistics from subinstance to current totals""" for attr in self.stat_file_attrs: self.set_stat(attr, self.get_stat(attr) + subinstance.get_stat(attr)) + + +class Stats: + """Misc statistics methods, pertaining to dir and session stat files""" + # This is the RPath of the directory statistics file, and the + # associated open file. It will hold a line of statistics for + # each directory that is backed up. + _dir_stats_rp = None + _dir_stats_fp = None + + # This goes at the beginning of the directory statistics file and + # explains the format. + _dir_stats_header = """# rdiff-backup directory statistics file +# +# Each line is in the following format: +# RelativeDirName %s +""" % " ".join(StatsObj.stat_file_attrs) + + def open_dir_stats_file(cls): + """Open directory statistics file, write header""" + assert not cls._dir_stats_fp, "Directory file already open" + + if Globals.compression: suffix = "data.gz" + else: suffix = "data" + cls._dir_stats_rp = Inc.get_inc(Globals.rbdir.append( + "directory_statistics"), Time.curtime, suffix) + + if cls._dir_stats_rp.lstat(): + Log("Warning, statistics file %s already exists, appending", 2) + cls._dir_stats_fp = cls._dir_stats_rp.open("ab", + Globals.compression) + else: cls._dir_stats_fp = \ + cls._dir_stats_rp.open("wb", Globals.compression) + cls._dir_stats_fp.write(cls._dir_stats_header) + + def write_dir_stats_line(cls, statobj, index): + """Write info from statobj about rpath to statistics file""" + cls._dir_stats_fp.write(statobj.get_stats_line(index) +"\n") + + def close_dir_stats_file(cls): + """Close directory statistics file if its open""" + if cls._dir_stats_fp: + cls._dir_stats_fp.close() + cls._dir_stats_fp = None + + def write_session_statistics(cls, statobj): + """Write session statistics into file, log""" + stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"), + Time.curtime, "data") + statobj.StartTime = Time.curtime + statobj.EndTime = time.time() + + # include hardlink data and dir stats in size of increments + if Globals.preserve_hardlinks and Hardlink.final_inc: + # include hardlink data in size of increments + statobj.IncrementFiles += 1 + statobj.IncrementFileSize += Hardlink.final_inc.getsize() + if cls._dir_stats_rp and cls._dir_stats_rp.lstat(): + statobj.IncrementFiles += 1 + statobj.IncrementFileSize += cls._dir_stats_rp.getsize() + + statobj.write_stats_to_rp(stat_inc) + if Globals.print_statistics: + message = statobj.get_stats_logstring("Session statistics") + Log.log_to_file(message) + Globals.client_conn.sys.stdout.write(message) + +MakeClass(Stats) + diff --git a/rdiff-backup/src/globals.py b/rdiff-backup/src/globals.py index 2a042a8..d4c9471 100644 --- a/rdiff-backup/src/globals.py +++ b/rdiff-backup/src/globals.py @@ -8,7 +8,7 @@ import re, os class Globals: # The current version of rdiff-backup - version = "0.7.5.1" + version = "0.7.5.3" # If this is set, use this value in seconds as the current time # instead of reading it from the clock. diff --git a/rdiff-backup/src/header.py b/rdiff-backup/src/header.py index 9ca40d2..00c801f 100644 --- a/rdiff-backup/src/header.py +++ b/rdiff-backup/src/header.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # # rdiff-backup -- Mirror files while keeping incremental changes -# Version 0.7.5.1 released May 25, 2002 +# Version 0.7.5.3 released May 25, 2002 # Copyright (C) 2001, 2002 Ben Escoto # # This program is licensed under the GNU General Public License (GPL). diff --git a/rdiff-backup/src/highlevel.py b/rdiff-backup/src/highlevel.py index a9e5ce2..bd34746 100644 --- a/rdiff-backup/src/highlevel.py +++ b/rdiff-backup/src/highlevel.py @@ -240,6 +240,7 @@ class HLDestinationStruct: """Apply diffs and finalize, with checkpointing and statistics""" collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2) finalizer, ITR = cls.get_finalizer(), cls.get_MirrorITR(inc_rpath) + Stats.open_dir_stats_file() dsrp = None def error_checked(): @@ -262,13 +263,15 @@ class HLDestinationStruct: cls.check_skip_error(finalizer.Finish, dsrp) except: cls.handle_last_error(dsrp, finalizer, ITR) if Globals.preserve_hardlinks: Hardlink.final_writedata() - cls.write_statistics(ITR) + Stats.close_dir_stats_file() + Stats.write_session_statistics(ITR) SaveState.checkpoint_remove() def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath): """Apply diffs, write increment if necessary, and finalize""" collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2) finalizer, ITR = cls.get_finalizer(), cls.get_ITR(inc_rpath) + Stats.open_dir_stats_file() dsrp = None def error_checked(): @@ -292,7 +295,8 @@ class HLDestinationStruct: cls.check_skip_error(finalizer.Finish, dsrp) except: cls.handle_last_error(dsrp, finalizer, ITR) if Globals.preserve_hardlinks: Hardlink.final_writedata() - cls.write_statistics(ITR) + Stats.close_dir_stats_file() + Stats.write_session_statistics(ITR) SaveState.checkpoint_remove() def check_skip_error(cls, thunk, dsrp): @@ -323,19 +327,4 @@ class HLDestinationStruct: SaveState.touch_last_file_definitive() raise - def write_statistics(cls, ITR): - """Write session statistics to file, log""" - stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"), - Time.curtime, "data") - ITR.StartTime = Time.curtime - ITR.EndTime = time.time() - if Globals.preserve_hardlinks and Hardlink.final_inc: - # include hardlink data in size of increments - ITR.IncrementFileSize += Hardlink.final_inc.getsize() - ITR.write_stats_to_rp(stat_inc) - if Globals.print_statistics: - message = ITR.get_stats_logstring("Session statistics") - Log.log_to_file(message) - Globals.client_conn.sys.stdout.write(message) - MakeClass(HLDestinationStruct) diff --git a/rdiff-backup/src/increment.py b/rdiff-backup/src/increment.py index 18b137f..2456b28 100644 --- a/rdiff-backup/src/increment.py +++ b/rdiff-backup/src/increment.py @@ -22,7 +22,11 @@ class Inc: mirror is the mirrored file from the last backup, incpref is the prefix of the increment file. - This function basically moves mirror -> incpref. + This function basically moves the information about the mirror + file to incpref. + + The returned RobustAction when executed should return the name + of the incfile, or None if none was created. """ if not (new and new.lstat() or mirror.lstat()): @@ -39,13 +43,15 @@ class Inc: else: return Inc.makesnapshot_action(mirror, incpref) def Increment(new, mirror, incpref): - Inc.Increment_action(new, mirror, incpref).execute() + return Inc.Increment_action(new, mirror, incpref).execute() def makemissing_action(incpref): """Signify that mirror file was missing""" - return RobustAction(lambda: None, - Inc.get_inc_ext(incpref, "missing").touch, - lambda exp: None) + def final(init_val): + incrp = Inc.get_inc_ext(incpref, "missing") + incrp.touch() + return incrp + return RobustAction(None, final, None) def makesnapshot_action(mirror, incpref): """Copy mirror to incfile, since new is quite different""" @@ -62,22 +68,29 @@ class Inc: if (Globals.compression and not Globals.no_compression_regexp.match(mirror.path)): diff = Inc.get_inc_ext(incpref, "diff.gz") - return Robust.chain([Rdiff.write_delta_action(new, mirror, - diff, 1), - Robust.copy_attribs_action(mirror, diff)]) + compress = 1 else: diff = Inc.get_inc_ext(incpref, "diff") - return Robust.chain([Rdiff.write_delta_action(new, mirror, - diff, None), - Robust.copy_attribs_action(mirror, diff)]) + compress = None + + diff_tf = TempFileManager.new(diff) + sig_tf = TempFileManager.new(mirror, None) + def init(): + Rdiff.write_delta(new, mirror, diff_tf, compress, sig_tf) + RPath.copy_attribs(mirror, diff_tf) + return diff + return Robust.make_tf_robustaction(init, (diff_tf, sig_tf), + (diff, None)) def makedir_action(mirrordir, incpref): """Make file indicating directory mirrordir has changed""" dirsign = Inc.get_inc_ext(incpref, "dir") - def final(): - dirsign.touch() - RPath.copy_attribs(mirrordir, dirsign) - return RobustAction(lambda: None, final, dirsign.delete) + tf = TempFileManager.new(dirsign) + def init(): + tf.touch() + RPath.copy_attribs(mirrordir, tf) + return dirsign + return Robust.make_tf_robustaction(init, tf, dirsign) def get_inc(rp, time, typestr): """Return increment like rp but with time and typestr suffixes""" @@ -127,8 +140,15 @@ class IncrementITR(StatsITR): Remember this object needs to be pickable. """ - mirror_isdirectory, directory_replacement = None, None + # Iff true, mirror file was a directory + mirror_isdirectory = None + # If set, what the directory on the mirror side will be replaced with + directory_replacement = None + # True iff there has been some change at this level or lower (used + # for marking directories to be flagged) changed = None + # Holds the RPath of the created increment file, if any + incrp = None def __init__(self, inc_rpath): """Set inc_rpath, an rpath of the base of the tree""" @@ -184,26 +204,34 @@ class IncrementITR(StatsITR): if not (incpref.lstat() and incpref.isdir()): incpref.mkdir() if diff_rorp and diff_rorp.isreg() and diff_rorp.file: tf = TempFileManager.new(dsrp) - RPathStatic.copy_with_attribs(diff_rorp, tf) - tf.set_attached_filetype(diff_rorp.get_attached_filetype()) + def init(): + RPathStatic.copy_with_attribs(diff_rorp, tf) + tf.set_attached_filetype(diff_rorp.get_attached_filetype()) + def error(exc, ran_init, init_val): tf.delete() + RobustAction(init, None, error).execute() self.directory_replacement = tf def init_non_dir(self, dsrp, diff_rorp, incpref): """Process a non directory file (initial pass)""" if not diff_rorp: return # no diff, so no change necessary if diff_rorp.isreg() and (dsrp.isreg() or diff_rorp.isflaglinked()): - tf = TempFileManager.new(dsrp) + # Write updated mirror to temp file so we can compute + # reverse diff locally + mirror_tf = TempFileManager.new(dsrp) def init_thunk(): if diff_rorp.isflaglinked(): - Hardlink.link_rp(diff_rorp, tf, dsrp) + Hardlink.link_rp(diff_rorp, mirror_tf, dsrp) else: Rdiff.patch_with_attribs_action(dsrp, diff_rorp, - tf).execute() - Inc.Increment_action(tf, dsrp, incpref).execute() - Robust.make_tf_robustaction(init_thunk, (tf,), (dsrp,)).execute() - else: - Robust.chain([Inc.Increment_action(diff_rorp, dsrp, incpref), - RORPIter.patchonce_action(None, dsrp, diff_rorp)] - ).execute() + mirror_tf).execute() + self.incrp = Inc.Increment_action(mirror_tf, dsrp, + incpref).execute() + def final(init_val): mirror_tf.rename(dsrp) + def error(exc, ran_init, init_val): mirror_tf.delete() + RobustAction(init_thunk, final, error).execute() + else: self.incrp = Robust.chain( + Inc.Increment_action(diff_rorp, dsrp, incpref), + RORPIter.patchonce_action(None, dsrp, diff_rorp)).execute()[0] + self.changed = 1 def end_process(self): @@ -217,19 +245,18 @@ class IncrementITR(StatsITR): if self.directory_replacement: tf = self.directory_replacement - Inc.Increment(tf, dsrp, incpref) - RORPIter.patchonce_action(None, dsrp, tf).execute() + self.incrp = Robust.chain( + Inc.Increment_action(tf, dsrp, incpref), + RORPIter.patchonce_action(None, dsrp, tf)).execute()[0] tf.delete() else: - Inc.Increment(diff_rorp, dsrp, incpref) + self.incrp = Inc.Increment(diff_rorp, dsrp, incpref) if diff_rorp: RORPIter.patchonce_action(None, dsrp, diff_rorp).execute() - self.end_stats(diff_rorp, dsrp, Inc._inc_file) - if self.incpref.isdir() and (self.mirror_isdirectory or dsrp.isdir()): - self.write_stats_to_rp(Inc.get_inc( - self.incpref.append("directory_statistics"), - Time.curtime, "data")) + self.end_stats(diff_rorp, dsrp, self.incrp) + if self.mirror_isdirectory or dsrp.isdir(): + Stats.write_dir_stats_line(self, dsrp.index) def branch_process(self, subinstance): """Update statistics, and the has_changed flag if change in branch""" @@ -239,6 +266,8 @@ class IncrementITR(StatsITR): class MirrorITR(StatsITR): """Like IncrementITR, but only patch mirror directory, don't increment""" + # This is always None since no increments will be created + incrp = None def __init__(self, inc_rpath): """Set inc_rpath, an rpath of the base of the inc tree""" self.inc_rpath = inc_rpath @@ -251,9 +280,6 @@ class MirrorITR(StatsITR): RORPIter.patchonce_action(None, mirror_dsrp, diff_rorp).execute() self.incpref = self.inc_rpath.new_index(index) - if mirror_dsrp.isdir() and not self.incpref.lstat(): - self.incpref.mkdir() # holds the statistics files - self.diff_rorp, self.mirror_dsrp = diff_rorp, mirror_dsrp def end_process(self): @@ -262,11 +288,9 @@ class MirrorITR(StatsITR): except AttributeError: # Some error above prevented these being set return - self.end_stats(self.diff_rorp, self.mirror_dsrp) - if self.incpref.isdir(): - self.write_stats_to_rp(Inc.get_inc( - self.incpref.append("directory_statistics"), - Time.curtime, "data")) + self.end_stats(diff_rorp, mirror_dsrp) + if mirror_dsrp.isdir(): + Stats.write_dir_stats_line(self, mirror_dsrp.index) def branch_process(self, subinstance): """Update statistics with subdirectory results""" diff --git a/rdiff-backup/src/log.py b/rdiff-backup/src/log.py index 60bf75b..f7e4a89 100644 --- a/rdiff-backup/src/log.py +++ b/rdiff-backup/src/log.py @@ -139,6 +139,5 @@ class Logger: logging_func("Exception %s raised of class %s" % (exc_info[1], exc_info[0]), verbosity) logging_func("".join(traceback.format_tb(exc_info[2])), verbosity+1) - Log = Logger() diff --git a/rdiff-backup/src/main.py b/rdiff-backup/src/main.py index c222fdc..6b1db51 100755 --- a/rdiff-backup/src/main.py +++ b/rdiff-backup/src/main.py @@ -146,7 +146,8 @@ class Main: self.action == "remove-older-than"): self.commandline_error("Only use one argument, " "the root of the backup directory") - if l > 2: self.commandline_error("Too many arguments given") + if l > 2 and self.action != "calculate-average": + self.commandline_error("Too many arguments given") def commandline_error(self, message): sys.stderr.write("Error: %s\n" % message) diff --git a/rdiff-backup/src/rdiff.py b/rdiff-backup/src/rdiff.py index 3b5ba95..e4552ce 100644 --- a/rdiff-backup/src/rdiff.py +++ b/rdiff-backup/src/rdiff.py @@ -24,11 +24,11 @@ class Rdiff: """Like get_delta but signature is in a file object""" sig_tf = TempFileManager.new(rp_new, None) sig_tf.write_from_fileobj(sig_fileobj) - rdiff_popen_obj = Rdiff.get_delta(sig_tf, rp_new) + rdiff_popen_obj = Rdiff.get_delta_sigrp(sig_tf, rp_new) rdiff_popen_obj.set_thunk(sig_tf.delete) return rdiff_popen_obj - def get_delta(rp_signature, rp_new): + def get_delta_sigrp(rp_signature, rp_new): """Take signature rp and new rp, return delta file object""" assert rp_signature.conn is rp_new.conn Log("Getting delta of %s with signature %s" % @@ -45,18 +45,18 @@ class Rdiff: """ sig_tf = TempFileManager.new(new, None) delta_tf = TempFileManager.new(delta) - def init(): - Log("Writing delta %s from %s -> %s" % - (basis.path, new.path, delta.path), 7) - sig_tf.write_from_fileobj(Rdiff.get_signature(basis)) - delta_tf.write_from_fileobj(Rdiff.get_delta(sig_tf, new), compress) - sig_tf.delete() + def init(): Rdiff.write_delta(basis, new, delta_tf, compress, sig_tf) return Robust.make_tf_robustaction(init, (sig_tf, delta_tf), (None, delta)) - def write_delta(basis, new, delta, compress = None): + def write_delta(basis, new, delta, compress = None, sig_tf = None): """Write rdiff delta which brings basis to new""" - Rdiff.write_delta_action(basis, new, delta, compress).execute() + Log("Writing delta %s from %s -> %s" % + (basis.path, new.path, delta.path), 7) + if not sig_tf: sig_tf = TempFileManager.new(new, None) + sig_tf.write_from_fileobj(Rdiff.get_signature(basis)) + delta.write_from_fileobj(Rdiff.get_delta_sigrp(sig_tf, new), compress) + sig_tf.delete() def patch_action(rp_basis, rp_delta, rp_out = None, out_tf = None, delta_compressed = None): @@ -106,18 +106,20 @@ class Rdiff: if not rp_out: rp_out = rp_basis delta_tf = TempFileManager.new(rp_out, None) def init(): delta_tf.write_from_fileobj(delta_fileobj) - return Robust.chain_nested([RobustAction(init, delta_tf.delete, - lambda exc: delta_tf.delete), - Rdiff.patch_action(rp_basis, delta_tf, - rp_out, out_tf)]) + def final(init_val): delta_tf.delete() + def error(exc, ran_init, init_val): delta_tf.delete() + write_delta_action = RobustAction(init, final, error) + return Robust.chain(write_delta_action, + Rdiff.patch_action(rp_basis, delta_tf, + rp_out, out_tf)) def patch_with_attribs_action(rp_basis, rp_delta, rp_out = None): """Like patch_action, but also transfers attributs from rp_delta""" if not rp_out: rp_out = rp_basis tf = TempFileManager.new(rp_out) return Robust.chain_nested( - [Rdiff.patch_action(rp_basis, rp_delta, rp_out, tf), - Robust.copy_attribs_action(rp_delta, tf)]) + Rdiff.patch_action(rp_basis, rp_delta, rp_out, tf), + Robust.copy_attribs_action(rp_delta, tf)) def copy_action(rpin, rpout): """Use rdiff to copy rpin to rpout, conserving bandwidth""" diff --git a/rdiff-backup/src/robust.py b/rdiff-backup/src/robust.py index 3795bd1..74e0d12 100644 --- a/rdiff-backup/src/robust.py +++ b/rdiff-backup/src/robust.py @@ -33,69 +33,98 @@ execfile("hardlink.py") class RobustAction: """Represents a file operation to be accomplished later""" - def __init__(self, init_thunk, final_thunk, error_thunk): + def __init__(self, init_thunk, final_func, error_handler): """RobustAction initializer All the thunks are functions whose return value will be ignored. init_thunk should not make any irreversible changes - but prepare for the writing of the important data. final_thunk + but prepare for the writing of the important data. final_func should be as short as possible and do the real work. - error_thunk is run if there is an error in init_thunk or - final_thunk. Errors in init_thunk should be corrected by - error_thunk as if nothing had been run in the first place. - The functions take no arguments except for error_thunk, which - receives the exception as its only argument. + error_handler is run if there is an error in init_thunk or + final_func. Errors in init_thunk should be corrected by + error_handler as if nothing had been run in the first place. + + init_thunk takes no arguments. + + final_thunk takes the return value of init_thunk as its + argument, and its return value is returned by execute(). + + error_handler takes three arguments: the exception, a value + which is true just in case self.init_thunk ran correctly, and + a value which will be the return value of init_thunk if it ran + correctly. """ - self.init_thunk = init_thunk - self.final_thunk = final_thunk - self.error_thunk = error_thunk + self.init_thunk = init_thunk or self.default_init_thunk + self.final_func = final_func or self.default_final_func + self.error_handler = error_handler or self.default_error_handler def execute(self): """Actually run the operation""" + ran_init_thunk = None try: - self.init_thunk() - self.final_thunk() + init_val = self.init_thunk() + ran_init_thunk = 1 + return self.final_func(init_val) except Exception, exc: # Catch all errors Log.exception() - self.error_thunk(exc) + if ran_init_thunk: self.error_handler(exc, 1, init_val) + else: self.error_handler(exc, None, None) raise exc + def default_init_thunk(self): return None + def default_final_func(self, init_val): return init_val + def default_error_handler(self, exc, ran_init, init_val): pass + class Robust: """Contains various file operations made safer using tempfiles""" - null_action = RobustAction(lambda: None, lambda: None, lambda e: None) - def chain(robust_action_list): + null_action = RobustAction(None, None, None) + def chain(*robust_action_list): """Return chain tying together a number of robust actions The whole chain will be aborted if some error occurs in initialization stage of any of the component actions. """ - ras_with_completed_inits = [] + ras_with_started_inits, init_return_vals = [], [] def init(): for ra in robust_action_list: - ras_with_completed_inits.append(ra) - ra.init_thunk() - def final(): - for ra in robust_action_list: ra.final_thunk() - def error(exc): - for ra in ras_with_completed_inits: ra.error_thunk(exc) + ras_with_started_inits.append(ra) + init_return_vals.append(ra.init_thunk()) + return init_return_vals + def final(init_return_vals): + final_vals = [] + for ra, init_val in zip(robust_action_list, init_return_vals): + final_vals.append(ra.final_func(init_val)) + return final_vals + def error(exc, ran_init, init_val): + for ra, init_val in zip(ras_with_started_inits, init_return_vals): + ra.error_handler(exc, 1, init_val) + for ra in ras_with_started_inits[len(init_return_vals):]: + ra.error_handler(exc, None, None) return RobustAction(init, final, error) - def chain_nested(robust_action_list): + def chain_nested(*robust_action_list): """Like chain but final actions performed in reverse order""" - ras_with_completed_inits = [] + ras_with_started_inits, init_vals = [], [] def init(): for ra in robust_action_list: - ras_with_completed_inits.append(ra) - ra.init_thunk() - def final(): - ralist_copy = robust_action_list[:] - ralist_copy.reverse() - for ra in ralist_copy: ra.final_thunk() - def error(exc): - for ra in ras_with_completed_inits: ra.error_thunk(exc) + ras_with_started_inits.append(ra) + init_vals.append(ra.init_thunk()) + return init_vals + def final(init_vals): + ras_and_inits = zip(robust_action_list, init_vals) + ras_and_inits.reverse() + final_vals = [] + for ra, init_val in ras_and_inits: + final_vals.append(ra.final_func(init_val)) + return final_vals + def error(exc, ran_init, init_val): + for ra, init_val in zip(ras_with_started_inits, init_return_vals): + ra.error_handler(exc, 1, init_val) + for ra in ras_with_started_inits[len(init_return_vals):]: + ra.error_handler(exc, None, None) return RobustAction(init, final, error) def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): @@ -107,18 +136,19 @@ class Robust: create RobustActions of that type. """ - assert type(tempfiles) is types.TupleType, tempfiles - if final_renames is None: final = lambda: None - else: - assert len(tempfiles) == len(final_renames) - def final(): # rename tempfiles to final positions - for i in range(len(tempfiles)): - final_name = final_renames[i] - if final_name: - if final_name.isdir(): # Cannot rename over directory - final_name.delete() - tempfiles[i].rename(final_name) - def error(exc): + if isinstance(tempfiles, TempFile): tempfiles = (tempfiles,) + if isinstance(final_renames, RPath): final_renames = (final_renames,) + if final_renames is None: final_renames = [None] * len(tempfiles) + assert len(tempfiles) == len(final_renames) + + def final(init_val): # rename tempfiles to final positions + for tempfile, destination in zip(tempfiles, final_renames): + if destination: + if destination.isdir(): # Cannot rename over directory + destination.delete() + tempfile.rename(destination) + return init_val + def error(exc, ran_init, init_val): for tf in tempfiles: tf.delete() return RobustAction(init_thunk, final, error) @@ -130,36 +160,46 @@ class Robust: overwritten). """ - tfl = [None] # Need mutable object that init and final can access + tfl = [None] # Need some mutable state to hold tf value def init(): if not (rorpin.isdir() and rpout.isdir()): # already a dir - tfl[0] = TempFileManager.new(rpout) - if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb")) + tfl[0] = tf = TempFileManager.new(rpout) + if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb")) else: RPath.copy(rorpin, tf) - def final(): - if tfl[0] and tfl[0].lstat(): + return tf + else: return None + def final(tf): + if tf and tf.lstat(): if rpout.isdir(): rpout.delete() - tfl[0].rename(rpout) - return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) + tf.rename(rpout) + return rpout + def error(exc, ran_init, init_val): + if tfl[0]: tfl[0].delete() + return RobustAction(init, final, error) def copy_with_attribs_action(rorpin, rpout, compress = None): """Like copy_action but also copy attributes""" - tfl = [None] # Need mutable object that init and final can access + tfl = [None] # Need some mutable state for error handler def init(): if not (rorpin.isdir() and rpout.isdir()): # already a dir - tfl[0] = TempFileManager.new(rpout) + tfl[0] = tf = TempFileManager.new(rpout) if rorpin.isreg(): - tfl[0].write_from_fileobj(rorpin.open("rb"), compress) - else: RPath.copy(rorpin, tfl[0]) - if tfl[0].lstat(): # Some files, like sockets, won't be created - RPathStatic.copy_attribs(rorpin, tfl[0]) - def final(): + tf.write_from_fileobj(rorpin.open("rb"), compress) + else: RPath.copy(rorpin, tf) + if tf.lstat(): # Some files, like sockets, won't be created + RPathStatic.copy_attribs(rorpin, tf) + return tf + else: return None + def final(tf): if rorpin.isdir() and rpout.isdir(): RPath.copy_attribs(rorpin, rpout) - elif tfl[0] and tfl[0].lstat(): - if rpout.isdir(): rpout.delete() - tfl[0].rename(rpout) - return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) + elif tf and tf.lstat(): + if rpout.isdir(): rpout.delete() # can't rename over dir + tf.rename(rpout) + return rpout + def error(exc, ran_init, init_val): + if tfl[0]: tfl[0].delete() + return RobustAction(init, final, error) def copy_attribs_action(rorpin, rpout): """Return action which just copies attributes @@ -168,14 +208,16 @@ class Robust: normal sequence. """ - def final(): RPath.copy_attribs(rorpin, rpout) - return RobustAction(lambda: None, final, lambda e: None) + def final(init_val): + RPath.copy_attribs(rorpin, rpout) + return rpout + return RobustAction(None, final, None) def symlink_action(rpath, linktext): """Return symlink action by moving one file over another""" tf = TempFileManager.new(rpath) def init(): tf.symlink(linktext) - return Robust.make_tf_robustaction(init, (tf,), (rpath,)) + return Robust.make_tf_robustaction(init, tf, rpath) def destructive_write_action(rp, s): """Return action writing string s to rpath rp in robust way @@ -187,9 +229,9 @@ class Robust: def init(): fp = tf.open("wb") fp.write(s) - assert not fp.close() + fp.close() tf.setdata() - return Robust.make_tf_robustaction(init, (tf,), (rp,)) + return Robust.make_tf_robustaction(init, tf, rp) def check_common_error(init_thunk, error_thunk = lambda exc: None): """Execute init_thunk, if error, run error_thunk on exception @@ -357,8 +399,8 @@ class SaveState: symtext = apply(os.path.join, ('increments',) + last_file_rorp.index) return Robust.symlink_action(cls._last_file_sym, symtext) - else: return RobustAction(lambda: None, cls.touch_last_file, - lambda exc: None) + else: return RobustAction(None, lambda init_val: cls.touch_last_file(), + None) def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None): """Save states of tree reducer and finalizer during inc backup @@ -372,9 +414,9 @@ class SaveState: cls._last_checkpoint_time = time.time() Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) state_string = cPickle.dumps((ITR, finalizer)) - Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp, - state_string), - cls.record_last_file_action(last_file_rorp)]).execute() + Robust.chain(Robust.destructive_write_action(cls._checkpoint_rp, + state_string), + cls.record_last_file_action(last_file_rorp)).execute() def checkpoint_needed(cls): """Returns true if another checkpoint is called for""" diff --git a/rdiff-backup/src/rorpiter.py b/rdiff-backup/src/rorpiter.py index fb30426..1ff0724 100644 --- a/rdiff-backup/src/rorpiter.py +++ b/rdiff-backup/src/rorpiter.py @@ -212,13 +212,13 @@ class RORPIter: """Return action patching basisrp using diff_rorp""" assert diff_rorp, "Missing diff index %s" % basisrp.index if not diff_rorp.lstat(): - return RobustAction(lambda: None, basisrp.delete, lambda e: None) + return RobustAction(None, lambda init_val: basisrp.delete(), None) if Globals.preserve_hardlinks and diff_rorp.isflaglinked(): if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) - return RobustAction(lambda: None, - lambda: Hardlink.link_rp(diff_rorp, basisrp), - lambda e: None) + tf = TempFileManager.new(basisrp) + def init(): Hardlink.link_rp(diff_rorp, tf, basisrp) + return Robust.make_tf_robustaction(init, tf, basisrp) elif basisrp and basisrp.isreg() and diff_rorp.isreg(): assert diff_rorp.get_attached_filetype() == 'diff' return Rdiff.patch_with_attribs_action(basisrp, diff_rorp) diff --git a/rdiff-backup/src/statistics.py b/rdiff-backup/src/statistics.py index 8269456..c18f34a 100644 --- a/rdiff-backup/src/statistics.py +++ b/rdiff-backup/src/statistics.py @@ -16,13 +16,29 @@ class StatsObj: 'DeletedFiles', 'DeletedFileSize', 'ChangedFiles', 'ChangedSourceSize', 'ChangedMirrorSize', - 'IncrementFileSize') + 'IncrementFiles', 'IncrementFileSize') stat_time_attrs = ('StartTime', 'EndTime', 'ElapsedTime') - stat_attrs = stat_time_attrs + stat_file_attrs + stat_attrs = ('Filename',) + stat_time_attrs + stat_file_attrs + + # Below, the second value in each pair is true iff the value + # indicates a number of bytes + stat_file_pairs = (('SourceFiles', None), ('SourceFileSize', 1), + ('MirrorFiles', None), ('MirrorFileSize', 1), + ('NewFiles', None), ('NewFileSize', 1), + ('DeletedFiles', None), ('DeletedFileSize', 1), + ('ChangedFiles', None), + ('ChangedSourceSize', 1), ('ChangedMirrorSize', 1), + ('IncrementFiles', None), ('IncrementFileSize', 1)) # Set all stats to None, indicating info not available for attr in stat_attrs: locals()[attr] = None + # This is used in get_byte_summary_string below + byte_abbrev_list = ((1024*1024*1024*1024, "TB"), + (1024*1024*1024, "GB"), + (1024*1024, "MB"), + (1024, "KB")) + def get_stat(self, attribute): """Get a statistic""" try: return self.__dict__[attribute] @@ -34,33 +50,89 @@ class StatsObj: """Set attribute to given value""" self.__dict__[attr] = value + def get_stats_line(self, index): + """Return one line abbreviated version of full stats string""" + file_attrs = map(lambda attr: str(self.get_stat(attr)), + self.stat_file_attrs) + if not index: filename = "." + else: + # use repr to quote newlines in relative filename, then + # take of leading and trailing quote. + filename = repr(apply(os.path.join, index))[1:-1] + return " ".join([filename,] + file_attrs) + + def set_stats_from_line(self, line): + """Set statistics from given line""" + def error(): raise StatsException("Bad line '%s'" % line) + if line[-1] == "\n": line = line[:-1] + lineparts = line.split(" ") + if len(lineparts) < len(stat_file_attrs): error() + for attr, val_string in zip(stat_file_attrs, + lineparts[-len(stat_file_attrs):]): + try: val = long(val_string) + except ValueError: + try: val = float(val_string) + except ValueError: error() + self.set_stat(attr, val) + return self + def get_stats_string(self): - """Return string printing out statistics""" + """Return extended string printing out statistics""" + return self.get_timestats_string() + self.get_filestats_string() + + def get_timestats_string(self): + """Return portion of statistics string dealing with time""" timelist = [] if self.StartTime is not None: - timelist.append("StartTime %s (%s)\n" % + timelist.append("StartTime %.2f (%s)\n" % (self.StartTime, Time.timetopretty(self.StartTime))) if self.EndTime is not None: - timelist.append("EndTime %s (%s)\n" % + timelist.append("EndTime %.2f (%s)\n" % (self.EndTime, Time.timetopretty(self.EndTime))) - if self.StartTime is not None and self.EndTime is not None: + if self.ElapsedTime or (self.StartTime is not None and + self.EndTime is not None): if self.ElapsedTime is None: self.ElapsedTime = self.EndTime - self.StartTime - timelist.append("ElapsedTime %s (%s)\n" % + timelist.append("ElapsedTime %.2f (%s)\n" % (self.ElapsedTime, Time.inttopretty(self.ElapsedTime))) + return "".join(timelist) + + def get_filestats_string(self): + """Return portion of statistics string about files and bytes""" + def fileline(stat_file_pair): + """Return zero or one line of the string""" + attr, in_bytes = stat_file_pair + val = self.get_stat(attr) + if val is None: return "" + if in_bytes: + return "%s %s (%s)\n" % (attr, val, + self.get_byte_summary_string(val)) + else: return "%s %s\n" % (attr, val) + + return "".join(map(fileline, self.stat_file_pairs)) - filelist = ["%s %s\n" % (attr, self.get_stat(attr)) - for attr in self.stat_file_attrs - if self.get_stat(attr) is not None] - return "".join(timelist + filelist) + def get_byte_summary_string(self, byte_count): + """Turn byte count into human readable string like "7.23GB" """ + for abbrev_bytes, abbrev_string in self.byte_abbrev_list: + if byte_count >= abbrev_bytes: + # Now get 3 significant figures + abbrev_count = float(byte_count)/abbrev_bytes + if abbrev_count >= 100: precision = 0 + elif abbrev_count >= 10: precision = 1 + else: precision = 2 + return "%%.%df %s" % (precision, abbrev_string) \ + % (abbrev_count,) + byte_count = round(byte_count) + if byte_count == 1: return "1 byte" + else: return "%d bytes" % (byte_count,) def get_stats_logstring(self, title): """Like get_stats_string, but add header and footer""" - header = "-------------[ %s ]-------------" % title + header = "--------------[ %s ]--------------" % title footer = "-" * len(header) return "%s\n%s%s\n" % (header, self.get_stats_string(), footer) - def init_stats_from_string(self, s): + def set_stats_from_string(self, s): """Initialize attributes from string, return self for convenience""" def error(line): raise StatsException("Bad line '%s'" % line) @@ -91,7 +163,7 @@ class StatsObj: def read_stats_from_rp(self, rp): """Set statistics from rpath, return self for convenience""" fp = rp.open("r") - self.init_stats_from_string(fp.read()) + self.set_stats_from_string(fp.read()) fp.close() return self @@ -162,22 +234,96 @@ class StatsITR(IterTreeReducer, StatsObj): self.ChangedFiles += 1 self.ChangedSourceSize += mirror_dsrp.getsize() self.ChangedMirrorSize += self.mirror_base_size - self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 + if inc_rp: + self.IncrementFiles += 1 + self.IncrementFileSize += inc_rp.getsize() else: # new file was created self.NewFiles += 1 self.NewFileSize += mirror_dsrp.getsize() - self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 + if inc_rp: + self.IncrementFiles += 1 + self.IncrementFileSize += inc_rp.getsize() else: if self.mirror_base_exists: # file was deleted from mirror self.MirrorFiles += 1 self.MirrorFileSize += self.mirror_base_size self.DeletedFiles += 1 self.DeletedFileSize += self.mirror_base_size - self.IncrementFileSize += inc_rp and inc_rp.getsize() or 0 - + if inc_rp: + self.IncrementFiles += 1 + self.IncrementFileSize += inc_rp.getsize() def add_file_stats(self, subinstance): """Add all file statistics from subinstance to current totals""" for attr in self.stat_file_attrs: self.set_stat(attr, self.get_stat(attr) + subinstance.get_stat(attr)) + + +class Stats: + """Misc statistics methods, pertaining to dir and session stat files""" + # This is the RPath of the directory statistics file, and the + # associated open file. It will hold a line of statistics for + # each directory that is backed up. + _dir_stats_rp = None + _dir_stats_fp = None + + # This goes at the beginning of the directory statistics file and + # explains the format. + _dir_stats_header = """# rdiff-backup directory statistics file +# +# Each line is in the following format: +# RelativeDirName %s +""" % " ".join(StatsObj.stat_file_attrs) + + def open_dir_stats_file(cls): + """Open directory statistics file, write header""" + assert not cls._dir_stats_fp, "Directory file already open" + + if Globals.compression: suffix = "data.gz" + else: suffix = "data" + cls._dir_stats_rp = Inc.get_inc(Globals.rbdir.append( + "directory_statistics"), Time.curtime, suffix) + + if cls._dir_stats_rp.lstat(): + Log("Warning, statistics file %s already exists, appending", 2) + cls._dir_stats_fp = cls._dir_stats_rp.open("ab", + Globals.compression) + else: cls._dir_stats_fp = \ + cls._dir_stats_rp.open("wb", Globals.compression) + cls._dir_stats_fp.write(cls._dir_stats_header) + + def write_dir_stats_line(cls, statobj, index): + """Write info from statobj about rpath to statistics file""" + cls._dir_stats_fp.write(statobj.get_stats_line(index) +"\n") + + def close_dir_stats_file(cls): + """Close directory statistics file if its open""" + if cls._dir_stats_fp: + cls._dir_stats_fp.close() + cls._dir_stats_fp = None + + def write_session_statistics(cls, statobj): + """Write session statistics into file, log""" + stat_inc = Inc.get_inc(Globals.rbdir.append("session_statistics"), + Time.curtime, "data") + statobj.StartTime = Time.curtime + statobj.EndTime = time.time() + + # include hardlink data and dir stats in size of increments + if Globals.preserve_hardlinks and Hardlink.final_inc: + # include hardlink data in size of increments + statobj.IncrementFiles += 1 + statobj.IncrementFileSize += Hardlink.final_inc.getsize() + if cls._dir_stats_rp and cls._dir_stats_rp.lstat(): + statobj.IncrementFiles += 1 + statobj.IncrementFileSize += cls._dir_stats_rp.getsize() + + statobj.write_stats_to_rp(stat_inc) + if Globals.print_statistics: + message = statobj.get_stats_logstring("Session statistics") + Log.log_to_file(message) + Globals.client_conn.sys.stdout.write(message) + +MakeClass(Stats) + diff --git a/rdiff-backup/src/ttime.py b/rdiff-backup/src/ttime.py index bfa3c6f..852f6ea 100644 --- a/rdiff-backup/src/ttime.py +++ b/rdiff-backup/src/ttime.py @@ -95,7 +95,9 @@ class Time: if seconds == 1: partlist.append("1 second") elif not partlist or seconds > 1: - partlist.append("%s seconds" % seconds) + if isinstance(seconds, int) or isinstance(seconds, long): + partlist.append("%s seconds" % seconds) + else: partlist.append("%.2f seconds" % seconds) return " ".join(partlist) def intstringtoseconds(cls, interval_string): -- cgit v1.2.1