# Copyright 2002 Ben Escoto # # This file is part of rdiff-backup. # # rdiff-backup is free software; you can redistribute it and/or modify # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # rdiff-backup is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with rdiff-backup; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA """Prevent mirror from being corrupted; handle errors Ideally no matter an instance of rdiff-backup gets aborted, no information should get lost. The target directory should be left in a coherent state, and later instances of rdiff-backup should clean things up so there is no sign that anything ever got aborted or failed. Thus, files should be updated in an atomic way as possible. Each file should be updated (and the corresponding diff files written) or not, and it should be clear which happened. In general, I don't think this is possible, since the creation of the diff files and the changing of updated files cannot be guarateed to happen together. It is possible, I think, to record various information to files which would allow a later process to figure out what the last operation was, but this would add several file operations to the processing of each file, and I don't think, would be a good tradeoff. The compromise reached here is that diff files should be created just before the mirror files are updated, and each file update should be done with a rename operation on a file in the same directory. Furthermore, every once in a while, rdiff-backup will record which file it just finished processing. If any fatal errors are caught, it will also record the last processed file. Future instances may not know exactly when the previous instance was aborted, but they will be able to narrow down the possibilities. """ import os, time from log import Log import Time, librsync, errno, signal, cPickle, C, \ Hardlink, TempFile, static, rpath, Globals class Action: """Represents a file operation to be accomplished later""" def __init__(self, init_thunk, final_func, error_handler): """Action initializer All the thunks are functions whose return value will be ignored. init_thunk should not make any irreversible changes but prepare for the writing of the important data. final_func should be as short as possible and do the real work. error_handler is run if there is an error in init_thunk or final_func. Errors in init_thunk should be corrected by error_handler as if nothing had been run in the first place. init_thunk takes no arguments. final_thunk takes the return value of init_thunk as its argument, and its return value is returned by execute(). error_handler takes three arguments: the exception, a value which is true just in case self.init_thunk ran correctly, and a value which will be the return value of init_thunk if it ran correctly. """ self.init_thunk = init_thunk or self.default_init_thunk self.final_func = final_func or self.default_final_func self.error_handler = error_handler or self.default_error_handler def execute(self): """Actually run the operation""" ran_init_thunk = None try: init_val = self.init_thunk() ran_init_thunk = 1 return self.final_func(init_val) except Exception, exc: # Catch all errors Log.exception() TracebackArchive.add() if ran_init_thunk: self.error_handler(exc, 1, init_val) else: self.error_handler(exc, None, None) raise exc def default_init_thunk(self): return None def default_final_func(self, init_val): return init_val def default_error_handler(self, exc, ran_init, init_val): pass null_action = Action(None, None, None) def chain(*robust_action_list): """Return chain tying together a number of robust actions The whole chain will be aborted if some error occurs in initialization stage of any of the component actions. """ ras_with_started_inits, init_return_vals = [], [] def init(): for ra in robust_action_list: ras_with_started_inits.append(ra) init_return_vals.append(ra.init_thunk()) return init_return_vals def final(init_return_vals): final_vals = [] for ra, init_val in zip(robust_action_list, init_return_vals): final_vals.append(ra.final_func(init_val)) return final_vals def error(exc, ran_init, init_val): for ra, init_val in zip(ras_with_started_inits, init_return_vals): ra.error_handler(exc, 1, init_val) for ra in ras_with_started_inits[len(init_return_vals):]: ra.error_handler(exc, None, None) return Action(init, final, error) def chain_nested(*robust_action_list): """Like chain but final actions performed in reverse order""" ras_with_started_inits, init_vals = [], [] def init(): for ra in robust_action_list: ras_with_started_inits.append(ra) init_vals.append(ra.init_thunk()) return init_vals def final(init_vals): ras_and_inits = zip(robust_action_list, init_vals) ras_and_inits.reverse() final_vals = [] for ra, init_val in ras_and_inits: final_vals.append(ra.final_func(init_val)) return final_vals def error(exc, ran_init, init_val): for ra, init_val in zip(ras_with_started_inits, init_vals): ra.error_handler(exc, 1, init_val) for ra in ras_with_started_inits[len(init_vals):]: ra.error_handler(exc, None, None) return Action(init, final, error) def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): """Shortcut Action creator when only tempfiles involved Often the robust action will just consist of some initial stage, renaming tempfiles in the final stage, and deleting them if there is an error. This function makes it easier to create Actions of that type. """ if isinstance(tempfiles, TempFile.TempFile): tempfiles = (tempfiles,) if isinstance(final_renames, rpath.RPath): final_renames = (final_renames,) if final_renames is None: final_renames = [None] * len(tempfiles) assert len(tempfiles) == len(final_renames) def final(init_val): # rename tempfiles to final positions for tempfile, destination in zip(tempfiles, final_renames): if destination: if destination.isdir(): # Cannot rename over directory destination.delete() tempfile.rename(destination) return init_val def error(exc, ran_init, init_val): for tf in tempfiles: tf.delete() return Action(init_thunk, final, error) def copy_action(rorpin, rpout): """Return robust action copying rorpin to rpout The source can be a rorp or an rpath. Does not recurse. If directories copied, then just exit (output directory not overwritten). """ tfl = [None] # Need some mutable state to hold tf value def init(): if not (rorpin.isdir() and rpout.isdir()): # already a dir tfl[0] = tf = TempFile.new(rpout) if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb")) else: rpath.copy(rorpin, tf) return tf else: return None def final(tf): if tf and tf.lstat(): if rpout.isdir(): rpout.delete() tf.rename(rpout) return rpout def error(exc, ran_init, init_val): if tfl[0]: tfl[0].delete() return Action(init, final, error) def copy_with_attribs_action(rorpin, rpout, compress = None): """Like copy_action but also copy attributes""" tfl = [None] # Need some mutable state for error handler def init(): if not (rorpin.isdir() and rpout.isdir()): # already a dir tfl[0] = tf = TempFile.new(rpout) if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb"), compress) else: rpath.copy(rorpin, tf) if tf.lstat(): # Some files, like sockets, won't be created rpath.copy_attribs(rorpin, tf) return tf else: return None def final(tf): if rorpin.isdir() and rpout.isdir(): rpath.copy_attribs(rorpin, rpout) elif tf and tf.lstat(): if rpout.isdir(): rpout.delete() # can't rename over dir tf.rename(rpout) return rpout def error(exc, ran_init, init_val): if tfl[0]: tfl[0].delete() return Action(init, final, error) def copy_attribs_action(rorpin, rpout): """Return action which just copies attributes Copying attributes is already pretty atomic, so just run normal sequence. """ def final(init_val): rpath.copy_attribs(rorpin, rpout) return rpout return Action(None, final, None) def symlink_action(rpath, linktext): """Return symlink action by moving one file over another""" tf = TempFile.new(rpath) def init(): tf.symlink(linktext) return make_tf_robustaction(init, tf, rpath) def destructive_write_action(rp, s): """Return action writing string s to rpath rp in robust way This will overwrite any data currently in rp. """ tf = TempFile.new(rp) def init(): fp = tf.open("wb") fp.write(s) fp.close() tf.setdata() return make_tf_robustaction(init, tf, rp) def check_common_error(error_handler, function, args = []): """Apply function to args, if error, run error_handler on exception This uses the catch_error predicate below to only catch certain exceptions which seems innocent enough. """ try: return function(*args) except Exception, exc: TracebackArchive.add([function] + list(args)) if catch_error(exc): Log.exception() conn = Globals.backup_writer if conn is not None: # increment error count ITRB_exists = conn.Globals.is_not_None('ITRB') if ITRB_exists: conn.Globals.ITRB.increment_stat('Errors') if error_handler: return error_handler(exc, *args) else: return Log.exception(1, 2) raise def catch_error(exc): """Return true if exception exc should be caught""" for exception_class in (rpath.SkipFileException, rpath.RPathException, librsync.librsyncError, C.UnknownFileTypeError): if isinstance(exc, exception_class): return 1 if (isinstance(exc, EnvironmentError) and errno.errorcode[exc[0]] in ('EPERM', 'ENOENT', 'EACCES', 'EBUSY', 'EEXIST', 'ENOTDIR', 'ENAMETOOLONG', 'EINTR', 'ENOTEMPTY', 'EIO', 'ETXTBSY', 'ESRCH', 'EINVAL')): return 1 return 0 def listrp(rp): """Like rp.listdir() but return [] if error, and sort results""" def error_handler(exc): Log("Error listing directory %s" % rp.path, 2) return [] dir_listing = check_common_error(error_handler, rp.listdir) dir_listing.sort() return dir_listing def signal_handler(signum, frame): """This is called when signal signum is caught""" raise SignalException(signum) def install_signal_handlers(): """Install signal handlers on current connection""" for signum in [signal.SIGQUIT, signal.SIGHUP, signal.SIGTERM]: signal.signal(signum, signal_handler) class SignalException(Exception): """SignalException(signum) means signal signum has been received""" pass class TracebackArchive: """Save last 10 caught exceptions, so they can be printed if fatal""" _traceback_strings = [] def add(cls, extra_args = []): """Add most recent exception to archived list If extra_args are present, convert to strings and add them as extra information to same traceback archive. """ cls._traceback_strings.append(Log.exception_to_string(extra_args)) if len(cls._traceback_strings) > 10: cls._traceback_strings = cls._traceback_strings[:10] def log(cls): """Print all exception information to log file""" if cls._traceback_strings: Log("------------ Old traceback info -----------\n%s\n" "-------------------------------------------" % ("\n".join(cls._traceback_strings),), 3) static.MakeClass(TracebackArchive) class SaveState: """Save state in the middle of backups for resuming later""" _last_file_sym = None # RPath of sym pointing to last file processed _last_file_definitive_rp = None # Touch this if last file is really last _last_checkpoint_time = 0 # time in seconds of last checkpoint _checkpoint_rp = None # RPath of checkpoint data pickle def init_filenames(cls): """Set rpaths of markers. Assume rbdir already set.""" if not Globals.isbackup_writer: return Globals.backup_writer.SaveState.init_filenames() assert Globals.local_connection is Globals.rbdir.conn, \ (Globals.rbdir.conn, Globals.backup_writer) cls._last_file_sym = Globals.rbdir.append( "last-file-incremented.%s.data" % Time.curtimestr) cls._checkpoint_rp = Globals.rbdir.append( "checkpoint-data.%s.data" % Time.curtimestr) cls._last_file_definitive_rp = Globals.rbdir.append( "last-file-definitive.%s.data" % Time.curtimestr) def touch_last_file(cls): """Touch last file marker, indicating backup has begun""" if not cls._last_file_sym.lstat(): cls._last_file_sym.touch() def touch_last_file_definitive(cls): """Create last-file-definitive marker When a backup gets aborted, there may be time to indicate the last file successfully processed, and this should be touched. Sometimes when the abort is hard, there may be a last file indicated, but further files since then have been processed, in which case this shouldn't be touched. """ cls._last_file_definitive_rp.touch() def record_last_file_action(cls, last_file_rorp): """Action recording last file to be processed as symlink in rbdir last_file_rorp is None means that no file is known to have been processed. """ if last_file_rorp: symtext = apply(os.path.join, ('increments',) + last_file_rorp.index) return symlink_action(cls._last_file_sym, symtext) else: return Action(None, lambda init_val: cls.touch_last_file(), None) def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None): """Save states of tree reducer and finalizer during inc backup If override is true, checkpoint even if one isn't due. """ if not override and not cls.checkpoint_needed(): return assert cls._checkpoint_rp, "_checkpoint_rp not set yet" cls._last_checkpoint_time = time.time() Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) state_string = cPickle.dumps((ITR, finalizer)) chain(destructive_write_action(cls._checkpoint_rp, state_string), cls.record_last_file_action(last_file_rorp)).execute() def checkpoint_needed(cls): """Returns true if another checkpoint is called for""" return (time.time() > cls._last_checkpoint_time + Globals.checkpoint_interval) def checkpoint_remove(cls): """Remove all checkpointing data after successful operation""" for rp in Resume.get_relevant_rps(): rp.delete() if Globals.preserve_hardlinks: Hardlink.remove_all_checkpoints() static.MakeClass(SaveState) class ResumeException(Exception): """Indicates some error has been encountered while trying to resume""" pass class Resume: """Check for old aborted backups and resume if necessary""" _session_info_list = None # List of ResumeSessionInfo's, sorted by time def FindTime(cls, index, later_than = 0): """For a given index, find the appropriate time to use for inc If it is clear which time to use (because it is determined by definitive records, or there are no aborted backup, etc.) then just return the appropriate time. Otherwise, if an aborted backup was last checkpointed before the index, assume that it didn't get there, and go for the older time. If an inc file is already present, the function will be rerun with later time specified. """ assert Globals.isbackup_writer if Time.prevtime > later_than: return Time.prevtime # usual case for si in cls.get_sis_covering_index(index): if si.time > later_than: return si.time raise rpath.SkipFileException("Index %s already covered, skipping" % str(index)) def get_sis_covering_index(cls, index): """Return sorted list of SessionInfos which may cover index Aborted backup may be relevant unless index is lower and we are sure that it didn't go further. """ return filter(lambda session_info: not ((session_info.last_index is None or session_info.last_index < index) and session_info.last_definitive), cls._session_info_list) def SetSessionInfo(cls): """Read data directory and initialize _session_info""" assert Globals.isbackup_writer silist = [] rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps()) times = rp_quad_dict.keys() times.sort() for time in times: try: silist.append(cls.quad_to_si(time, rp_quad_dict[time])) except ResumeException: Log("Bad resume information found, skipping", 2) cls._session_info_list = silist def get_relevant_rps(cls): """Return list of relevant rpaths in rbdata directory""" relevant_bases = ['last-file-incremented', 'last-file-mirrored', 'checkpoint-data', 'last-file-definitive'] rps = map(Globals.rbdir.append, Globals.rbdir.listdir()) return filter(lambda rp: rp.isincfile() and rp.getincbase_str() in relevant_bases, rps) def group_rps_by_time(cls, rplist): """Take list of rps return time dict {time: quadlist} Times in seconds are the keys, values are triples of rps [last-file-incremented, last-file-mirrored, checkpoint-data, last-is-definitive]. """ result = {} for rp in rplist: time = Time.stringtotime(rp.getinctime()) if result.has_key(time): quadlist = result[time] else: quadlist = [None, None, None, None] base_string = rp.getincbase_str() if base_string == 'last-file-incremented': quadlist[0] = rp elif base_string == 'last-file-mirrored': quadlist[1] = rp elif base_string == 'last-file-definitive': quadlist[3] = 1 else: assert base_string == 'checkpoint-data' quadlist[2] = rp result[time] = quadlist return result def quad_to_si(cls, time, quad): """Take time, quadlist, return associated ResumeSessionInfo""" increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad if increment_sym and mirror_sym: raise ResumeException("both mirror and inc sym shouldn't exist") ITR, finalizer = None, None if increment_sym: mirror = None last_index = cls.sym_to_index(increment_sym) if checkpoint_rp: ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp) elif mirror_sym: mirror = 1 last_index = cls.sym_to_index(mirror_sym) if checkpoint_rp: finalizer = cls.unpickle_checkpoint(checkpoint_rp) else: raise ResumeException("Missing increment or mirror sym") return ResumeSessionInfo(mirror, time, last_index, last_definitive, finalizer, ITR) def sym_to_index(cls, sym_rp): """Read last file sym rp, return last file index If sym_rp is not a sym at all, return None, indicating that no file index was ever conclusively processed. """ if not sym_rp.issym(): return None link_components = sym_rp.readlink().split("/") assert link_components[0] == 'increments' return tuple(link_components[1:]) def unpickle_checkpoint(cls, checkpoint_rp): """Read data from checkpoint_rp and return unpickled data Return value is pair (patch increment ITR, finalizer state). """ fp = checkpoint_rp.open("rb") data = fp.read() fp.close() try: result = cPickle.loads(data) except Exception, exc: raise ResumeException("Bad pickle at %s: %s" % (checkpoint_rp.path, exc)) return result def ResumeCheck(cls): """Return relevant ResumeSessionInfo if there's one we should resume Also if find RSI to resume, reset current time to old resume time. """ cls.SetSessionInfo() if not cls._session_info_list: if Globals.resume == 1: Log.FatalError("User specified resume, but no data on " "previous backup found.") else: return None else: si = cls._session_info_list[-1] if (Globals.resume == 1 or (time.time() <= (si.time + Globals.resume_window) and not Globals.resume == 0)): Log("Resuming aborted backup dated %s" % Time.timetopretty(si.time), 2) Time.setcurtime(si.time) if Globals.preserve_hardlinks: if (not si.last_definitive or not Hardlink.retrieve_checkpoint(Globals.rbdir, si.time)): Log("Hardlink information not successfully " "recovered.", 2) return si else: Log("Last backup dated %s was aborted, but we aren't " "resuming it." % Time.timetopretty(si.time), 2) return None assert None static.MakeClass(Resume) class ResumeSessionInfo: """Hold information about a previously aborted session""" def __init__(self, mirror, time, last_index, last_definitive, finalizer = None, ITR = None): """Class initializer time - starting time in seconds of backup mirror - true if backup was a mirror, false if increment last_index - Last confirmed index processed by backup, or None last_definitive - True is we know last_index is really last finalizer - the dsrp finalizer if available ITR - For increment, ITM reducer (assume mirror if NA) """ self.time = time self.mirror = mirror self.last_index = last_index self.last_definitive = last_definitive self.ITR, self.finalizer, = ITR, finalizer