summaryrefslogtreecommitdiff
path: root/rdiff-backup/rdiff_backup/robust.py
diff options
context:
space:
mode:
Diffstat (limited to 'rdiff-backup/rdiff_backup/robust.py')
-rw-r--r--rdiff-backup/rdiff_backup/robust.py186
1 files changed, 114 insertions, 72 deletions
diff --git a/rdiff-backup/rdiff_backup/robust.py b/rdiff-backup/rdiff_backup/robust.py
index 3795bd1..74e0d12 100644
--- a/rdiff-backup/rdiff_backup/robust.py
+++ b/rdiff-backup/rdiff_backup/robust.py
@@ -33,69 +33,98 @@ execfile("hardlink.py")
class RobustAction:
"""Represents a file operation to be accomplished later"""
- def __init__(self, init_thunk, final_thunk, error_thunk):
+ def __init__(self, init_thunk, final_func, error_handler):
"""RobustAction initializer
All the thunks are functions whose return value will be
ignored. init_thunk should not make any irreversible changes
- but prepare for the writing of the important data. final_thunk
+ but prepare for the writing of the important data. final_func
should be as short as possible and do the real work.
- error_thunk is run if there is an error in init_thunk or
- final_thunk. Errors in init_thunk should be corrected by
- error_thunk as if nothing had been run in the first place.
- The functions take no arguments except for error_thunk, which
- receives the exception as its only argument.
+ error_handler is run if there is an error in init_thunk or
+ final_func. Errors in init_thunk should be corrected by
+ error_handler as if nothing had been run in the first place.
+
+ init_thunk takes no arguments.
+
+ final_thunk takes the return value of init_thunk as its
+ argument, and its return value is returned by execute().
+
+ error_handler takes three arguments: the exception, a value
+ which is true just in case self.init_thunk ran correctly, and
+ a value which will be the return value of init_thunk if it ran
+ correctly.
"""
- self.init_thunk = init_thunk
- self.final_thunk = final_thunk
- self.error_thunk = error_thunk
+ self.init_thunk = init_thunk or self.default_init_thunk
+ self.final_func = final_func or self.default_final_func
+ self.error_handler = error_handler or self.default_error_handler
def execute(self):
"""Actually run the operation"""
+ ran_init_thunk = None
try:
- self.init_thunk()
- self.final_thunk()
+ init_val = self.init_thunk()
+ ran_init_thunk = 1
+ return self.final_func(init_val)
except Exception, exc: # Catch all errors
Log.exception()
- self.error_thunk(exc)
+ if ran_init_thunk: self.error_handler(exc, 1, init_val)
+ else: self.error_handler(exc, None, None)
raise exc
+ def default_init_thunk(self): return None
+ def default_final_func(self, init_val): return init_val
+ def default_error_handler(self, exc, ran_init, init_val): pass
+
class Robust:
"""Contains various file operations made safer using tempfiles"""
- null_action = RobustAction(lambda: None, lambda: None, lambda e: None)
- def chain(robust_action_list):
+ null_action = RobustAction(None, None, None)
+ def chain(*robust_action_list):
"""Return chain tying together a number of robust actions
The whole chain will be aborted if some error occurs in
initialization stage of any of the component actions.
"""
- ras_with_completed_inits = []
+ ras_with_started_inits, init_return_vals = [], []
def init():
for ra in robust_action_list:
- ras_with_completed_inits.append(ra)
- ra.init_thunk()
- def final():
- for ra in robust_action_list: ra.final_thunk()
- def error(exc):
- for ra in ras_with_completed_inits: ra.error_thunk(exc)
+ ras_with_started_inits.append(ra)
+ init_return_vals.append(ra.init_thunk())
+ return init_return_vals
+ def final(init_return_vals):
+ final_vals = []
+ for ra, init_val in zip(robust_action_list, init_return_vals):
+ final_vals.append(ra.final_func(init_val))
+ return final_vals
+ def error(exc, ran_init, init_val):
+ for ra, init_val in zip(ras_with_started_inits, init_return_vals):
+ ra.error_handler(exc, 1, init_val)
+ for ra in ras_with_started_inits[len(init_return_vals):]:
+ ra.error_handler(exc, None, None)
return RobustAction(init, final, error)
- def chain_nested(robust_action_list):
+ def chain_nested(*robust_action_list):
"""Like chain but final actions performed in reverse order"""
- ras_with_completed_inits = []
+ ras_with_started_inits, init_vals = [], []
def init():
for ra in robust_action_list:
- ras_with_completed_inits.append(ra)
- ra.init_thunk()
- def final():
- ralist_copy = robust_action_list[:]
- ralist_copy.reverse()
- for ra in ralist_copy: ra.final_thunk()
- def error(exc):
- for ra in ras_with_completed_inits: ra.error_thunk(exc)
+ ras_with_started_inits.append(ra)
+ init_vals.append(ra.init_thunk())
+ return init_vals
+ def final(init_vals):
+ ras_and_inits = zip(robust_action_list, init_vals)
+ ras_and_inits.reverse()
+ final_vals = []
+ for ra, init_val in ras_and_inits:
+ final_vals.append(ra.final_func(init_val))
+ return final_vals
+ def error(exc, ran_init, init_val):
+ for ra, init_val in zip(ras_with_started_inits, init_return_vals):
+ ra.error_handler(exc, 1, init_val)
+ for ra in ras_with_started_inits[len(init_return_vals):]:
+ ra.error_handler(exc, None, None)
return RobustAction(init, final, error)
def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
@@ -107,18 +136,19 @@ class Robust:
create RobustActions of that type.
"""
- assert type(tempfiles) is types.TupleType, tempfiles
- if final_renames is None: final = lambda: None
- else:
- assert len(tempfiles) == len(final_renames)
- def final(): # rename tempfiles to final positions
- for i in range(len(tempfiles)):
- final_name = final_renames[i]
- if final_name:
- if final_name.isdir(): # Cannot rename over directory
- final_name.delete()
- tempfiles[i].rename(final_name)
- def error(exc):
+ if isinstance(tempfiles, TempFile): tempfiles = (tempfiles,)
+ if isinstance(final_renames, RPath): final_renames = (final_renames,)
+ if final_renames is None: final_renames = [None] * len(tempfiles)
+ assert len(tempfiles) == len(final_renames)
+
+ def final(init_val): # rename tempfiles to final positions
+ for tempfile, destination in zip(tempfiles, final_renames):
+ if destination:
+ if destination.isdir(): # Cannot rename over directory
+ destination.delete()
+ tempfile.rename(destination)
+ return init_val
+ def error(exc, ran_init, init_val):
for tf in tempfiles: tf.delete()
return RobustAction(init_thunk, final, error)
@@ -130,36 +160,46 @@ class Robust:
overwritten).
"""
- tfl = [None] # Need mutable object that init and final can access
+ tfl = [None] # Need some mutable state to hold tf value
def init():
if not (rorpin.isdir() and rpout.isdir()): # already a dir
- tfl[0] = TempFileManager.new(rpout)
- if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
+ tfl[0] = tf = TempFileManager.new(rpout)
+ if rorpin.isreg(): tf.write_from_fileobj(rorpin.open("rb"))
else: RPath.copy(rorpin, tf)
- def final():
- if tfl[0] and tfl[0].lstat():
+ return tf
+ else: return None
+ def final(tf):
+ if tf and tf.lstat():
if rpout.isdir(): rpout.delete()
- tfl[0].rename(rpout)
- return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
+ tf.rename(rpout)
+ return rpout
+ def error(exc, ran_init, init_val):
+ if tfl[0]: tfl[0].delete()
+ return RobustAction(init, final, error)
def copy_with_attribs_action(rorpin, rpout, compress = None):
"""Like copy_action but also copy attributes"""
- tfl = [None] # Need mutable object that init and final can access
+ tfl = [None] # Need some mutable state for error handler
def init():
if not (rorpin.isdir() and rpout.isdir()): # already a dir
- tfl[0] = TempFileManager.new(rpout)
+ tfl[0] = tf = TempFileManager.new(rpout)
if rorpin.isreg():
- tfl[0].write_from_fileobj(rorpin.open("rb"), compress)
- else: RPath.copy(rorpin, tfl[0])
- if tfl[0].lstat(): # Some files, like sockets, won't be created
- RPathStatic.copy_attribs(rorpin, tfl[0])
- def final():
+ tf.write_from_fileobj(rorpin.open("rb"), compress)
+ else: RPath.copy(rorpin, tf)
+ if tf.lstat(): # Some files, like sockets, won't be created
+ RPathStatic.copy_attribs(rorpin, tf)
+ return tf
+ else: return None
+ def final(tf):
if rorpin.isdir() and rpout.isdir():
RPath.copy_attribs(rorpin, rpout)
- elif tfl[0] and tfl[0].lstat():
- if rpout.isdir(): rpout.delete()
- tfl[0].rename(rpout)
- return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
+ elif tf and tf.lstat():
+ if rpout.isdir(): rpout.delete() # can't rename over dir
+ tf.rename(rpout)
+ return rpout
+ def error(exc, ran_init, init_val):
+ if tfl[0]: tfl[0].delete()
+ return RobustAction(init, final, error)
def copy_attribs_action(rorpin, rpout):
"""Return action which just copies attributes
@@ -168,14 +208,16 @@ class Robust:
normal sequence.
"""
- def final(): RPath.copy_attribs(rorpin, rpout)
- return RobustAction(lambda: None, final, lambda e: None)
+ def final(init_val):
+ RPath.copy_attribs(rorpin, rpout)
+ return rpout
+ return RobustAction(None, final, None)
def symlink_action(rpath, linktext):
"""Return symlink action by moving one file over another"""
tf = TempFileManager.new(rpath)
def init(): tf.symlink(linktext)
- return Robust.make_tf_robustaction(init, (tf,), (rpath,))
+ return Robust.make_tf_robustaction(init, tf, rpath)
def destructive_write_action(rp, s):
"""Return action writing string s to rpath rp in robust way
@@ -187,9 +229,9 @@ class Robust:
def init():
fp = tf.open("wb")
fp.write(s)
- assert not fp.close()
+ fp.close()
tf.setdata()
- return Robust.make_tf_robustaction(init, (tf,), (rp,))
+ return Robust.make_tf_robustaction(init, tf, rp)
def check_common_error(init_thunk, error_thunk = lambda exc: None):
"""Execute init_thunk, if error, run error_thunk on exception
@@ -357,8 +399,8 @@ class SaveState:
symtext = apply(os.path.join,
('increments',) + last_file_rorp.index)
return Robust.symlink_action(cls._last_file_sym, symtext)
- else: return RobustAction(lambda: None, cls.touch_last_file,
- lambda exc: None)
+ else: return RobustAction(None, lambda init_val: cls.touch_last_file(),
+ None)
def checkpoint(cls, ITR, finalizer, last_file_rorp, override = None):
"""Save states of tree reducer and finalizer during inc backup
@@ -372,9 +414,9 @@ class SaveState:
cls._last_checkpoint_time = time.time()
Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
state_string = cPickle.dumps((ITR, finalizer))
- Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp,
- state_string),
- cls.record_last_file_action(last_file_rorp)]).execute()
+ Robust.chain(Robust.destructive_write_action(cls._checkpoint_rp,
+ state_string),
+ cls.record_last_file_action(last_file_rorp)).execute()
def checkpoint_needed(cls):
"""Returns true if another checkpoint is called for"""