From 9a0da726e2172321cdc1dcd21441f4ffc41e7931 Mon Sep 17 00:00:00 2001 From: bescoto Date: Mon, 23 Dec 2002 06:53:18 +0000 Subject: Major refactoring - avoid use of 'from XX import *' in favor of more normal 'import XXX' syntax. The previous way was an artifact from earlier versions where the whole program fit in one file. git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@252 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/rdiff_backup/robust.py | 531 +++++++++++++++--------------------- 1 file changed, 219 insertions(+), 312 deletions(-) (limited to 'rdiff-backup/rdiff_backup/robust.py') diff --git a/rdiff-backup/rdiff_backup/robust.py b/rdiff-backup/rdiff_backup/robust.py index be7f1e8..67f32be 100644 --- a/rdiff-backup/rdiff_backup/robust.py +++ b/rdiff-backup/rdiff_backup/robust.py @@ -46,13 +46,16 @@ able to narrow down the possibilities. """ -import tempfile, errno, signal, cPickle, C -from static import * +import os, time +from log import Log +import Time, librsync, errno, signal, cPickle, C, \ + Hardlink, TempFile, static, rpath, Globals -class RobustAction: + +class Action: """Represents a file operation to be accomplished later""" def __init__(self, init_thunk, final_func, error_handler): - """RobustAction initializer + """Action initializer All the thunks are functions whose return value will be ignored. init_thunk should not make any irreversible changes @@ -96,217 +99,212 @@ class RobustAction: def default_error_handler(self, exc, ran_init, init_val): pass -class Robust: - """Contains various methods designed to make things safer""" - null_action = RobustAction(None, None, None) - def chain(*robust_action_list): - """Return chain tying together a number of robust actions - - The whole chain will be aborted if some error occurs in - initialization stage of any of the component actions. - - """ - ras_with_started_inits, init_return_vals = [], [] - def init(): - for ra in robust_action_list: - ras_with_started_inits.append(ra) - init_return_vals.append(ra.init_thunk()) - return init_return_vals - def final(init_return_vals): - final_vals = [] - for ra, init_val in zip(robust_action_list, init_return_vals): - final_vals.append(ra.final_func(init_val)) - return final_vals - def error(exc, ran_init, init_val): - for ra, init_val in zip(ras_with_started_inits, init_return_vals): - ra.error_handler(exc, 1, init_val) - for ra in ras_with_started_inits[len(init_return_vals):]: - ra.error_handler(exc, None, None) - return RobustAction(init, final, error) - - def chain_nested(*robust_action_list): - """Like chain but final actions performed in reverse order""" - ras_with_started_inits, init_vals = [], [] - def init(): - for ra in robust_action_list: - ras_with_started_inits.append(ra) - init_vals.append(ra.init_thunk()) - return init_vals - def final(init_vals): - ras_and_inits = zip(robust_action_list, init_vals) - ras_and_inits.reverse() - final_vals = [] - for ra, init_val in ras_and_inits: - final_vals.append(ra.final_func(init_val)) - return final_vals - def error(exc, ran_init, init_val): - for ra, init_val in zip(ras_with_started_inits, init_vals): - ra.error_handler(exc, 1, init_val) - for ra in ras_with_started_inits[len(init_vals):]: - ra.error_handler(exc, None, None) - return RobustAction(init, final, error) - - def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): - """Shortcut RobustAction creator when only tempfiles involved - - Often the robust action will just consist of some initial - stage, renaming tempfiles in the final stage, and deleting - them if there is an error. This function makes it easier to - create RobustActions of that type. - - """ - if isinstance(tempfiles, TempFile): tempfiles = (tempfiles,) - if isinstance(final_renames, RPath): final_renames = (final_renames,) - if final_renames is None: final_renames = [None] * len(tempfiles) - assert len(tempfiles) == len(final_renames) - - def final(init_val): # rename tempfiles to final positions - for tempfile, destination in zip(tempfiles, final_renames): - if destination: - if destination.isdir(): # Cannot rename over directory - destination.delete() - tempfile.rename(destination) - return init_val - def error(exc, ran_init, init_val): - for tf in tempfiles: tf.delete() - return RobustAction(init_thunk, final, error) - - def copy_action(rorpin, rpout): - """Return robust action copying rorpin to rpout - - The source can be a rorp or an rpath. Does not recurse. If - directories copied, then just exit (output directory not - overwritten). - - """ - tfl = [None] # Need some mutable state to hold tf value - def init(): - if not (rorpin.isdir() and rpout.isdir()): # already a dir - tfl[0] = tf = TempFileManager.new(rpout) - if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb")) - else: RPath.copy(rorpin, tf) - return tf - else: return None - def final(tf): - if tf and tf.lstat(): - if rpout.isdir(): rpout.delete() - tf.rename(rpout) - return rpout - def error(exc, ran_init, init_val): - if tfl[0]: tfl[0].delete() - return RobustAction(init, final, error) - - def copy_with_attribs_action(rorpin, rpout, compress = None): - """Like copy_action but also copy attributes""" - tfl = [None] # Need some mutable state for error handler - def init(): - if not (rorpin.isdir() and rpout.isdir()): # already a dir - tfl[0] = tf = TempFileManager.new(rpout) - if rorpin.isreg(): - tf.write_from_fileobj(rorpin.open("rb"), compress) - else: RPath.copy(rorpin, tf) - if tf.lstat(): # Some files, like sockets, won't be created - RPathStatic.copy_attribs(rorpin, tf) - return tf - else: return None - def final(tf): - if rorpin.isdir() and rpout.isdir(): - RPath.copy_attribs(rorpin, rpout) - elif tf and tf.lstat(): - if rpout.isdir(): rpout.delete() # can't rename over dir - tf.rename(rpout) - return rpout - def error(exc, ran_init, init_val): - if tfl[0]: tfl[0].delete() - return RobustAction(init, final, error) - - def copy_attribs_action(rorpin, rpout): - """Return action which just copies attributes - - Copying attributes is already pretty atomic, so just run - normal sequence. - - """ - def final(init_val): - RPath.copy_attribs(rorpin, rpout) - return rpout - return RobustAction(None, final, None) - - def symlink_action(rpath, linktext): - """Return symlink action by moving one file over another""" - tf = TempFileManager.new(rpath) - def init(): tf.symlink(linktext) - return Robust.make_tf_robustaction(init, tf, rpath) - - def destructive_write_action(rp, s): - """Return action writing string s to rpath rp in robust way - - This will overwrite any data currently in rp. - - """ - tf = TempFileManager.new(rp) - def init(): - fp = tf.open("wb") - fp.write(s) - fp.close() - tf.setdata() - return Robust.make_tf_robustaction(init, tf, rp) +null_action = Action(None, None, None) +def chain(*robust_action_list): + """Return chain tying together a number of robust actions + + The whole chain will be aborted if some error occurs in + initialization stage of any of the component actions. + + """ + ras_with_started_inits, init_return_vals = [], [] + def init(): + for ra in robust_action_list: + ras_with_started_inits.append(ra) + init_return_vals.append(ra.init_thunk()) + return init_return_vals + def final(init_return_vals): + final_vals = [] + for ra, init_val in zip(robust_action_list, init_return_vals): + final_vals.append(ra.final_func(init_val)) + return final_vals + def error(exc, ran_init, init_val): + for ra, init_val in zip(ras_with_started_inits, init_return_vals): + ra.error_handler(exc, 1, init_val) + for ra in ras_with_started_inits[len(init_return_vals):]: + ra.error_handler(exc, None, None) + return Action(init, final, error) + +def chain_nested(*robust_action_list): + """Like chain but final actions performed in reverse order""" + ras_with_started_inits, init_vals = [], [] + def init(): + for ra in robust_action_list: + ras_with_started_inits.append(ra) + init_vals.append(ra.init_thunk()) + return init_vals + def final(init_vals): + ras_and_inits = zip(robust_action_list, init_vals) + ras_and_inits.reverse() + final_vals = [] + for ra, init_val in ras_and_inits: + final_vals.append(ra.final_func(init_val)) + return final_vals + def error(exc, ran_init, init_val): + for ra, init_val in zip(ras_with_started_inits, init_vals): + ra.error_handler(exc, 1, init_val) + for ra in ras_with_started_inits[len(init_vals):]: + ra.error_handler(exc, None, None) + return Action(init, final, error) + +def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): + """Shortcut Action creator when only tempfiles involved + + Often the robust action will just consist of some initial + stage, renaming tempfiles in the final stage, and deleting + them if there is an error. This function makes it easier to + create Actions of that type. + + """ + if isinstance(tempfiles, TempFile.TempFile): tempfiles = (tempfiles,) + if isinstance(final_renames, rpath.RPath): final_renames = (final_renames,) + if final_renames is None: final_renames = [None] * len(tempfiles) + assert len(tempfiles) == len(final_renames) + + def final(init_val): # rename tempfiles to final positions + for tempfile, destination in zip(tempfiles, final_renames): + if destination: + if destination.isdir(): # Cannot rename over directory + destination.delete() + tempfile.rename(destination) + return init_val + def error(exc, ran_init, init_val): + for tf in tempfiles: tf.delete() + return Action(init_thunk, final, error) + +def copy_action(rorpin, rpout): + """Return robust action copying rorpin to rpout + + The source can be a rorp or an rpath. Does not recurse. If + directories copied, then just exit (output directory not + overwritten). + + """ + tfl = [None] # Need some mutable state to hold tf value + def init(): + if not (rorpin.isdir() and rpout.isdir()): # already a dir + tfl[0] = tf = TempFile.new(rpout) + if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb")) + else: rpath.copy(rorpin, tf) + return tf + else: return None + def final(tf): + if tf and tf.lstat(): + if rpout.isdir(): rpout.delete() + tf.rename(rpout) + return rpout + def error(exc, ran_init, init_val): + if tfl[0]: tfl[0].delete() + return Action(init, final, error) + +def copy_with_attribs_action(rorpin, rpout, compress = None): + """Like copy_action but also copy attributes""" + tfl = [None] # Need some mutable state for error handler + def init(): + if not (rorpin.isdir() and rpout.isdir()): # already a dir + tfl[0] = tf = TempFile.new(rpout) + if rorpin.isreg(): + tf.write_from_fileobj(rorpin.open("rb"), compress) + else: rpath.copy(rorpin, tf) + if tf.lstat(): # Some files, like sockets, won't be created + rpath.copy_attribs(rorpin, tf) + return tf + else: return None + def final(tf): + if rorpin.isdir() and rpout.isdir(): + rpath.copy_attribs(rorpin, rpout) + elif tf and tf.lstat(): + if rpout.isdir(): rpout.delete() # can't rename over dir + tf.rename(rpout) + return rpout + def error(exc, ran_init, init_val): + if tfl[0]: tfl[0].delete() + return Action(init, final, error) + +def copy_attribs_action(rorpin, rpout): + """Return action which just copies attributes + + Copying attributes is already pretty atomic, so just run + normal sequence. + + """ + def final(init_val): + rpath.copy_attribs(rorpin, rpout) + return rpout + return Action(None, final, None) + +def symlink_action(rpath, linktext): + """Return symlink action by moving one file over another""" + tf = TempFile.new(rpath) + def init(): tf.symlink(linktext) + return make_tf_robustaction(init, tf, rpath) + +def destructive_write_action(rp, s): + """Return action writing string s to rpath rp in robust way + + This will overwrite any data currently in rp. + + """ + tf = TempFile.new(rp) + def init(): + fp = tf.open("wb") + fp.write(s) + fp.close() + tf.setdata() + return make_tf_robustaction(init, tf, rp) - def check_common_error(error_handler, function, args = []): - """Apply function to args, if error, run error_handler on exception +def check_common_error(error_handler, function, args = []): + """Apply function to args, if error, run error_handler on exception - This uses the catch_error predicate below to only catch - certain exceptions which seems innocent enough. + This uses the catch_error predicate below to only catch + certain exceptions which seems innocent enough. - """ - try: return function(*args) - except Exception, exc: - TracebackArchive.add([function] + list(args)) - if Robust.catch_error(exc): - Log.exception() - conn = Globals.backup_writer - if conn is not None: # increment error count - ITRB_exists = conn.Globals.is_not_None('ITRB') - if ITRB_exists: conn.Globals.ITRB.increment_stat('Errors') - if error_handler: return error_handler(exc, *args) - else: return - Log.exception(1, 2) - raise - - def catch_error(exc): - """Return true if exception exc should be caught""" - for exception_class in (SkipFileException, DSRPPermError, - RPathException, Rdiff.RdiffException, - librsync.librsyncError, - C.UnknownFileTypeError): - if isinstance(exc, exception_class): return 1 - if (isinstance(exc, EnvironmentError) and - errno.errorcode[exc[0]] in ('EPERM', 'ENOENT', 'EACCES', 'EBUSY', - 'EEXIST', 'ENOTDIR', 'ENAMETOOLONG', - 'EINTR', 'ENOTEMPTY', 'EIO', 'ETXTBSY', - 'ESRCH', 'EINVAL')): - return 1 - return 0 - - def listrp(rp): - """Like rp.listdir() but return [] if error, and sort results""" - def error_handler(exc): - Log("Error listing directory %s" % rp.path, 2) - return [] - dir_listing = Robust.check_common_error(error_handler, rp.listdir) - dir_listing.sort() - return dir_listing - - def signal_handler(signum, frame): - """This is called when signal signum is caught""" - raise SignalException(signum) - - def install_signal_handlers(): - """Install signal handlers on current connection""" - for signum in [signal.SIGQUIT, signal.SIGHUP, signal.SIGTERM]: - signal.signal(signum, Robust.signal_handler) - -MakeStatic(Robust) + """ + try: return function(*args) + except Exception, exc: + TracebackArchive.add([function] + list(args)) + if catch_error(exc): + Log.exception() + conn = Globals.backup_writer + if conn is not None: # increment error count + ITRB_exists = conn.Globals.is_not_None('ITRB') + if ITRB_exists: conn.Globals.ITRB.increment_stat('Errors') + if error_handler: return error_handler(exc, *args) + else: return + Log.exception(1, 2) + raise + +def catch_error(exc): + """Return true if exception exc should be caught""" + + for exception_class in (rpath.SkipFileException, rpath.RPathException, + librsync.librsyncError, C.UnknownFileTypeError): + if isinstance(exc, exception_class): return 1 + if (isinstance(exc, EnvironmentError) and + errno.errorcode[exc[0]] in ('EPERM', 'ENOENT', 'EACCES', 'EBUSY', + 'EEXIST', 'ENOTDIR', 'ENAMETOOLONG', + 'EINTR', 'ENOTEMPTY', 'EIO', 'ETXTBSY', + 'ESRCH', 'EINVAL')): + return 1 + return 0 + +def listrp(rp): + """Like rp.listdir() but return [] if error, and sort results""" + def error_handler(exc): + Log("Error listing directory %s" % rp.path, 2) + return [] + dir_listing = check_common_error(error_handler, rp.listdir) + dir_listing.sort() + return dir_listing + +def signal_handler(signum, frame): + """This is called when signal signum is caught""" + raise SignalException(signum) + +def install_signal_handlers(): + """Install signal handlers on current connection""" + for signum in [signal.SIGQUIT, signal.SIGHUP, signal.SIGTERM]: + signal.signal(signum, signal_handler) class SignalException(Exception): @@ -335,91 +333,7 @@ class TracebackArchive: "-------------------------------------------" % ("\n".join(cls._traceback_strings),), 3) -MakeClass(TracebackArchive) - - -class TempFileManager: - """Manage temp files""" - - # This is a connection-specific list of temp files, to be cleaned - # up before rdiff-backup exits. - _tempfiles = [] - - # To make collisions less likely, this gets put in the file name - # and incremented whenever a new file is requested. - _tfindex = 0 - - def new(cls, rp_base, same_dir = 1): - """Return new tempfile that isn't in use. - - If same_dir, tempfile will be in same directory as rp_base. - Otherwise, use tempfile module to get filename. - - """ - conn = rp_base.conn - if conn is not Globals.local_connection: - return conn.TempFileManager.new(rp_base, same_dir) - - def find_unused(conn, dir): - """Find an unused tempfile with connection conn in directory dir""" - while 1: - if cls._tfindex > 100000000: - Log("Resetting index", 2) - cls._tfindex = 0 - tf = TempFile(conn, os.path.join(dir, - "rdiff-backup.tmp.%d" % cls._tfindex)) - cls._tfindex = cls._tfindex+1 - if not tf.lstat(): return tf - - if same_dir: tf = find_unused(conn, rp_base.dirsplit()[0]) - else: tf = TempFile(conn, tempfile.mktemp()) - cls._tempfiles.append(tf) - return tf - - def remove_listing(cls, tempfile): - """Remove listing of tempfile""" - if Globals.local_connection is not tempfile.conn: - tempfile.conn.TempFileManager.remove_listing(tempfile) - elif tempfile in cls._tempfiles: cls._tempfiles.remove(tempfile) - - def delete_all(cls): - """Delete all remaining tempfiles""" - for tf in cls._tempfiles[:]: tf.delete() - -MakeClass(TempFileManager) - - -from rpath import * - -class TempFile(RPath): - """Like an RPath, but keep track of which ones are still here""" - def rename(self, rp_dest): - """Rename temp file to permanent location, possibly overwriting""" - if self.isdir() and not rp_dest.isdir(): - # Cannot move a directory directly over another file - rp_dest.delete() - if (isinstance(rp_dest, DSRPath) and rp_dest.delay_perms - and not self.hasfullperms()): - # If we are moving to a delayed perm directory, delay - # permission change on destination. - rp_dest.chmod(self.getperms()) - self.chmod(0700) - RPathStatic.rename(self, rp_dest) - - # Sometimes this just seems to fail silently, as in one - # hardlinked twin is moved over the other. So check to make - # sure below. - self.setdata() - if self.lstat(): - rp_dest.delete() - RPathStatic.rename(self, rp_dest) - self.setdata() - if self.lstat(): raise OSError("Cannot rename tmp file correctly") - TempFileManager.remove_listing(self) - - def delete(self): - RPath.delete(self) - TempFileManager.remove_listing(self) +static.MakeClass(TracebackArchive) class SaveState: @@ -470,9 +384,8 @@ class SaveState: if last_file_rorp: symtext = apply(os.path.join, ('increments',) + last_file_rorp.index) - return Robust.symlink_action(cls._last_file_sym, symtext) - else: return RobustAction(None, lambda init_val: cls.touch_last_file(), - None) + return symlink_action(cls._last_file_sym, symtext) + else: return Action(None, lambda init_val: cls.touch_last_file(), None) def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None): """Save states of tree reducer and finalizer during inc backup @@ -486,9 +399,8 @@ class SaveState: cls._last_checkpoint_time = time.time() Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) state_string = cPickle.dumps((ITR, finalizer)) - Robust.chain(Robust.destructive_write_action(cls._checkpoint_rp, - state_string), - cls.record_last_file_action(last_file_rorp)).execute() + chain(destructive_write_action(cls._checkpoint_rp, state_string), + cls.record_last_file_action(last_file_rorp)).execute() def checkpoint_needed(cls): """Returns true if another checkpoint is called for""" @@ -500,7 +412,7 @@ class SaveState: for rp in Resume.get_relevant_rps(): rp.delete() if Globals.preserve_hardlinks: Hardlink.remove_all_checkpoints() -MakeClass(SaveState) +static.MakeClass(SaveState) class ResumeException(Exception): @@ -527,8 +439,8 @@ class Resume: for si in cls.get_sis_covering_index(index): if si.time > later_than: return si.time - raise SkipFileException("Index %s already covered, skipping" % - str(index)) + raise rpath.SkipFileException("Index %s already covered, skipping" % + str(index)) def get_sis_covering_index(cls, index): """Return sorted list of SessionInfos which may cover index @@ -667,7 +579,7 @@ class Resume: return None assert None -MakeClass(Resume) +static.MakeClass(Resume) class ResumeSessionInfo: @@ -691,8 +603,3 @@ class ResumeSessionInfo: self.ITR, self.finalizer, = ITR, finalizer -from log import * -from destructive_stepping import * -import Time, Rdiff, librsync -from highlevel import * - -- cgit v1.2.1