Diffstat (limited to 'rdiff-backup/rdiff_backup/robust.py')
-rw-r--r-- | rdiff-backup/rdiff_backup/robust.py | 523
1 file changed, 10 insertions, 513 deletions
diff --git a/rdiff-backup/rdiff_backup/robust.py b/rdiff-backup/rdiff_backup/robust.py
index 67f32be..e43ceea 100644
--- a/rdiff-backup/rdiff_backup/robust.py
+++ b/rdiff-backup/rdiff_backup/robust.py
@@ -17,241 +17,9 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 # USA
 
-"""Prevent mirror from being corrupted; handle errors
+"""Catch various exceptions given system call"""
 
-Ideally no matter an instance of rdiff-backup gets aborted, no
-information should get lost. The target directory should be left in a
-coherent state, and later instances of rdiff-backup should clean
-things up so there is no sign that anything ever got aborted or
-failed.
-
-Thus, files should be updated in an atomic way as possible. Each file
-should be updated (and the corresponding diff files written) or not,
-and it should be clear which happened. In general, I don't think this
-is possible, since the creation of the diff files and the changing of
-updated files cannot be guarateed to happen together. It is possible,
-I think, to record various information to files which would allow a
-later process to figure out what the last operation was, but this
-would add several file operations to the processing of each file, and
-I don't think, would be a good tradeoff.
-
-The compromise reached here is that diff files should be created just
-before the mirror files are updated, and each file update should be
-done with a rename operation on a file in the same directory.
-Furthermore, every once in a while, rdiff-backup will record which
-file it just finished processing. If any fatal errors are caught, it
-will also record the last processed file. Future instances may not
-know exactly when the previous instance was aborted, but they will be
-able to narrow down the possibilities.
-
-"""
-
-import os, time
-from log import Log
-import Time, librsync, errno, signal, cPickle, C, \
-	   Hardlink, TempFile, static, rpath, Globals
-
-
-class Action:
-	"""Represents a file operation to be accomplished later"""
-	def __init__(self, init_thunk, final_func, error_handler):
-		"""Action initializer
-
-		All the thunks are functions whose return value will be
-		ignored. init_thunk should not make any irreversible changes
-		but prepare for the writing of the important data. final_func
-		should be as short as possible and do the real work.
-		error_handler is run if there is an error in init_thunk or
-		final_func. Errors in init_thunk should be corrected by
-		error_handler as if nothing had been run in the first place.
-
-		init_thunk takes no arguments.
-
-		final_thunk takes the return value of init_thunk as its
-		argument, and its return value is returned by execute().
-
-		error_handler takes three arguments: the exception, a value
-		which is true just in case self.init_thunk ran correctly, and
-		a value which will be the return value of init_thunk if it ran
-		correctly.
-
-		"""
-		self.init_thunk = init_thunk or self.default_init_thunk
-		self.final_func = final_func or self.default_final_func
-		self.error_handler = error_handler or self.default_error_handler
-
-	def execute(self):
-		"""Actually run the operation"""
-		ran_init_thunk = None
-		try:
-			init_val = self.init_thunk()
-			ran_init_thunk = 1
-			return self.final_func(init_val)
-		except Exception, exc: # Catch all errors
-			Log.exception()
-			TracebackArchive.add()
-			if ran_init_thunk: self.error_handler(exc, 1, init_val)
-			else: self.error_handler(exc, None, None)
-			raise exc
-
-	def default_init_thunk(self): return None
-	def default_final_func(self, init_val): return init_val
-	def default_error_handler(self, exc, ran_init, init_val): pass
-
-
-null_action = Action(None, None, None)
-def chain(*robust_action_list):
-	"""Return chain tying together a number of robust actions
-
-	The whole chain will be aborted if some error occurs in
-	initialization stage of any of the component actions.
-
-	"""
-	ras_with_started_inits, init_return_vals = [], []
-	def init():
-		for ra in robust_action_list:
-			ras_with_started_inits.append(ra)
-			init_return_vals.append(ra.init_thunk())
-		return init_return_vals
-	def final(init_return_vals):
-		final_vals = []
-		for ra, init_val in zip(robust_action_list, init_return_vals):
-			final_vals.append(ra.final_func(init_val))
-		return final_vals
-	def error(exc, ran_init, init_val):
-		for ra, init_val in zip(ras_with_started_inits, init_return_vals):
-			ra.error_handler(exc, 1, init_val)
-		for ra in ras_with_started_inits[len(init_return_vals):]:
-			ra.error_handler(exc, None, None)
-	return Action(init, final, error)
-
-def chain_nested(*robust_action_list):
-	"""Like chain but final actions performed in reverse order"""
-	ras_with_started_inits, init_vals = [], []
-	def init():
-		for ra in robust_action_list:
-			ras_with_started_inits.append(ra)
-			init_vals.append(ra.init_thunk())
-		return init_vals
-	def final(init_vals):
-		ras_and_inits = zip(robust_action_list, init_vals)
-		ras_and_inits.reverse()
-		final_vals = []
-		for ra, init_val in ras_and_inits:
-			final_vals.append(ra.final_func(init_val))
-		return final_vals
-	def error(exc, ran_init, init_val):
-		for ra, init_val in zip(ras_with_started_inits, init_vals):
-			ra.error_handler(exc, 1, init_val)
-		for ra in ras_with_started_inits[len(init_vals):]:
-			ra.error_handler(exc, None, None)
-	return Action(init, final, error)
-
-def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
-	"""Shortcut Action creator when only tempfiles involved
-
-	Often the robust action will just consist of some initial
-	stage, renaming tempfiles in the final stage, and deleting
-	them if there is an error. This function makes it easier to
-	create Actions of that type.
-
-	"""
-	if isinstance(tempfiles, TempFile.TempFile): tempfiles = (tempfiles,)
-	if isinstance(final_renames, rpath.RPath): final_renames = (final_renames,)
-	if final_renames is None: final_renames = [None] * len(tempfiles)
-	assert len(tempfiles) == len(final_renames)
-
-	def final(init_val): # rename tempfiles to final positions
-		for tempfile, destination in zip(tempfiles, final_renames):
-			if destination:
-				if destination.isdir(): # Cannot rename over directory
-					destination.delete()
-				tempfile.rename(destination)
-		return init_val
-	def error(exc, ran_init, init_val):
-		for tf in tempfiles: tf.delete()
-	return Action(init_thunk, final, error)
-
-def copy_action(rorpin, rpout):
-	"""Return robust action copying rorpin to rpout
-
-	The source can be a rorp or an rpath. Does not recurse. If
-	directories copied, then just exit (output directory not
-	overwritten).
-
-	"""
-	tfl = [None] # Need some mutable state to hold tf value
-	def init():
-		if not (rorpin.isdir() and rpout.isdir()): # already a dir
-			tfl[0] = tf = TempFile.new(rpout)
-			if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb"))
-			else: rpath.copy(rorpin, tf)
-			return tf
-		else: return None
-	def final(tf):
-		if tf and tf.lstat():
-			if rpout.isdir(): rpout.delete()
-			tf.rename(rpout)
-		return rpout
-	def error(exc, ran_init, init_val):
-		if tfl[0]: tfl[0].delete()
-	return Action(init, final, error)
-
-def copy_with_attribs_action(rorpin, rpout, compress = None):
-	"""Like copy_action but also copy attributes"""
-	tfl = [None] # Need some mutable state for error handler
-	def init():
-		if not (rorpin.isdir() and rpout.isdir()): # already a dir
-			tfl[0] = tf = TempFile.new(rpout)
-			if rorpin.isreg():
-				tf.write_from_fileobj(rorpin.open("rb"), compress)
-			else: rpath.copy(rorpin, tf)
-			if tf.lstat(): # Some files, like sockets, won't be created
-				rpath.copy_attribs(rorpin, tf)
-			return tf
-		else: return None
-	def final(tf):
-		if rorpin.isdir() and rpout.isdir():
-			rpath.copy_attribs(rorpin, rpout)
-		elif tf and tf.lstat():
-			if rpout.isdir(): rpout.delete() # can't rename over dir
-			tf.rename(rpout)
-		return rpout
-	def error(exc, ran_init, init_val):
-		if tfl[0]: tfl[0].delete()
-	return Action(init, final, error)
-
-def copy_attribs_action(rorpin, rpout):
-	"""Return action which just copies attributes
-
-	Copying attributes is already pretty atomic, so just run
-	normal sequence.
-
-	"""
-	def final(init_val):
-		rpath.copy_attribs(rorpin, rpout)
-		return rpout
-	return Action(None, final, None)
-
-def symlink_action(rpath, linktext):
-	"""Return symlink action by moving one file over another"""
-	tf = TempFile.new(rpath)
-	def init(): tf.symlink(linktext)
-	return make_tf_robustaction(init, tf, rpath)
-
-def destructive_write_action(rp, s):
-	"""Return action writing string s to rpath rp in robust way
-
-	This will overwrite any data currently in rp.
-
-	"""
-	tf = TempFile.new(rp)
-	def init():
-		fp = tf.open("wb")
-		fp.write(s)
-		fp.close()
-		tf.setdata()
-	return make_tf_robustaction(init, tf, rp)
+import librsync, errno, signal, C, static, rpath, Globals, log, statistics
 
 def check_common_error(error_handler, function, args = []):
 	"""Apply function to args, if error, run error_handler on exception
@@ -264,19 +32,16 @@ def check_common_error(error_handler, function, args = []):
 	except Exception, exc:
 		TracebackArchive.add([function] + list(args))
 		if catch_error(exc):
-			Log.exception()
+			log.Log.exception()
 			conn = Globals.backup_writer
-			if conn is not None: # increment error count
-				ITRB_exists = conn.Globals.is_not_None('ITRB')
-				if ITRB_exists: conn.Globals.ITRB.increment_stat('Errors')
+			if conn is not None: statistics.record_error()
 			if error_handler: return error_handler(exc, *args)
 			else: return
-		Log.exception(1, 2)
+		log.Log.exception(1, 2)
 		raise
 
 def catch_error(exc):
 	"""Return true if exception exc should be caught"""
-
 	for exception_class in (rpath.SkipFileException, rpath.RPathException,
 							librsync.librsyncError, C.UnknownFileTypeError):
 		if isinstance(exc, exception_class): return 1
@@ -291,7 +56,7 @@ def catch_error(exc):
 def listrp(rp):
 	"""Like rp.listdir() but return [] if error, and sort results"""
 	def error_handler(exc):
-		Log("Error listing directory %s" % rp.path, 2)
+		log.Log("Error listing directory %s" % rp.path, 2)
 		return []
 	dir_listing = check_common_error(error_handler, rp.listdir)
 	dir_listing.sort()
@@ -322,284 +87,16 @@ class TracebackArchive:
 		extra information to same traceback archive.
 
 		"""
-		cls._traceback_strings.append(Log.exception_to_string(extra_args))
+		cls._traceback_strings.append(log.Log.exception_to_string(extra_args))
 		if len(cls._traceback_strings) > 10:
 			cls._traceback_strings = cls._traceback_strings[:10]
 
 	def log(cls):
 		"""Print all exception information to log file"""
 		if cls._traceback_strings:
-			Log("------------ Old traceback info -----------\n%s\n"
-				"-------------------------------------------" %
-				("\n".join(cls._traceback_strings),), 3)
+			log.Log("------------ Old traceback info -----------\n%s\n"
+					"-------------------------------------------" %
+					("\n".join(cls._traceback_strings),), 3)
 
 static.MakeClass(TracebackArchive)
 
-
-class SaveState:
-	"""Save state in the middle of backups for resuming later"""
-	_last_file_sym = None # RPath of sym pointing to last file processed
-	_last_file_definitive_rp = None # Touch this if last file is really last
-	_last_checkpoint_time = 0 # time in seconds of last checkpoint
-	_checkpoint_rp = None # RPath of checkpoint data pickle
-
-	def init_filenames(cls):
-		"""Set rpaths of markers. Assume rbdir already set."""
-		if not Globals.isbackup_writer:
-			return Globals.backup_writer.SaveState.init_filenames()
-
-		assert Globals.local_connection is Globals.rbdir.conn, \
-			   (Globals.rbdir.conn, Globals.backup_writer)
-
-		cls._last_file_sym = Globals.rbdir.append(
-			"last-file-incremented.%s.data" % Time.curtimestr)
-		cls._checkpoint_rp = Globals.rbdir.append(
-			"checkpoint-data.%s.data" % Time.curtimestr)
-		cls._last_file_definitive_rp = Globals.rbdir.append(
-			"last-file-definitive.%s.data" % Time.curtimestr)
-
-	def touch_last_file(cls):
-		"""Touch last file marker, indicating backup has begun"""
-		if not cls._last_file_sym.lstat(): cls._last_file_sym.touch()
-
-	def touch_last_file_definitive(cls):
-		"""Create last-file-definitive marker
-
-		When a backup gets aborted, there may be time to indicate the
-		last file successfully processed, and this should be touched.
-		Sometimes when the abort is hard, there may be a last file
-		indicated, but further files since then have been processed,
-		in which case this shouldn't be touched.
-
-		"""
-		cls._last_file_definitive_rp.touch()
-
-	def record_last_file_action(cls, last_file_rorp):
-		"""Action recording last file to be processed as symlink in rbdir
-
-		last_file_rorp is None means that no file is known to have
-		been processed.
-
-		"""
-		if last_file_rorp:
-			symtext = apply(os.path.join,
-							('increments',) + last_file_rorp.index)
-			return symlink_action(cls._last_file_sym, symtext)
-		else: return Action(None, lambda init_val: cls.touch_last_file(), None)
-
-	def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None):
-		"""Save states of tree reducer and finalizer during inc backup
-
-		If override is true, checkpoint even if one isn't due.
-
-		"""
-		if not override and not cls.checkpoint_needed(): return
-		assert cls._checkpoint_rp, "_checkpoint_rp not set yet"
-
-		cls._last_checkpoint_time = time.time()
-		Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
-		state_string = cPickle.dumps((ITR, finalizer))
-		chain(destructive_write_action(cls._checkpoint_rp, state_string),
-			  cls.record_last_file_action(last_file_rorp)).execute()
-
-	def checkpoint_needed(cls):
-		"""Returns true if another checkpoint is called for"""
-		return (time.time() > cls._last_checkpoint_time +
-				Globals.checkpoint_interval)
-
-	def checkpoint_remove(cls):
-		"""Remove all checkpointing data after successful operation"""
-		for rp in Resume.get_relevant_rps(): rp.delete()
-		if Globals.preserve_hardlinks: Hardlink.remove_all_checkpoints()
-
-static.MakeClass(SaveState)
-
-
-class ResumeException(Exception):
-	"""Indicates some error has been encountered while trying to resume"""
-	pass
-
-class Resume:
-	"""Check for old aborted backups and resume if necessary"""
-	_session_info_list = None # List of ResumeSessionInfo's, sorted by time
-
-	def FindTime(cls, index, later_than = 0):
-		"""For a given index, find the appropriate time to use for inc
-
-		If it is clear which time to use (because it is determined by
-		definitive records, or there are no aborted backup, etc.) then
-		just return the appropriate time. Otherwise, if an aborted
-		backup was last checkpointed before the index, assume that it
-		didn't get there, and go for the older time. If an inc file
-		is already present, the function will be rerun with later time
-		specified.
-
-		"""
-		assert Globals.isbackup_writer
-		if Time.prevtime > later_than: return Time.prevtime # usual case
-
-		for si in cls.get_sis_covering_index(index):
-			if si.time > later_than: return si.time
-		raise rpath.SkipFileException("Index %s already covered, skipping" %
-									  str(index))
-
-	def get_sis_covering_index(cls, index):
-		"""Return sorted list of SessionInfos which may cover index
-
-		Aborted backup may be relevant unless index is lower and we
-		are sure that it didn't go further.
-
-		"""
-		return filter(lambda session_info:
-					  not ((session_info.last_index is None or
-							session_info.last_index < index) and
-						   session_info.last_definitive),
-					  cls._session_info_list)
-
-	def SetSessionInfo(cls):
-		"""Read data directory and initialize _session_info"""
-		assert Globals.isbackup_writer
-		silist = []
-		rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps())
-		times = rp_quad_dict.keys()
-		times.sort()
-		for time in times:
-			try: silist.append(cls.quad_to_si(time, rp_quad_dict[time]))
-			except ResumeException:
-				Log("Bad resume information found, skipping", 2)
-		cls._session_info_list = silist
-
-	def get_relevant_rps(cls):
-		"""Return list of relevant rpaths in rbdata directory"""
-		relevant_bases = ['last-file-incremented', 'last-file-mirrored',
-						  'checkpoint-data', 'last-file-definitive']
-		rps = map(Globals.rbdir.append, Globals.rbdir.listdir())
-		return filter(lambda rp: rp.isincfile()
-					  and rp.getincbase_str() in relevant_bases, rps)
-
-	def group_rps_by_time(cls, rplist):
-		"""Take list of rps return time dict {time: quadlist}
-
-		Times in seconds are the keys, values are triples of rps
-		[last-file-incremented, last-file-mirrored, checkpoint-data,
-		last-is-definitive].
-
-		"""
-		result = {}
-		for rp in rplist:
-			time = Time.stringtotime(rp.getinctime())
-			if result.has_key(time): quadlist = result[time]
-			else: quadlist = [None, None, None, None]
-			base_string = rp.getincbase_str()
-			if base_string == 'last-file-incremented': quadlist[0] = rp
-			elif base_string == 'last-file-mirrored': quadlist[1] = rp
-			elif base_string == 'last-file-definitive': quadlist[3] = 1
-			else:
-				assert base_string == 'checkpoint-data'
-				quadlist[2] = rp
-			result[time] = quadlist
-		return result
-
-	def quad_to_si(cls, time, quad):
-		"""Take time, quadlist, return associated ResumeSessionInfo"""
-		increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad
-		if increment_sym and mirror_sym:
-			raise ResumeException("both mirror and inc sym shouldn't exist")
-		ITR, finalizer = None, None
-		if increment_sym:
-			mirror = None
-			last_index = cls.sym_to_index(increment_sym)
-			if checkpoint_rp:
-				ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp)
-		elif mirror_sym:
-			mirror = 1
-			last_index = cls.sym_to_index(mirror_sym)
-			if checkpoint_rp:
-				finalizer = cls.unpickle_checkpoint(checkpoint_rp)
-		else: raise ResumeException("Missing increment or mirror sym")
-		return ResumeSessionInfo(mirror, time, last_index, last_definitive,
-								 finalizer, ITR)
-
-	def sym_to_index(cls, sym_rp):
-		"""Read last file sym rp, return last file index
-
-		If sym_rp is not a sym at all, return None, indicating that no
-		file index was ever conclusively processed.
-
-		"""
-		if not sym_rp.issym(): return None
-		link_components = sym_rp.readlink().split("/")
-		assert link_components[0] == 'increments'
-		return tuple(link_components[1:])
-
-	def unpickle_checkpoint(cls, checkpoint_rp):
-		"""Read data from checkpoint_rp and return unpickled data
-
-		Return value is pair (patch increment ITR, finalizer state).
-
-		"""
-		fp = checkpoint_rp.open("rb")
-		data = fp.read()
-		fp.close()
-		try: result = cPickle.loads(data)
-		except Exception, exc:
-			raise ResumeException("Bad pickle at %s: %s" %
-								  (checkpoint_rp.path, exc))
-		return result
-
-	def ResumeCheck(cls):
-		"""Return relevant ResumeSessionInfo if there's one we should resume
-
-		Also if find RSI to resume, reset current time to old resume
-		time.
-
-		"""
-		cls.SetSessionInfo()
-		if not cls._session_info_list:
-			if Globals.resume == 1:
-				Log.FatalError("User specified resume, but no data on "
-							   "previous backup found.")
-			else: return None
-		else:
-			si = cls._session_info_list[-1]
-			if (Globals.resume == 1 or
-				(time.time() <= (si.time + Globals.resume_window) and
-				 not Globals.resume == 0)):
-				Log("Resuming aborted backup dated %s" %
-					Time.timetopretty(si.time), 2)
-				Time.setcurtime(si.time)
-				if Globals.preserve_hardlinks:
-					if (not si.last_definitive or not
-						Hardlink.retrieve_checkpoint(Globals.rbdir, si.time)):
-						Log("Hardlink information not successfully "
-							"recovered.", 2)
-				return si
-			else:
-				Log("Last backup dated %s was aborted, but we aren't "
-					"resuming it." % Time.timetopretty(si.time), 2)
-				return None
-		assert None
-
-static.MakeClass(Resume)
-
-
-class ResumeSessionInfo:
-	"""Hold information about a previously aborted session"""
-	def __init__(self, mirror, time, last_index,
-				 last_definitive, finalizer = None, ITR = None):
-		"""Class initializer
-
-		time - starting time in seconds of backup
-		mirror - true if backup was a mirror, false if increment
-		last_index - Last confirmed index processed by backup, or None
-		last_definitive - True is we know last_index is really last
-		finalizer - the dsrp finalizer if available
-		ITR - For increment, ITM reducer (assume mirror if NA)
-
-		"""
-		self.time = time
-		self.mirror = mirror
-		self.last_index = last_index
-		self.last_definitive = last_definitive
-		self.ITR, self.finalizer, = ITR, finalizer
-