From 8c37a5bdfdd46d5cfad6e9d67925ddef9ca382bf Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 21 Mar 2002 07:22:43 +0000 Subject: First checkin git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@2 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/rdiff_backup/connection.py | 467 ++++++++++++++ rdiff-backup/rdiff_backup/destructive_stepping.py | 250 ++++++++ rdiff-backup/rdiff_backup/filelist.py | 106 ++++ rdiff-backup/rdiff_backup/header.py | 18 + rdiff-backup/rdiff_backup/highlevel.py | 288 +++++++++ rdiff-backup/rdiff_backup/increment.py | 180 ++++++ rdiff-backup/rdiff_backup/iterfile.py | 235 ++++++++ rdiff-backup/rdiff_backup/lazy.py | 343 +++++++++++ rdiff-backup/rdiff_backup/log.py | 142 +++++ rdiff-backup/rdiff_backup/manage.py | 99 +++ rdiff-backup/rdiff_backup/restore.py | 158 +++++ rdiff-backup/rdiff_backup/rlist.py | 240 ++++++++ rdiff-backup/rdiff_backup/robust.py | 537 +++++++++++++++++ rdiff-backup/rdiff_backup/rorpiter.py | 248 ++++++++ rdiff-backup/rdiff_backup/rpath.py | 704 ++++++++++++++++++++++ rdiff-backup/rdiff_backup/static.py | 30 + 16 files changed, 4045 insertions(+) create mode 100644 rdiff-backup/rdiff_backup/connection.py create mode 100644 rdiff-backup/rdiff_backup/destructive_stepping.py create mode 100644 rdiff-backup/rdiff_backup/filelist.py create mode 100644 rdiff-backup/rdiff_backup/header.py create mode 100644 rdiff-backup/rdiff_backup/highlevel.py create mode 100644 rdiff-backup/rdiff_backup/increment.py create mode 100644 rdiff-backup/rdiff_backup/iterfile.py create mode 100644 rdiff-backup/rdiff_backup/lazy.py create mode 100644 rdiff-backup/rdiff_backup/log.py create mode 100644 rdiff-backup/rdiff_backup/manage.py create mode 100644 rdiff-backup/rdiff_backup/restore.py create mode 100644 rdiff-backup/rdiff_backup/rlist.py create mode 100644 rdiff-backup/rdiff_backup/robust.py create mode 100644 rdiff-backup/rdiff_backup/rorpiter.py create mode 100644 rdiff-backup/rdiff_backup/rpath.py create mode 100644 rdiff-backup/rdiff_backup/static.py (limited to 'rdiff-backup/rdiff_backup') diff --git a/rdiff-backup/rdiff_backup/connection.py b/rdiff-backup/rdiff_backup/connection.py new file mode 100644 index 0000000..83fc874 --- /dev/null +++ b/rdiff-backup/rdiff_backup/connection.py @@ -0,0 +1,467 @@ +execfile("rdiff.py") +import types, os, tempfile, cPickle, shutil, traceback + +####################################################################### +# +# connection - Code that deals with remote execution +# + +class ConnectionError(Exception): + pass + +class ConnectionQuit(Exception): + pass + + +class Connection: + """Connection class - represent remote execution + + The idea is that, if c is an instance of this class, c.foo will + return the object on the remote side. For functions, c.foo will + return a function that, when called, executes foo on the remote + side, sending over the arguments and sending back the result. + + """ + def __repr__(self): return self.__str__() + + +class LocalConnection(Connection): + """Local connection + + This is a dummy connection class, so that LC.foo just evaluates to + foo using global scope. 
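+
+    A rough usage sketch (illustrative only, not from the original
+    source; it simply exercises the __getattr__ and reval defined
+    below, using the module-level imports):
+
+        lc = Globals.local_connection
+        lc.os.getcwd()         # "os" is found in globals(), so this is
+                               # just os.getcwd()
+        lc.reval("pow", 2, 8)  # eval("pow") is the builtin, returns 256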
+ + """ + def __init__(self): + """This prevents two instances of LocalConnection""" + assert not Globals.local_connection + self.conn_number = 0 # changed by SetConnections for server + + def __getattr__(self, name): + try: return globals()[name] + except KeyError: + builtins = globals()["__builtins__"] + try: + if type(builtins) is types.ModuleType: + return builtins.__dict__[name] + else: return builtins[name] + except KeyError: raise NameError, name + + def __setattr__(self, name, value): + globals()[name] = value + + def __delattr__(self, name): + del globals()[name] + + def __str__(self): return "LocalConnection" + + def reval(self, function_string, *args): + return apply(eval(function_string), args) + + def quit(self): pass + +Globals.local_connection = LocalConnection() +Globals.connections.append(Globals.local_connection) +# Following changed by server in SetConnections +Globals.connection_dict[0] = Globals.local_connection + + +class ConnectionRequest: + """Simple wrapper around a PipeConnection request""" + def __init__(self, function_string, num_args): + self.function_string = function_string + self.num_args = num_args + + def __str__(self): + return "ConnectionRequest: %s with %d arguments" % \ + (self.function_string, self.num_args) + + +class LowLevelPipeConnection(Connection): + """Routines for just sending objects from one side of pipe to another + + Each thing sent down the pipe is paired with a request number, + currently limited to be between 0 and 255. The size of each thing + should be less than 2^56. + + Each thing also has a type, indicated by one of the following + characters: + + o - generic object + i - iterator/generator of RORPs + f - file object + b - string + q - quit signal + t - TempFile + R - RPath + r - RORPath only + c - PipeConnection object + + """ + def __init__(self, inpipe, outpipe): + """inpipe is a file-type open for reading, outpipe for writing""" + self.inpipe = inpipe + self.outpipe = outpipe + + def __str__(self): + """Return string version + + This is actually an important function, because otherwise + requests to represent this object would result in "__str__" + being executed on the other side of the connection. + + """ + return "LowLevelPipeConnection" + + def _put(self, obj, req_num): + """Put an object into the pipe (will send raw if string)""" + Log.conn("sending", obj, req_num) + if type(obj) is types.StringType: self._putbuf(obj, req_num) + elif isinstance(obj, Connection): self._putconn(obj, req_num) + elif isinstance(obj, TempFile): self._puttempfile(obj, req_num) + elif isinstance(obj, RPath): self._putrpath(obj, req_num) + elif isinstance(obj, RORPath): self._putrorpath(obj, req_num) + elif ((hasattr(obj, "read") or hasattr(obj, "write")) + and hasattr(obj, "close")): self._putfile(obj, req_num) + elif hasattr(obj, "next"): self._putiter(obj, req_num) + else: self._putobj(obj, req_num) + + def _putobj(self, obj, req_num): + """Send a generic python obj down the outpipe""" + self._write("o", cPickle.dumps(obj, 1), req_num) + + def _putbuf(self, buf, req_num): + """Send buffer buf down the outpipe""" + self._write("b", buf, req_num) + + def _putfile(self, fp, req_num): + """Send a file to the client using virtual files""" + self._write("f", str(VirtualFile.new(fp)), req_num) + + def _putiter(self, iterator, req_num): + """Put an iterator through the pipe""" + self._write("i", str(VirtualFile.new(RORPIter.ToFile(iterator))), + req_num) + + def _puttempfile(self, tempfile, req_num): + """Put a tempfile into pipe. 
See _putrpath""" + tf_repr = (tempfile.conn.conn_number, tempfile.base, + tempfile.index, tempfile.data) + self._write("t", cPickle.dumps(tf_repr, 1), req_num) + + def _putrpath(self, rpath, req_num): + """Put an rpath into the pipe + + The rpath's connection will be encoded as its conn_number. It + and the other information is put in a tuple. + + """ + rpath_repr = (rpath.conn.conn_number, rpath.base, + rpath.index, rpath.data) + self._write("R", cPickle.dumps(rpath_repr, 1), req_num) + + def _putrorpath(self, rorpath, req_num): + """Put an rorpath into the pipe + + This is only necessary because if there is a .file attached, + it must be excluded from the pickling + + """ + rorpath_repr = (rorpath.index, rorpath.data) + self._write("r", cPickle.dumps(rorpath_repr, 1), req_num) + + def _putconn(self, pipeconn, req_num): + """Put a connection into the pipe + + A pipe connection is represented just as the integer (in + string form) of its connection number it is *connected to*. + + """ + self._write("c", str(pipeconn.conn_number), req_num) + + def _putquit(self): + """Send a string that takes down server""" + self._write("q", "", 255) + + def _write(self, headerchar, data, req_num): + """Write header and then data to the pipe""" + self.outpipe.write(headerchar + chr(req_num) + self._l2s(len(data))) + self.outpipe.write(data) + self.outpipe.flush() + + def _read(self, length): + """Read length bytes from inpipe, returning result""" + return self.inpipe.read(length) + + def _s2l(self, s): + """Convert string to long int""" + assert len(s) == 7 + l = 0L + for i in range(7): l = l*256 + ord(s[i]) + return l + + def _l2s(self, l): + """Convert long int to string""" + s = "" + for i in range(7): + l, remainder = divmod(l, 256) + s = chr(remainder) + s + assert remainder == 0 + return s + + def _get(self): + """Read an object from the pipe and return (req_num, value)""" + header_string = self.inpipe.read(9) + assert len(header_string) == 9, \ + "Error reading from pipe (problem probably originated remotely)" + try: + format_string, req_num, length = (header_string[0], + ord(header_string[1]), + self._s2l(header_string[2:])) + except IndexError: raise ConnectionError() + if format_string == "o": result = cPickle.loads(self._read(length)) + elif format_string == "b": result = self._read(length) + elif format_string == "f": + result = VirtualFile(self, int(self._read(length))) + elif format_string == "i": + result = RORPIter.FromFile(BufferedRead( + VirtualFile(self, int(self._read(length))))) + elif format_string == "t": + result = self._gettempfile(self._read(length)) + elif format_string == "r": + result = self._getrorpath(self._read(length)) + elif format_string == "R": result = self._getrpath(self._read(length)) + elif format_string == "c": + result = Globals.connection_dict[int(self._read(length))] + else: + assert format_string == "q", header_string + raise ConnectionQuit("Received quit signal") + Log.conn("received", result, req_num) + return (req_num, result) + + def _getrorpath(self, raw_rorpath_buf): + """Reconstruct RORPath object from raw data""" + index, data = cPickle.loads(raw_rorpath_buf) + return RORPath(index, data) + + def _gettempfile(self, raw_tf_buf): + """Return TempFile object indicated by raw_tf_buf""" + conn_number, base, index, data = cPickle.loads(raw_tf_buf) + return TempFile(Globals.connection_dict[conn_number], + base, index, data) + + def _getrpath(self, raw_rpath_buf): + """Return RPath object indicated by raw_rpath_buf""" + conn_number, base, index, data = 
cPickle.loads(raw_rpath_buf) + return RPath(Globals.connection_dict[conn_number], base, index, data) + + def _close(self): + """Close the pipes associated with the connection""" + self.outpipe.close() + self.inpipe.close() + + +class PipeConnection(LowLevelPipeConnection): + """Provide server and client functions for a Pipe Connection + + Both sides act as modules that allows for remote execution. For + instance, self.conn.pow(2,8) will execute the operation on the + server side. + + The only difference between the client and server is that the + client makes the first request, and the server listens first. + + """ + def __init__(self, inpipe, outpipe, conn_number = 0): + """Init PipeConnection + + conn_number should be a unique (to the session) integer to + identify the connection. For instance, all connections to the + client have conn_number 0. Other connections can use this + number to route commands to the correct process. + + """ + LowLevelPipeConnection.__init__(self, inpipe, outpipe) + self.conn_number = conn_number + self.unused_request_numbers = {} + for i in range(256): self.unused_request_numbers[i] = None + + def __str__(self): return "PipeConnection %d" % self.conn_number + + def get_response(self, desired_req_num): + """Read from pipe, responding to requests until req_num. + + Sometimes after a request is sent, the other side will make + another request before responding to the original one. In + that case, respond to the request. But return once the right + response is given. + + """ + while 1: + try: req_num, object = self._get() + except ConnectionQuit: + self._put("quitting", self.get_new_req_num()) + return + if req_num == desired_req_num: return object + else: + assert isinstance(object, ConnectionRequest) + self.answer_request(object, req_num) + + def answer_request(self, request, req_num): + """Put the object requested by request down the pipe""" + del self.unused_request_numbers[req_num] + argument_list = [] + for i in range(request.num_args): + arg_req_num, arg = self._get() + assert arg_req_num == req_num + argument_list.append(arg) + try: result = apply(eval(request.function_string), argument_list) + except: result = self.extract_exception() + self._put(result, req_num) + self.unused_request_numbers[req_num] = None + + def extract_exception(self): + """Return active exception""" + Log("Sending back exception: \n" + + "".join(traceback.format_tb(sys.exc_info()[2])), 2) + return sys.exc_info()[1] + + def Server(self): + """Start server's read eval return loop""" + Globals.server = 1 + Globals.connections.append(self) + Log("Starting server", 6) + self.get_response(-1) + + def reval(self, function_string, *args): + """Execute command on remote side + + The first argument should be a string that evaluates to a + function, like "pow", and the remaining are arguments to that + function. 
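+
+    A rough sketch of the two equivalent calling styles (illustrative
+    only; "conn" stands for any PipeConnection instance and the path is
+    hypothetical):
+
+        conn.reval("pow", 2, 8)          # explicit form, returns 256
+        conn.pow(2, 8)                   # same call routed through
+                                         # __getattr__/EmulateCallable
+        conn.os.chmod("/tmp/foo", 0644)  # dotted names also work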
+ + """ + req_num = self.get_new_req_num() + self._put(ConnectionRequest(function_string, len(args)), req_num) + for arg in args: self._put(arg, req_num) + result = self.get_response(req_num) + self.unused_request_numbers[req_num] = None + if isinstance(result, Exception): raise result + else: return result + + def get_new_req_num(self): + """Allot a new request number and return it""" + if not self.unused_request_numbers: + raise ConnectionError("Exhaused possible connection numbers") + req_num = self.unused_request_numbers.keys()[0] + del self.unused_request_numbers[req_num] + return req_num + + def quit(self): + """Close the associated pipes and tell server side to quit""" + assert not Globals.server + self._putquit() + self._get() + self._close() + + def __getattr__(self, name): + """Intercept attributes to allow for . invocation""" + return EmulateCallable(self, name) + + +class RedirectedConnection(Connection): + """Represent a connection more than one move away + + For instance, suppose things are connected like this: S1---C---S2. + If Server1 wants something done by Server2, it will have to go + through the Client. So on S1's side, S2 will be represented by a + RedirectedConnection. + + """ + def __init__(self, conn_number, routing_number = 0): + """RedirectedConnection initializer + + Returns a RedirectedConnection object for the given + conn_number, where commands are routed through the connection + with the given routing_number. 0 is the client, so the + default shouldn't have to be changed. + + """ + self.conn_number = conn_number + self.routing_number = routing_number + self.routing_conn = Globals.connection_dict[routing_number] + + def __str__(self): + return "RedirectedConnection %d,%d" % (self.conn_number, + self.routing_number) + + def __getattr__(self, name): + return EmulateCallable(self.routing_conn, + "Globals.get_dict_val('connection_dict', %d).%s" + % (self.conn_number, name)) + + +class EmulateCallable: + """This is used by PipeConnection in calls like conn.os.chmod(foo)""" + def __init__(self, connection, name): + self.connection = connection + self.name = name + def __call__(self, *args): + return apply(self.connection.reval, (self.name,) + args) + def __getattr__(self, attr_name): + return EmulateCallable(self.connection, + "%s.%s" % (self.name, attr_name)) + + +class VirtualFile: + """When the client asks for a file over the connection, it gets this + + The returned instance then forwards requests over the connection. + The class's dictionary is used by the server to associate each + with a unique file number. 
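+
+    A rough sketch of the round trip (illustrative only; "conn" stands
+    for the client's PipeConnection, compare _putfile and the "f"
+    branch of _get above):
+
+        # server side: register a real file object, send back its id
+        id = VirtualFile.new(open("some_file"))
+
+        # client side: wrap the received id; reads are forwarded back
+        # over the connection to VirtualFile.readfromid(id, length)
+        vf = VirtualFile(conn, id)
+        data = vf.read(1024)
+        vf.close()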
+ + """ + #### The following are used by the server + vfiles = {} + counter = 0 + + def getbyid(cls, id): + return cls.vfiles[id] + getbyid = classmethod(getbyid) + + def readfromid(cls, id, length): + return cls.vfiles[id].read(length) + readfromid = classmethod(readfromid) + + def writetoid(cls, id, buffer): + return cls.vfiles[id].write(buffer) + writetoid = classmethod(writetoid) + + def closebyid(cls, id): + fp = cls.vfiles[id] + del cls.vfiles[id] + return fp.close() + closebyid = classmethod(closebyid) + + def new(cls, fileobj): + """Associate a new VirtualFile with a read fileobject, return id""" + count = cls.counter + cls.vfiles[count] = fileobj + cls.counter = count + 1 + return count + new = classmethod(new) + + + #### And these are used by the client + def __init__(self, connection, id): + self.connection = connection + self.id = id + + def read(self, length = -1): + return self.connection.VirtualFile.readfromid(self.id, length) + + def write(self, buf): + return self.connection.VirtualFile.writetoid(self.id, buf) + + def close(self): + return self.connection.VirtualFile.closebyid(self.id) diff --git a/rdiff-backup/rdiff_backup/destructive_stepping.py b/rdiff-backup/rdiff_backup/destructive_stepping.py new file mode 100644 index 0000000..80d274e --- /dev/null +++ b/rdiff-backup/rdiff_backup/destructive_stepping.py @@ -0,0 +1,250 @@ +from __future__ import generators +execfile("rorpiter.py") + +####################################################################### +# +# destructive-stepping - Deal with side effects from traversing trees +# + +class DSRPath(RPath): + """Destructive Stepping RPath + + Sometimes when we traverse the directory tree, even when we just + want to read files, we have to change things, like the permissions + of a file or directory in order to read it, or the file's access + times. This class is like an RPath, but the permission and time + modifications are delayed, so that they can be done at the very + end when they won't be disturbed later. + + """ + def __init__(self, *args): + self.perms_delayed = self.times_delayed = None + RPath.__init__(self, *args) + + def __getstate__(self): + """Return picklable state. 
See RPath __getstate__.""" + assert self.conn is Globals.local_connection # Can't pickle a conn + pickle_dict = {} + for attrib in ['index', 'data', 'perms_delayed', 'times_delayed', + 'newperms', 'newtimes', 'path', 'base']: + if self.__dict__.has_key(attrib): + pickle_dict[attrib] = self.__dict__[attrib] + return pickle_dict + + def __setstate__(self, pickle_dict): + """Set state from object produced by getstate""" + self.conn = Globals.local_connection + for attrib in pickle_dict.keys(): + self.__dict__[attrib] = pickle_dict[attrib] + + def delay_perm_writes(self): + """Signal that permission writing should be delayed until the end""" + self.perms_delayed = 1 + self.newperms = None + + def delay_time_changes(self): + """Signal that time changes should also be delayed until the end""" + self.times_delayed = 1 + self.newtimes = None + + def chmod(self, permissions): + """Change permissions, delaying if self.perms_delayed is set""" + if self.perms_delayed: + self.newperms = 1 + self.data['perms'] = permissions + else: RPath.chmod(self, permissions) + + def chmod_bypass(self, permissions): + """Change permissions without updating the data dictionary""" + self.conn.os.chmod(self.path, permissions) + self.perms_delayed = self.newperms = 1 + + def remember_times(self): + """Mark times as changed so they can be restored later""" + self.times_delayed = self.newtimes = 1 + + def settime(self, accesstime, modtime): + """Change times, delaying if self.times_delayed is set""" + if self.times_delayed: + self.newtimes = 1 + self.data['atime'] = accesstime + self.data['mtime'] = modtime + else: RPath.settime(self, accesstime, modtime) + + def settime_bypass(self, accesstime, modtime): + """Change times without updating data dictionary""" + self.conn.os.utime(self.path, (accesstime, modtime)) + + def setmtime(self, modtime): + """Change mtime, delaying if self.times_delayed is set""" + if self.times_delayed: + self.newtimes = 1 + self.data['mtime'] = modtime + else: RPath.setmtime(self, modtime) + + def setmtime_bypass(self, modtime): + """Change mtime without updating data dictionary""" + self.conn.os.utime(self.path, (time.time(), modtime)) + + def restoretimes(self): + """Write times in self.data back to file""" + RPath.settime(self, self.data['atime'], self.data['mtime']) + + def restoreperms(self): + """Write permissions in self.data back to file""" + RPath.chmod(self, self.data['perms']) + + def write_changes(self): + """Write saved up permission/time changes""" + if not self.lstat(): return # File has been deleted in meantime + + if self.perms_delayed and self.newperms: + self.conn.os.chmod(self.path, self.getperms()) + if self.times_delayed: + if self.data.has_key('atime'): + self.settime_bypass(self.getatime(), self.getmtime()) + elif self.newtimes and self.data.has_key('mtime'): + self.setmtime_bypass(self.getmtime()) + + +class DestructiveStepping: + """Destructive stepping""" + def initialize(dsrpath, source): + """Change permissions of dsrpath, possibly delay writes + + Abort if we need to access something and can't. If the file + is on the source partition, just log warning and return true. + Return false if everything good to go. + + """ + if not source or Globals.change_source_perms: + dsrpath.delay_perm_writes() + + def warn(err): + Log("Received error '%s' when dealing with file %s, skipping..." + % (err, dsrpath.path), 1) + + def abort(): + Log.FatalError("Missing access to file %s - aborting." % + dsrpath.path) + + def try_chmod(perms): + """Try to change the perms. 
If fail, return error.""" + try: dsrpath.chmod_bypass(perms) + except os.error, err: return err + return None + + if dsrpath.isreg() and not dsrpath.readable(): + if source: + if Globals.change_source_perms and dsrpath.isowner(): + err = try_chmod(0400) + if err: + warn(err) + return 1 + else: + warn("No read permissions") + return 1 + elif not Globals.change_mirror_perms or try_chmod(0600): abort() + elif dsrpath.isdir(): + if source and (not dsrpath.readable() or not dsrpath.executable()): + if Globals.change_source_perms and dsrpath.isowner(): + err = try_chmod(0500) + if err: + warn(err) + return 1 + else: + warn("No read or exec permissions") + return 1 + elif not source and not dsrpath.hasfullperms(): + if Globals.change_mirror_perms: try_chmod(0700) + + # Permissions above; now try to preserve access times if necessary + if (source and (Globals.preserve_atime or + Globals.change_source_perms) or + not source): + # These are the circumstances under which we will have to + # touch up a file's times after we are done with it + dsrpath.remember_times() + return None + + def Finalizer(initial_state = None): + """Return a finalizer that can work on an iterator of dsrpaths + + The reason we have to use an IterTreeReducer is that some files + should be updated immediately, but for directories we sometimes + need to update all the files in the directory before finally + coming back to it. + + """ + return IterTreeReducer(lambda x: None, lambda x,y: None, None, + lambda dsrpath, x, y: dsrpath.write_changes(), + initial_state) + + def isexcluded(dsrp, source): + """Return true if given DSRPath is excluded/ignored + + If source = 1, treat as source file, otherwise treat as + destination file. + + """ + if Globals.exclude_device_files and dsrp.isdev(): return 1 + + if source: exclude_regexps = Globals.exclude_regexps + else: exclude_regexps = Globals.exclude_mirror_regexps + + for regexp in exclude_regexps: + if regexp.match(dsrp.path): + Log("Excluding %s" % dsrp.path, 6) + return 1 + return None + + def Iterate_from(baserp, source, starting_index = None): + """Iterate dsrps from baserp, skipping any matching exclude_regexps + + includes only dsrps with indicies greater than starting_index + if starting_index is not None. 
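+
+    A hypothetical example of the resume logic implemented below: with
+    starting_index = ("usr", "local"),
+
+        ("usr",), ("usr", "local")     # prefixes: descended, not yielded
+        ("usr", "lib", ...)            # neither prefix nor greater: skipped
+        ("usr", "local", "bin"), ...   # > starting_index: yielded normally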
+ + """ + def helper_starting_from(dsrpath): + """Like helper, but only start iterating after starting_index""" + if dsrpath.index > starting_index: + # Past starting_index, revert to normal helper + for dsrp in helper(dsrpath): yield dsrp + elif dsrpath.index == starting_index[:len(dsrpath.index)]: + # May encounter starting index on this branch + if (not DestructiveStepping.isexcluded(dsrpath, source) and + not DestructiveStepping.initialize(dsrpath, source)): + if dsrpath.isdir(): + dir_listing = dsrpath.listdir() + dir_listing.sort() + for filename in dir_listing: + for dsrp in helper_starting_from( + dsrpath.append(filename)): + yield dsrp + + def helper(dsrpath): + if (not DestructiveStepping.isexcluded(dsrpath, source) and + not DestructiveStepping.initialize(dsrpath, source)): + yield dsrpath + if dsrpath.isdir(): + dir_listing = dsrpath.listdir() + dir_listing.sort() + for filename in dir_listing: + for dsrp in helper(dsrpath.append(filename)): + yield dsrp + + base_dsrpath = DSRPath(baserp.conn, baserp.base, + baserp.index, baserp.data) + if starting_index is None: return helper(base_dsrpath) + else: return helper_starting_from(base_dsrpath) + + def Iterate_with_Finalizer(baserp, source): + """Like Iterate_from, but finalize each dsrp afterwards""" + finalize = DestructiveStepping.Finalizer() + for dsrp in DestructiveStepping.Iterate_from(baserp, source): + yield dsrp + finalize(dsrp) + finalize.getresult() + + +MakeStatic(DestructiveStepping) diff --git a/rdiff-backup/rdiff_backup/filelist.py b/rdiff-backup/rdiff_backup/filelist.py new file mode 100644 index 0000000..7a660c3 --- /dev/null +++ b/rdiff-backup/rdiff_backup/filelist.py @@ -0,0 +1,106 @@ +from __future__ import generators +execfile("manage.py") + +####################################################################### +# +# filelist - Some routines that help with operations over files listed +# in standard input instead of over whole directories. +# + +class FilelistError(Exception): pass + +class Filelist: + """Many of these methods have analogs in highlevel.py""" + def File2Iter(fp, baserp): + """Convert file obj with one pathname per line into rpiter + + Closes fp when done. Given files are added to baserp. + + """ + while 1: + line = fp.readline() + if not line: break + if line[-1] == "\n": line = line[:-1] # strip trailing newline + if not line: continue # skip blank lines + elif line[0] == "/": raise FilelistError( + "Read in absolute file name %s." % line) + yield baserp.append(line) + assert not fp.close(), "Error closing filelist fp" + + def Mirror(src_rpath, dest_rpath, rpiter): + """Copy files in fileiter from src_rpath to dest_rpath""" + sigiter = dest_rpath.conn.Filelist.get_sigs(dest_rpath, rpiter) + diffiter = Filelist.get_diffs(src_rpath, sigiter) + dest_rpath.conn.Filelist.patch(dest_rpath, diffiter) + dest_rpath.setdata() + + def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath): + """Mirror + put increment in tree based at inc_rpath""" + sigiter = dest_rpath.conn.Filelist.get_sigs(dest_rpath, rpiter) + diffiter = Filelist.get_diffs(src_rpath, sigiter) + dest_rpath.conn.Filelist.patch_and_increment(dest_rpath, diffiter, + inc_rpath) + dest_rpath.setdata() + + def get_sigs(dest_rpbase, rpiter): + """Get signatures of file analogs in rpiter + + This is meant to be run on the destination side. Only the + extention part of the rps in rpiter will be used; the base is + ignored. 
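+
+    Illustrative example (hypothetical index): a source rp with index
+    ("some", "file") yields the signature of
+
+        dest_rpbase.new_index(("some", "file"))
+
+    i.e. only the path relative to the destination base matters, as
+    dest_iter below shows.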
+ + """ + def dest_iter(src_iter): + for src_rp in src_iter: yield dest_rpbase.new_index(src_rp.index) + return RORPIter.Signatures(dest_iter()) + + def get_diffs(src_rpbase, sigiter): + """Get diffs based on sigiter and files in src_rpbase + + This should be run on the local side. + + """ + for sig_rorp in sigiter: + new_rp = src_rpbase.new_index(sig_rorp.index) + yield RORPIter.diffonce(sig_rorp, new_rp) + + def patch(dest_rpbase, diffiter): + """Process diffs in diffiter and update files in dest_rbpase. + + Run remotely. + + """ + for diff_rorp in diffiter: + basisrp = dest_rpbase.new_index(diff_rorp.index) + if basisrp.lstat(): Filelist.make_subdirs(basisrp) + Log("Processing %s" % basisrp.path, 7) + RORPIter.patchonce(dest_rpbase, basisrp, diff_rorp) + + def patch_and_increment(dest_rpbase, diffiter, inc_rpbase): + """Apply diffs in diffiter to dest_rpbase, and increment to inc_rpbase + + Also to be run remotely. + + """ + for diff_rorp in diffiter: + basisrp = dest_rpbase.new_index(diff_rorp.index) + if diff_rorp.lstat(): Filelist.make_subdirs(basisrp) + Log("Processing %s" % basisrp.path, 7) + # XXX This isn't done yet... + + def make_subdirs(rpath): + """Make sure that all the directories under the rpath exist + + This function doesn't try to get the permissions right on the + underlying directories, just do the minimum to make sure the + file can be created. + + """ + dirname = rpath.dirsplit()[0] + if dirname == '.' or dirname == '': return + dir_rp = RPath(rpath.conn, dirname) + Filelist.make_subdirs(dir_rp) + if not dir_rp.lstat(): dir_rp.mkdir() + + +MakeStatic(Filelist) diff --git a/rdiff-backup/rdiff_backup/header.py b/rdiff-backup/rdiff_backup/header.py new file mode 100644 index 0000000..31b3ff0 --- /dev/null +++ b/rdiff-backup/rdiff_backup/header.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python +# +# rdiff-backup -- Mirror files while keeping incremental changes +# Version 0.6.0 released March 14, 2002 +# Copyright (C) 2001 Ben Escoto +# +# This program is licensed under the GNU General Public License (GPL). +# Distributions of rdiff-backup usually include a copy of the GPL in a +# file called COPYING. The GPL is also available online at +# http://www.gnu.org/copyleft/gpl.html. +# +# Please send mail to me or the mailing list if you find bugs or have +# any suggestions. + +from __future__ import nested_scopes, generators +import os, stat, time, sys, getopt, re, cPickle, types, shutil, sha, marshal, traceback, popen2, tempfile + + diff --git a/rdiff-backup/rdiff_backup/highlevel.py b/rdiff-backup/rdiff_backup/highlevel.py new file mode 100644 index 0000000..55fe007 --- /dev/null +++ b/rdiff-backup/rdiff_backup/highlevel.py @@ -0,0 +1,288 @@ +from __future__ import generators +execfile("filelist.py") + +####################################################################### +# +# highlevel - High level functions for mirroring, mirror & inc, etc. +# + +class SkipFileException(Exception): + """Signal that the current file should be skipped but then continue + + This exception will often be raised when there is problem reading + an individual file, but it makes sense for the rest of the backup + to keep going. + + """ + pass + + +class HighLevel: + """High level static functions + + The design of some of these functions is represented on the + accompanying diagram. + + """ + def Mirror(src_rpath, dest_rpath, checkpoint = 1, session_info = None): + """Turn dest_rpath into a copy of src_rpath + + Checkpoint true means to checkpoint periodically, otherwise + not. 
If session_info is given, try to resume Mirroring from + that point. + + """ + SourceS = src_rpath.conn.HLSourceStruct + DestS = dest_rpath.conn.HLDestinationStruct + + SourceS.set_session_info(session_info) + DestS.set_session_info(session_info) + src_init_dsiter = SourceS.split_initial_dsiter(src_rpath) + dest_sigiter = DestS.get_sigs(dest_rpath, src_init_dsiter) + diffiter = SourceS.get_diffs_and_finalize(dest_sigiter) + DestS.patch_and_finalize(dest_rpath, diffiter, checkpoint) + + dest_rpath.setdata() + + def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath, + session_info = None): + """Mirror + put increments in tree based at inc_rpath""" + SourceS = src_rpath.conn.HLSourceStruct + DestS = dest_rpath.conn.HLDestinationStruct + + SourceS.set_session_info(session_info) + DestS.set_session_info(session_info) + if not session_info: dest_rpath.conn.SaveState.touch_last_file() + src_init_dsiter = SourceS.split_initial_dsiter(src_rpath) + dest_sigiter = DestS.get_sigs(dest_rpath, src_init_dsiter) + diffiter = SourceS.get_diffs_and_finalize(dest_sigiter) + DestS.patch_increment_and_finalize(dest_rpath, diffiter, inc_rpath) + + dest_rpath.setdata() + inc_rpath.setdata() + + def Restore(rest_time, mirror_base, baseinc_tup, target_base): + """Like Restore.RestoreRecursive but check arguments""" + if not isinstance(target_base, DSRPath): + target_base = DSRPath(target_base.conn, target_base.base, + target_base.index, target_base.data) + Restore.RestoreRecursive(rest_time, mirror_base, + baseinc_tup, target_base) + +MakeStatic(HighLevel) + + +class HLSourceStruct: + """Hold info used by HL on the source side""" + _session_info = None # set to si if resuming + def set_session_info(cls, session_info): + cls._session_info = session_info + + def iterate_from(cls, rpath): + """Supply more aruments to DestructiveStepping.Iterate_from""" + if cls._session_info: + return DestructiveStepping.Iterate_from(rpath, 1, + cls._session_info.last_index) + else: return DestructiveStepping.Iterate_from(rpath, 1) + + def split_initial_dsiter(cls, rpath): + """Set iterators of all dsrps from rpath, returning one""" + dsiter = cls.iterate_from(rpath) + initial_dsiter1, cls.initial_dsiter2 = Iter.multiplex(dsiter, 2) + return initial_dsiter1 + + def get_diffs_and_finalize(cls, sigiter): + """Return diffs and finalize any dsrp changes remaining + + Return a rorpiterator with files included of signatures of + dissimilar files. This is the last operation run on the local + filestream, so finalize dsrp writes. 
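+
+    In terms of the whole Mirror pipeline above, this is roughly the
+    third step:
+
+        source listing -> destination signatures (dissimilar files only)
+        -> source deltas (this method) -> destination patch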
+ + """ + collated = RORPIter.CollateIterators(cls.initial_dsiter2, sigiter) + finalizer = DestructiveStepping.Finalizer() + def diffs(): + for dsrp, dest_sig in collated: + try: + if dest_sig: + if dest_sig.isplaceholder(): yield dest_sig + else: yield RORPIter.diffonce(dest_sig, dsrp) + if dsrp: finalizer(dsrp) + except (IOError, OSError, RdiffException): + Log.exception() + Log("Error processing %s, skipping" % + str(dest_sig.index), 2) + finalizer.getresult() + return diffs() + +MakeClass(HLSourceStruct) + + +class HLDestinationStruct: + """Hold info used by HL on the destination side""" + _session_info = None # set to si if resuming + def set_session_info(cls, session_info): + cls._session_info = session_info + + def iterate_from(cls, rpath): + """Supply more arguments to DestructiveStepping.Iterate_from""" + if cls._session_info: + return DestructiveStepping.Iterate_from(rpath, None, + cls._session_info.last_index) + else: return DestructiveStepping.Iterate_from(rpath, None) + + def split_initial_dsiter(cls, rpath): + """Set initial_dsiters (iteration of all dsrps from rpath)""" + dsiter = cls.iterate_from(rpath) + result, cls.initial_dsiter2 = Iter.multiplex(dsiter, 2) + return result + + def get_dissimilar(cls, baserp, src_init_iter, dest_init_iter): + """Get dissimilars + + Returns an iterator which enumerates the dsrps which are + different on the source and destination ends. The dsrps do + not necessarily exist on the destination end. + + Also, to prevent the system from getting backed up on the + remote end, if we don't get enough dissimilars, stick in a + placeholder every so often, like fiber. The more + placeholders, the more bandwidth used, but if there aren't + enough, lots of memory will be used because files will be + accumulating on the source side. How much will accumulate + will depend on the Globals.conn_bufsize value. + + """ + collated = RORPIter.CollateIterators(src_init_iter, dest_init_iter) + def generate_dissimilar(): + counter = 0 + for src_rorp, dest_dsrp in collated: + if not dest_dsrp: + dsrp = DSRPath(baserp.conn, baserp.base, src_rorp.index) + if dsrp.lstat(): + Log("Warning: Found unexpected destination file %s." 
+ % dsrp.path, 2) + if DestructiveStepping.isexcluded(dsrp, None): continue + counter = 0 + yield dsrp + elif not src_rorp or not src_rorp == dest_dsrp: + counter = 0 + yield dest_dsrp + else: # source and destinition both exist and are same + if counter == 20: + placeholder = RORPath(src_rorp.index) + placeholder.make_placeholder() + counter = 0 + yield placeholder + else: counter += 1 + return generate_dissimilar() + + def get_sigs(cls, baserp, src_init_iter): + """Return signatures of all dissimilar files""" + dest_iters1 = cls.split_initial_dsiter(baserp) + dissimilars = cls.get_dissimilar(baserp, src_init_iter, dest_iters1) + return RORPIter.Signatures(dissimilars) + + def get_dsrp(cls, dest_rpath, index): + """Return initialized dsrp based on dest_rpath with given index""" + dsrp = DSRPath(dest_rpath.conn, dest_rpath.base, index) + DestructiveStepping.initialize(dsrp, None) + return dsrp + + def get_finalizer(cls): + """Return finalizer, starting from session info if necessary""" + init_state = cls._session_info and cls._session_info.finalizer_state + return DestructiveStepping.Finalizer(init_state) + + def get_ITR(cls, inc_rpath): + """Return ITR, starting from state if necessary""" + init_state = cls._session_info and cls._session_info.ITR_state + return Inc.make_patch_increment_ITR(inc_rpath, init_state) + + def patch_and_finalize(cls, dest_rpath, diffs, checkpoint = 1): + """Apply diffs and finalize""" + collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2) + finalizer = cls.get_finalizer() + dsrp = None + + def error_checked(): + """Inner writing loop, check this for errors""" + indexed_tuple = collated.next() + Log("Processing %s" % str(indexed_tuple), 7) + diff_rorp, dsrp = indexed_tuple + if not dsrp: + dsrp = cls.get_dsrp(dest_rpath, diff_rorp.index) + DestructiveStepping.initialize(dsrp, None) + if diff_rorp and not diff_rorp.isplaceholder(): + RORPIter.patchonce_action(None, dsrp, diff_rorp).execute() + finalizer(dsrp) + return dsrp + + try: + while 1: + try: dsrp = cls.check_skip_error(error_checked) + except StopIteration: break + if checkpoint: SaveState.checkpoint_mirror(finalizer, dsrp) + except: cls.handle_last_error(dsrp, finalizer) + finalizer.getresult() + if checkpoint: SaveState.checkpoint_remove() + + def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath): + """Apply diffs, write increment if necessary, and finalize""" + collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2) + finalizer, ITR = cls.get_finalizer(), cls.get_ITR(inc_rpath) + dsrp = None + + def error_checked(): + """Inner writing loop, catch variety of errors from this""" + indexed_tuple = collated.next() + Log("Processing %s" % str(indexed_tuple), 7) + diff_rorp, dsrp = indexed_tuple + if not dsrp: + dsrp = cls.get_dsrp(dest_rpath, indexed_tuple.index) + DestructiveStepping.initialize(dsrp, None) + indexed_tuple = IndexedTuple(indexed_tuple.index, + (diff_rorp, dsrp)) + if diff_rorp and diff_rorp.isplaceholder(): + indexed_tuple = IndexedTuple(indexed_tuple.index, + (None, dsrp)) + ITR(indexed_tuple) + finalizer(dsrp) + return dsrp + + try: + while 1: + try: dsrp = cls.check_skip_error(error_checked) + except StopIteration: break + SaveState.checkpoint_inc_backup(ITR, finalizer, dsrp) + except: cls.handle_last_error(dsrp, finalizer, ITR) + ITR.getresult() + finalizer.getresult() + SaveState.checkpoint_remove() + + def check_skip_error(cls, thunk): + """Run thunk, catch certain errors skip files""" + try: return thunk() + except (IOError, OSError, 
SkipFileException), exp: + Log.exception() + if (not isinstance(exp, IOError) or + (isinstance(exp, IOError) and + (exp[0] in [2, # Means that a file is missing + 5, # Reported by docv (see list) + 13, # Permission denied IOError + 26] # Requested by Campbell (see list) - + # happens on some NT systems + ))): + Log("Skipping file", 2) + return None + else: raise + + def handle_last_error(cls, dsrp, finalizer, ITR = None): + """If catch fatal error, try to checkpoint before exiting""" + Log.exception(1) + if ITR: SaveState.checkpoint_inc_backup(ITR, finalizer, dsrp, 1) + else: SaveState.checkpoint_mirror(finalizer, dsrp, 1) + SaveState.touch_last_file_definitive() + raise + +MakeClass(HLDestinationStruct) diff --git a/rdiff-backup/rdiff_backup/increment.py b/rdiff-backup/rdiff_backup/increment.py new file mode 100644 index 0000000..4ed6a39 --- /dev/null +++ b/rdiff-backup/rdiff_backup/increment.py @@ -0,0 +1,180 @@ +execfile("destructive_stepping.py") + +####################################################################### +# +# increment - Provides Inc class, which writes increment files +# +# This code is what writes files ending in .diff, .snapshot, etc. +# + +class Inc: + """Class containing increment functions""" + def Increment_action(new, mirror, incpref): + """Main file incrementing function, returns RobustAction + + new is the file on the active partition, + mirror is the mirrored file from the last backup, + incpref is the prefix of the increment file. + + This function basically moves mirror -> incpref. + + """ + if not (new and new.lstat() or mirror.lstat()): + return Robust.null_action # Files deleted in meantime, do nothing + + Log("Incrementing mirror file " + mirror.path, 5) + if ((new and new.isdir()) or mirror.isdir()) and not incpref.isdir(): + incpref.mkdir() + + if not mirror.lstat(): return Inc.makemissing_action(incpref) + elif mirror.isdir(): return Inc.makedir_action(mirror, incpref) + elif new.isreg() and mirror.isreg(): + return Inc.makediff_action(new, mirror, incpref) + else: return Inc.makesnapshot_action(mirror, incpref) + + def Increment(new, mirror, incpref): + Inc.Increment_action(new, mirror, incpref).execute() + + def makemissing_action(incpref): + """Signify that mirror file was missing""" + return RobustAction(lambda: None, + Inc.get_inc_ext(incpref, "missing").touch, + lambda exp: None) + + def makesnapshot_action(mirror, incpref): + """Copy mirror to incfile, since new is quite different""" + snapshotrp = Inc.get_inc_ext(incpref, "snapshot") + return Robust.copy_with_attribs_action(mirror, snapshotrp) + + def makediff_action(new, mirror, incpref): + """Make incfile which is a diff new -> mirror""" + diff = Inc.get_inc_ext(incpref, "diff") + return Robust.chain([Rdiff.write_delta_action(new, mirror, diff), + Robust.copy_attribs_action(mirror, diff)]) + + def makedir_action(mirrordir, incpref): + """Make file indicating directory mirrordir has changed""" + dirsign = Inc.get_inc_ext(incpref, "dir") + def final(): + dirsign.touch() + RPath.copy_attribs(mirrordir, dirsign) + return RobustAction(lambda: None, final, dirsign.delete) + + def get_inc_ext(rp, typestr): + """Return RPath/DSRPath like rp but with inc/time extension + + If the file exists, then probably a previous backup has been + aborted. We then keep asking FindTime to get a time later + than the one that already has an inc file. 
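+
+    For illustration (the time string is whatever Time.timetostring
+    produces): with typestr "diff", a base name "foo" becomes something
+    like
+
+        foo.<timestring>.diff
+
+    and similarly for the "snapshot", "dir" and "missing" increment
+    types used above.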
+ + """ + def get_newinc(timestr): + """Get new increment rp with given time suffix""" + addtostr = lambda s: "%s.%s.%s" % (s, timestr, typestr) + if rp.index: + return rp.__class__(rp.conn, rp.base, rp.index[:-1] + + (addtostr(rp.index[-1]),)) + else: return rp.__class__(rp.conn, addtostr(rp.base), rp.index) + + inctime = 0 + while 1: + inctime = Resume.FindTime(rp.index, inctime) + incrp = get_newinc(Time.timetostring(inctime)) + if not incrp.lstat(): return incrp + + def make_patch_increment_ITR(inc_rpath, initial_state = None): + """Return IterTreeReducer that patches and increments + + This has to be an ITR because directories that have files in + them changed are flagged with an increment marker. There are + four possibilities as to the order: + + 1. Normal file -> Normal file: right away + 2. Directory -> Directory: wait until files in the directory + are processed, as we won't know whether to add a marker + until the end. + 3. Normal file -> Directory: right away, so later files will + have a directory to go into. + 4. Directory -> Normal file: Wait until the end, so we can + process all the files in the directory. + + """ + def base_init(indexed_tuple): + """Patch if appropriate, return (a,b) tuple + + a is true if found directory and thus didn't take action + + if a is false, b is true if some changes were made + + if a is true, b is the rp of a temporary file used to hold + the diff_rorp's data (for dir -> normal file change), and + false if none was necessary. + + """ + diff_rorp, dsrp = indexed_tuple + incpref = inc_rpath.new_index(indexed_tuple.index) + if dsrp.isdir(): return init_dir(dsrp, diff_rorp, incpref) + else: return init_non_dir(dsrp, diff_rorp, incpref) + + def init_dir(dsrp, diff_rorp, incpref): + """Initial processing of a directory + + Make the corresponding directory right away, but wait + until the end to write the replacement. However, if the + diff_rorp contains data, we must write it locally before + continuing, or else that data will be lost in the stream. + + """ + if not (incpref.lstat() and incpref.isdir()): incpref.mkdir() + if diff_rorp and diff_rorp.isreg() and diff_rorp.file: + tf = TempFileManager.new(dsrp) + RPathStatic.copy_with_attribs(diff_rorp, tf) + tf.set_attached_filetype(diff_rorp.get_attached_filetype()) + return (1, tf) + else: return (1, None) + + def init_non_dir(dsrp, diff_rorp, incpref): + """Initial processing of non-directory + + If a reverse diff is called for it is generated by apply + the forwards diff first on a temporary file. 
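+
+    Roughly, as in init_thunk below (annotations added for
+    illustration):
+
+        Rdiff.patch_with_attribs_action(dsrp, diff_rorp, tf)  # tf = new version
+        Inc.Increment_action(tf, dsrp, incpref)               # increment: new -> mirror
+
+    with Robust.make_tf_robustaction (see robust.py) handling the
+    temporary file afterwards.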
+ + """ + if diff_rorp: + if dsrp.isreg() and diff_rorp.isreg(): + tf = TempFileManager.new(dsrp) + def init_thunk(): + Rdiff.patch_with_attribs_action(dsrp, diff_rorp, + tf).execute() + Inc.Increment_action(tf, dsrp, incpref).execute() + Robust.make_tf_robustaction(init_thunk, (tf,), + (dsrp,)).execute() + else: + Robust.chain([Inc.Increment_action(diff_rorp, dsrp, + incpref), + RORPIter.patchonce_action( + None, dsrp, diff_rorp)]).execute() + return (None, 1) + return (None, None) + + def base_final(base_tuple, base_init_tuple, changed): + """Patch directory if not done, return true iff made change""" + if base_init_tuple[0]: # was directory + diff_rorp, dsrp = base_tuple + if changed or diff_rorp: + if base_init_tuple[1]: diff_rorp = base_init_tuple[1] + Inc.Increment(diff_rorp, dsrp, + inc_rpath.new_index(base_tuple.index)) + if diff_rorp: + RORPIter.patchonce_action(None, dsrp, + diff_rorp).execute() + if isinstance(diff_rorp, TempFile): diff_rorp.delete() + return 1 + return None + else: # changed iff base_init_tuple says it was + return base_init_tuple[1] + + return IterTreeReducer(base_init, lambda x,y: x or y, None, + base_final, initial_state) + +MakeStatic(Inc) diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py new file mode 100644 index 0000000..21629b2 --- /dev/null +++ b/rdiff-backup/rdiff_backup/iterfile.py @@ -0,0 +1,235 @@ +execfile("ttime.py") +import cPickle + +####################################################################### +# +# iterfile - Convert an iterator to a file object and vice-versa +# + +class IterFileException(Exception): pass + +class UnwrapFile: + """Contains some basic methods for parsing a file containing an iter""" + def __init__(self, file): + self.file = file + + def _s2l(self, s): + """Convert string to long int""" + assert len(s) == 7 + l = 0L + for i in range(7): l = l*256 + ord(s[i]) + return l + + def _get(self): + """Return pair (type, data) next in line on the file + + type is a single character which is either "o" for object, "f" + for file, "c" for a continution of a file, or None if no more + data can be read. Data is either the file's data, if type is + "c" or "f", or the actual object if the type is "o". + + """ + header = self.file.read(8) + if not header: return None, None + assert len(header) == 8, "Header is only %d bytes" % len(header) + type, length = header[0], self._s2l(header[1:]) + buf = self.file.read(length) + if type == "o": return type, cPickle.loads(buf) + else: return type, buf + + +class IterWrappingFile(UnwrapFile): + """An iterator generated from a file. + + Initialize with a file type object, and then it will return the + elements of the file in order. + + """ + def __init__(self, file): + UnwrapFile.__init__(self, file) + self.currently_in_file = None + + def __iter__(self): return self + + def next(self): + if self.currently_in_file: + self.currently_in_file.close() # no error checking by this point + type, data = self._get() + if not type: raise StopIteration + if type == "o": return data + elif type == "f": + file = IterVirtualFile(self, data) + if data: self.currently_in_file = file + else: self.currently_in_file = None + return file + else: raise IterFileException("Bad file type %s" % type) + + +class IterVirtualFile(UnwrapFile): + """Another version of a pretend file + + This is returned by IterWrappingFile when a file is embedded in + the main file that the IterWrappingFile is based around. 
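+
+    In terms of the 8-byte headers read by UnwrapFile._get above, an
+    embedded file looks roughly like:
+
+        "f" <7-byte length> <first chunk of data>
+        "c" <7-byte length> <continuation chunk>
+        "c" <7-byte length = 0>     # empty block marks the end of the file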
+ + """ + def __init__(self, iwf, initial_data): + """Initializer + + initial_data is the data from the first block of the file. + iwf is the iter wrapping file that spawned this + IterVirtualFile. + + """ + UnwrapFile.__init__(self, iwf.file) + self.iwf = iwf + self.bufferlist = [initial_data] + self.bufferlen = len(initial_data) + self.closed = None + + def check_consistency(self): + l = len("".join(self.bufferlist)) + assert l == self.bufferlen, \ + "Length of IVF bufferlist doesn't match (%s, %s)" % \ + (l, self.bufferlen) + + def read(self, length): + assert not self.closed + if self.iwf.currently_in_file: + while length >= self.bufferlen: + if not self.addtobuffer(): break + + real_len = min(length, self.bufferlen) + combined_buffer = "".join(self.bufferlist) + assert len(combined_buffer) == self.bufferlen, \ + (len(combined_buffer), self.bufferlen) + self.bufferlist = [combined_buffer[real_len:]] + self.bufferlen = self.bufferlen - real_len + return combined_buffer[:real_len] + + def addtobuffer(self): + """Read a chunk from the file and add it to the buffer""" + assert self.iwf.currently_in_file + type, data = self._get() + assert type == "c", "Type is %s instead of c" % type + if data: + self.bufferlen = self.bufferlen + len(data) + self.bufferlist.append(data) + return 1 + else: + self.iwf.currently_in_file = None + return None + + def close(self): + """Currently just reads whats left and discards it""" + while self.iwf.currently_in_file: + self.addtobuffer() + self.bufferlist = [] + self.bufferlen = 0 + self.closed = 1 + + +class FileWrappingIter: + """A file interface wrapping around an iterator + + This is initialized with an iterator, and then converts it into a + stream of characters. The object will evaluate as little of the + iterator as is necessary to provide the requested bytes. + + The actual file is a sequence of marshaled objects, each preceded + by 8 bytes which identifies the following the type of object, and + specifies its length. File objects are not marshalled, but the + data is written in chunks of Globals.blocksize, and the following + blocks can identify themselves as continuations. + + """ + def __init__(self, iter): + """Initialize with iter""" + self.iter = iter + self.bufferlist = [] + self.bufferlen = 0L + self.currently_in_file = None + self.closed = None + + def read(self, length): + """Return next length bytes in file""" + assert not self.closed + while self.bufferlen < length: + if not self.addtobuffer(): break + + combined_buffer = "".join(self.bufferlist) + assert len(combined_buffer) == self.bufferlen + real_len = min(self.bufferlen, length) + self.bufferlen = self.bufferlen - real_len + self.bufferlist = [combined_buffer[real_len:]] + return combined_buffer[:real_len] + + def addtobuffer(self): + """Updates self.bufferlist and self.bufferlen, adding on a chunk + + Returns None if we have reached the end of the iterator, + otherwise return true. 
+ + """ + if self.currently_in_file: + buf = "c" + self.addfromfile() + else: + try: currentobj = self.iter.next() + except StopIteration: return None + if hasattr(currentobj, "read") and hasattr(currentobj, "close"): + self.currently_in_file = currentobj + buf = "f" + self.addfromfile() + else: + pickle = cPickle.dumps(currentobj, 1) + buf = "o" + self._l2s(len(pickle)) + pickle + + self.bufferlist.append(buf) + self.bufferlen = self.bufferlen + len(buf) + return 1 + + def addfromfile(self): + """Read a chunk from the current file and return it""" + buf = self.currently_in_file.read(Globals.blocksize) + if not buf: + assert not self.currently_in_file.close() + self.currently_in_file = None + return self._l2s(len(buf)) + buf + + def _l2s(self, l): + """Convert long int to string of 7 characters""" + s = "" + for i in range(7): + l, remainder = divmod(l, 256) + s = chr(remainder) + s + assert remainder == 0 + return s + + def close(self): self.closed = 1 + + +class BufferedRead: + """Buffer the .read() calls to the given file + + This is used to lessen overhead and latency when a file is sent + over a connection. + + """ + def __init__(self, file): + self.file = file + self.buffer = "" + self.bufsize = Globals.conn_bufsize + + def read(self, l = -1): + if l < 0: # Read as much as possible + result = self.buffer + self.file.read() + self.buffer = "" + return result + + if len(self.buffer) < l: # Try to make buffer as long as l + self.buffer += self.file.read(max(self.bufsize, + l - len(self.buffer))) + actual_size = min(l, len(self.buffer)) + result = self.buffer[:actual_size] + self.buffer = self.buffer[actual_size:] + return result + + def close(self): return self.file.close() diff --git a/rdiff-backup/rdiff_backup/lazy.py b/rdiff-backup/rdiff_backup/lazy.py new file mode 100644 index 0000000..28e92c3 --- /dev/null +++ b/rdiff-backup/rdiff_backup/lazy.py @@ -0,0 +1,343 @@ +from __future__ import generators +execfile("static.py") +import os, stat, types + +####################################################################### +# +# lazy - Define some lazy data structures and functions acting on them +# + +class Iter: + """Hold static methods for the manipulation of lazy iterators""" + + def filter(predicate, iterator): + """Like filter in a lazy functional programming language""" + for i in iterator: + if predicate(i): yield i + + def map(function, iterator): + """Like map in a lazy functional programming language""" + for i in iterator: yield function(i) + + def foreach(function, iterator): + """Run function on each element in iterator""" + for i in iterator: function(i) + + def cat(*iters): + """Lazily concatenate iterators""" + for iter in iters: + for i in iter: yield i + + def cat2(iter_of_iters): + """Lazily concatenate iterators, iterated by big iterator""" + for iter in iter_of_iters: + for i in iter: yield i + + def empty(iter): + """True if iterator has length 0""" + for i in iter: return None + return 1 + + def equal(iter1, iter2, verbose = None, operator = lambda x, y: x == y): + """True if iterator 1 has same elements as iterator 2 + + Use equality operator, or == if it is unspecified. 
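+
+    For illustration:
+
+        Iter.equal(iter([1, 2]), iter([1, 2]))             # returns 1
+        Iter.equal(iter([1, 2]), iter([1, 3]), verbose=1)  # prints the
+                                                           # mismatch, None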
+ + """ + for i1 in iter1: + try: i2 = iter2.next() + except StopIteration: + if verbose: print "End when i1 = %s" % i1 + return None + if not operator(i1, i2): + if verbose: print "%s not equal to %s" % (i1, i2) + return None + try: i2 = iter2.next() + except StopIteration: return 1 + if verbose: print "End when i2 = %s" % i2 + return None + + def Or(iter): + """True if any element in iterator is true. Short circuiting""" + i = None + for i in iter: + if i: return i + return i + + def And(iter): + """True if all elements in iterator are true. Short circuiting""" + i = 1 + for i in iter: + if not i: return i + return i + + def len(iter): + """Return length of iterator""" + i = 0 + while 1: + try: iter.next() + except StopIteration: return i + i = i+1 + + def foldr(f, default, iter): + """foldr the "fundamental list recursion operator"?""" + try: next = iter.next() + except StopIteration: return default + return f(next, Iter.foldr(f, default, iter)) + + def foldl(f, default, iter): + """the fundamental list iteration operator..""" + while 1: + try: next = iter.next() + except StopIteration: return default + default = f(default, next) + + def multiplex(iter, num_of_forks, final_func = None, closing_func = None): + """Split a single iterater into a number of streams + + The return val will be a list with length num_of_forks, each + of which will be an iterator like iter. final_func is the + function that will be called on each element in iter just as + it is being removed from the buffer. closing_func is called + when all the streams are finished. + + """ + if num_of_forks == 2 and not final_func and not closing_func: + im2 = IterMultiplex2(iter) + return (im2.yielda(), im2.yieldb()) + if not final_func: final_func = lambda i: None + if not closing_func: closing_func = lambda: None + + # buffer is a list of elements that some iterators need and others + # don't + buffer = [] + + # buffer[forkposition[i]] is the next element yieled by iterator + # i. If it is -1, yield from the original iter + starting_forkposition = [-1] * num_of_forks + forkposition = starting_forkposition[:] + called_closing_func = [None] + + def get_next(fork_num): + """Return the next element requested by fork_num""" + if forkposition[fork_num] == -1: + try: buffer.insert(0, iter.next()) + except StopIteration: + # call closing_func if necessary + if (forkposition == starting_forkposition and + not called_closing_func[0]): + closing_func() + called_closing_func[0] = None + raise StopIteration + for i in range(num_of_forks): forkposition[i] += 1 + + return_val = buffer[forkposition[fork_num]] + forkposition[fork_num] -= 1 + + blen = len(buffer) + if not (blen-1) in forkposition: + # Last position in buffer no longer needed + assert forkposition[fork_num] == blen-2 + final_func(buffer[blen-1]) + del buffer[blen-1] + return return_val + + def make_iterator(fork_num): + while(1): yield get_next(fork_num) + + return tuple(map(make_iterator, range(num_of_forks))) + +MakeStatic(Iter) + + +class IterMultiplex2: + """Multiplex an iterator into 2 parts + + This is a special optimized case of the Iter.multiplex function, + used when there is no closing_func or final_func, and we only want + to split it into 2. By profiling, this is a time sensitive class. 
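+
+    A rough usage sketch (illustrative only):
+
+        im2 = IterMultiplex2(iter([1, 2, 3]))
+        a, b = im2.yielda(), im2.yieldb()
+        # both a and b now yield 1, 2, 3; whatever the leading iterator
+        # consumes is buffered until the other one catches up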
+ + """ + def __init__(self, iter): + self.a_leading_by = 0 # How many places a is ahead of b + self.buffer = [] + self.iter = iter + + def yielda(self): + """Return first iterator""" + buf, iter = self.buffer, self.iter + while(1): + if self.a_leading_by >= 0: # a is in front, add new element + elem = iter.next() # exception will be passed + buf.append(elem) + else: elem = buf.pop(0) # b is in front, subtract an element + self.a_leading_by += 1 + yield elem + + def yieldb(self): + """Return second iterator""" + buf, iter = self.buffer, self.iter + while(1): + if self.a_leading_by <= 0: # b is in front, add new element + elem = iter.next() # exception will be passed + buf.append(elem) + else: elem = buf.pop(0) # a is in front, subtract an element + self.a_leading_by -= 1 + yield elem + + +class IterTreeReducer: + """Tree style reducer object for iterator + + The indicies of a RORPIter form a tree type structure. This class + can be used on each element of an iter in sequence and the result + will be as if the corresponding tree was reduced. This tries to + bridge the gap between the tree nature of directories, and the + iterator nature of the connection between hosts and the temporal + order in which the files are processed. + + The elements of the iterator are required to have a tuple-style + .index, called "indexed elem" below. + + """ + def __init__(self, base_init, branch_reducer, + branch_base, base_final, initial_state = None): + """ITR initializer + + base_init is a function of one argument, an indexed elem. It + is called immediately on any elem in the iterator. It should + return some value type A. + + branch_reducer and branch_base are used to form a value on a + bunch of reduced branches, in the way that a linked list of + type C can be folded to form a value type B. + + base_final is called when leaving a tree. It takes three + arguments, the indexed elem, the output (type A) of base_init, + the output of branch_reducer on all the branches (type B) and + returns a value type C. + + """ + self.base_init = base_init + self.branch_reducer = branch_reducer + self.base_final = base_final + self.branch_base = branch_base + + if initial_state: self.setstate(initial_state) + else: + self.state = IterTreeReducerState(branch_base) + self.subreducer = None + + def setstate(self, state): + """Update with new state, recursive if necessary""" + self.state = state + if state.substate: self.subreducer = self.newinstance(state.substate) + else: self.subreducer = None + + def getstate(self): return self.state + + def getresult(self): + """Return results of calculation""" + if not self.state.calculated: self.calculate_final_val() + return self.state.final_val + + def intree(self, index): + """Return true if index is still in current tree""" + return self.state.base_index == index[:len(self.state.base_index)] + + def newinstance(self, state = None): + """Return reducer of same type as self + + If state is None, sets substate of self.state, otherwise + assume this is already set. 
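+
+		For orientation, a reducer built from these construction
+		functions might look like the following sketch (illustrative
+		only; it simply counts the indexed elements in a tree):
+
+			counter = IterTreeReducer(
+				lambda elem: 1,			# base_init
+				lambda x, y: x + y,		# branch_reducer
+				0,				# branch_base
+				lambda elem, init, branches: init + branches)	# base_final
+
+		newinstance clones such a reducer for each subtree encountered.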
+ + """ + new = self.__class__(self.base_init, self.branch_reducer, + self.branch_base, self.base_final, state) + if state is None: self.state.substate = new.state + return new + + def process_w_subreducer(self, indexed_elem): + """Give object to subreducer, if necessary update branch_val""" + if not self.subreducer: + self.subreducer = self.newinstance() + if not self.subreducer(indexed_elem): + self.state.branch_val = self.branch_reducer(self.state.branch_val, + self.subreducer.getresult()) + self.subreducer = self.newinstance() + assert self.subreducer(indexed_elem) + + def calculate_final_val(self): + """Set final value""" + if self.subreducer: + self.state.branch_val = self.branch_reducer(self.state.branch_val, + self.subreducer.getresult()) + if self.state.current_index is None: + # No input, set None as default value + self.state.final_val = None + else: + self.state.final_val = self.base_final(self.state.base_elem, + self.state.base_init_val, + self.state.branch_val) + self.state.calculated = 1 + + def __call__(self, indexed_elem): + """Process elem, current position in iterator + + Returns true if elem successfully processed, false if elem is + not in the current tree and thus the final result is + available. + + """ + index = indexed_elem.index + assert type(index) is types.TupleType + + if self.state.current_index is None: # must be at base + self.state.base_init_val = self.base_init(indexed_elem) + # Do most crash-prone op first, so we don't leave inconsistent + self.state.current_index = index + self.state.base_index = index + self.state.base_elem = indexed_elem + return 1 + elif not index > self.state.current_index: + Log("Warning: oldindex %s >= newindex %s" % + (self.state.current_index, index), 2) + + if not self.intree(index): + self.calculate_final_val() + return None + else: + self.process_w_subreducer(indexed_elem) + self.state.current_index = index + return 1 + + +class IterTreeReducerState: + """Holds the state for IterTreeReducers + + An IterTreeReducer cannot be pickled directly because it holds + some anonymous functions. This class contains the relevant data + that is likely to be picklable, so the ITR can be saved and loaded + if the associated functions are known. 
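+
+	A rough checkpoint/restore sketch (illustrative only; assumes itr is
+	an existing IterTreeReducer, cPickle is importable, and the four
+	construction functions are known to both sides):
+
+		saved = cPickle.dumps(itr.getstate())
+		restored = IterTreeReducer(base_init, branch_reducer,
+					   branch_base, base_final,
+					   cPickle.loads(saved))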
+ + """ + def __init__(self, branch_base): + """ITRS initializer + + Class variables: + self.current_index - last index processing started on, or None + self.base_index - index of first element processed + self.base_elem - first element processed + self.branch_val - default branch reducing value + + self.calculated - true iff the final value has been calculated + self.base_init_val - return value of base_init function + self.final_val - Final value once it's calculated + self.substate - IterTreeReducerState when subreducer active + + """ + self.current_index = None + self.calculated = None + self.branch_val = branch_base + self.substate = None + diff --git a/rdiff-backup/rdiff_backup/log.py b/rdiff-backup/rdiff_backup/log.py new file mode 100644 index 0000000..5416fd2 --- /dev/null +++ b/rdiff-backup/rdiff_backup/log.py @@ -0,0 +1,142 @@ +import time, sys +execfile("lazy.py") + +####################################################################### +# +# log - Manage logging +# + +class LoggerError(Exception): pass + +class Logger: + """All functions which deal with logging""" + def __init__(self): + self.log_file_open = None + self.log_file_local = None + self.verbosity = self.term_verbosity = 3 + # termverbset is true if the term_verbosity has been explicity set + self.termverbset = None + + def setverbosity(self, verbosity_string): + """Set verbosity levels. Takes a number string""" + try: self.verbosity = int(verbosity_string) + except ValueError: + Log.FatalError("Verbosity must be a number, received '%s' " + "instead." % verbosity_string) + if not self.termverbset: self.term_verbosity = self.verbosity + + def setterm_verbosity(self, termverb_string): + """Set verbosity to terminal. Takes a number string""" + try: self.term_verbosity = int(termverb_string) + except ValueError: + Log.FatalError("Terminal verbosity must be a number, received " + "'%s' insteaxd." % termverb_string) + self.termverbset = 1 + + def open_logfile(self, rpath): + """Inform all connections of an open logfile. + + rpath.conn will write to the file, and the others will pass + write commands off to it. 
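+
+		Typical call sequence (illustrative only; logrp stands for an
+		RPath chosen to hold the log):
+
+			Log.setverbosity("5")
+			Log.open_logfile(logrp)
+			Log("Session starting", 3)	# written to the file and the terminal
+			Log.close_logfile()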
+ + """ + for conn in Globals.connections: + conn.Log.open_logfile_allconn(rpath.conn) + rpath.conn.Log.open_logfile_local(rpath) + + def open_logfile_allconn(self, log_file_conn): + """Run on all connections to signal log file is open""" + self.log_file_open = 1 + self.log_file_conn = log_file_conn + + def open_logfile_local(self, rpath): + """Open logfile locally - should only be run on one connection""" + assert self.log_file_conn is Globals.local_connection + self.log_file_local = 1 + self.logrp = rpath + self.logfp = rpath.open("a") + + def close_logfile(self): + """Close logfile and inform all connections""" + if self.log_file_open: + for conn in Globals.connections: + conn.Log.close_logfile_allconn() + self.log_file_conn.Log.close_logfile_local() + + def close_logfile_allconn(self): + """Run on every connection""" + self.log_file_open = None + + def close_logfile_local(self): + """Run by logging connection - close logfile""" + assert self.log_file_conn is Globals.local_connection + assert not self.logfp.close() + + def format(self, message, verbosity): + """Format the message, possibly adding date information""" + if verbosity < 9: return message + "\n" + else: return "%s %s\n" % (time.asctime(time.localtime(time.time())), + message) + + def __call__(self, message, verbosity): + """Log message that has verbosity importance""" + if verbosity <= self.verbosity: self.log_to_file(message) + if verbosity <= self.term_verbosity: + self.log_to_term(message, verbosity) + + def log_to_file(self, message): + """Write the message to the log file, if possible""" + if self.log_file_open: + if self.log_file_local: + self.logfp.write(self.format(message, self.verbosity)) + else: self.log_file_conn.Log.log_to_file(message) + + def log_to_term(self, message, verbosity): + """Write message to stdout/stderr""" + if verbosity <= 2 or Globals.server: termfp = sys.stderr + else: termfp = sys.stdout + termfp.write(self.format(message, self.term_verbosity)) + + def conn(self, direction, result, req_num): + """Log some data on the connection + + The main worry with this function is that something in here + will create more network traffic, which will spiral to + infinite regress. So, for instance, logging must only be done + to the terminal, because otherwise the log file may be remote. + + """ + if self.term_verbosity < 9: return + if type(result) is types.StringType: result_repr = repr(result) + else: result_repr = str(result) + if Globals.server: conn_str = "Server" + else: conn_str = "Client" + self.log_to_term("%s %s (%d): %s" % + (conn_str, direction, req_num, result_repr), 9) + + def FatalError(self, message): + self("Fatal Error: " + message, 1) + Globals.Main.cleanup() + sys.exit(1) + + def exception(self, only_terminal = 0): + """Log an exception and traceback at verbosity 2 + + If only_terminal is None, log normally. If it is 1, then only + log to disk if log file is local (self.log_file_open = 1). If + it is 2, don't log to disk at all. 
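+
+		A minimal sketch of the intended use (illustrative only;
+		risky_operation is a stand-in for any failing call):
+
+			try: risky_operation()
+			except Exception:
+				Log.exception(1)	# log; write to disk only if the log file is local
+				raise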
+ + """ + assert only_terminal in (0, 1, 2) + if (only_terminal == 0 or + (only_terminal == 1 and self.log_file_open)): + logging_func = self.__call__ + else: logging_func = self.log_to_term + + exc_info = sys.exc_info() + logging_func("Exception %s raised of class %s" % + (exc_info[1], exc_info[0]), 2) + logging_func("".join(traceback.format_tb(exc_info[2])), 2) + + +Log = Logger() diff --git a/rdiff-backup/rdiff_backup/manage.py b/rdiff-backup/rdiff_backup/manage.py new file mode 100644 index 0000000..c0f4a85 --- /dev/null +++ b/rdiff-backup/rdiff_backup/manage.py @@ -0,0 +1,99 @@ +execfile("restore.py") + +####################################################################### +# +# manage - list, delete, and otherwise manage increments +# + +class ManageException(Exception): pass + +class Manage: + def get_incobjs(datadir): + """Return Increments objects given the rdiff-backup data directory""" + return map(IncObj, Manage.find_incrps_with_base(datadir, "increments")) + + def find_incrps_with_base(dir_rp, basename): + """Return list of incfiles with given basename in dir_rp""" + rps = map(dir_rp.append, dir_rp.listdir()) + incrps = filter(RPath.isincfile, rps) + result = filter(lambda rp: rp.getincbase_str() == basename, incrps) + Log("find_incrps_with_base: found %d incs" % len(result), 6) + return result + + def describe_root_incs(datadir): + """Return a string describing all the the root increments""" + result = [] + currentrps = Manage.find_incrps_with_base(datadir, "current_mirror") + if not currentrps: + Log("Warning: no current mirror marker found", 1) + elif len(currentrps) > 1: + Log("Warning: multiple mirror markers found", 1) + for rp in currentrps: + result.append("Found mirror marker %s" % rp.path) + result.append("Indicating latest mirror taken at %s" % + Time.stringtopretty(rp.getinctime())) + result.append("---------------------------------------------" + "-------------") + + # Sort so they are in reverse order by time + time_w_incobjs = map(lambda io: (-io.time, io), + Manage.get_incobjs(datadir)) + time_w_incobjs.sort() + incobjs = map(lambda x: x[1], time_w_incobjs) + result.append("Found %d increments:" % len(incobjs)) + result.append("\n------------------------------------------\n".join( + map(IncObj.full_description, incobjs))) + return "\n".join(result) + + def delete_earlier_than(baserp, time): + """Deleting increments older than time in directory baserp + + time is in seconds. It will then delete any empty directories + in the tree. To process the entire backup area, the + rdiff-backup-data directory should be the root of the tree. + + """ + def yield_files(rp): + yield rp + if rp.isdir(): + for filename in rp.listdir(): + for sub_rp in yield_files(rp.append(filename)): + yield sub_rp + + for rp in yield_files(baserp): + if ((rp.isincfile() and + Time.stringtotime(rp.getinctime()) < time) or + (rp.isdir() and not rp.listdir())): + Log("Deleting increment file %s" % rp.path, 5) + rp.delete() + +MakeStatic(Manage) + + +class IncObj: + """Increment object - represent a completed increment""" + def __init__(self, incrp): + """IncObj initializer + + incrp is an RPath of a path like increments.TIMESTR.dir + standing for the root of the increment. 
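+
+		Sketch (illustrative only; incrp is assumed to point at such an
+		increment file):
+
+			inc = IncObj(incrp)
+			print inc.pretty_time()		# formatted date of the increment
+			print inc.full_description()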
+ + """ + if not incrp.isincfile(): + raise ManageException("%s is not an inc file" % incrp.path) + self.incrp = incrp + self.time = Time.stringtotime(incrp.getinctime()) + + def getbaserp(self): + """Return rp of the incrp without extensions""" + return self.incrp.getincbase() + + def pretty_time(self): + """Return a formatted version of inc's time""" + return Time.timetopretty(self.time) + + def full_description(self): + """Return string describing increment""" + s = ["Increment file %s" % self.incrp.path, + "Date: %s" % self.pretty_time()] + return "\n".join(s) diff --git a/rdiff-backup/rdiff_backup/restore.py b/rdiff-backup/rdiff_backup/restore.py new file mode 100644 index 0000000..1f7d24e --- /dev/null +++ b/rdiff-backup/rdiff_backup/restore.py @@ -0,0 +1,158 @@ +from __future__ import generators +execfile("increment.py") +import tempfile + +####################################################################### +# +# restore - Read increment files and restore to original +# + +class RestoreError(Exception): pass + +class Restore: + def RestoreFile(rest_time, rpbase, inclist, rptarget): + """Non-recursive restore function + + rest_time is the time in seconds to restore to, + rpbase is the base name of the file being restored, + inclist is a list of rpaths containing all the relevant increments, + and rptarget is the rpath that will be written with the restored file. + + """ + inclist = Restore.sortincseq(rest_time, inclist) + if not inclist and not (rpbase and rpbase.lstat()): + return # no increments were applicable + Log("Restoring %s with increments %s to %s" % + (rpbase and rpbase.path, + Restore.inclist2str(inclist), rptarget.path), 5) + if not inclist or inclist[0].getinctype() == "diff": + assert rpbase and rpbase.lstat(), \ + "No base to go with incs %s" % Restore.inclist2str(inclist) + RPath.copy_with_attribs(rpbase, rptarget) + for inc in inclist: Restore.applyinc(inc, rptarget) + + def inclist2str(inclist): + """Return string version of inclist for logging""" + return ",".join(map(lambda x: x.path, inclist)) + + def sortincseq(rest_time, inclist): + """Sort the inc sequence, and throw away irrelevant increments""" + incpairs = map(lambda rp: (Time.stringtotime(rp.getinctime()), rp), + inclist) + # Only consider increments at or after the time being restored + incpairs = filter(lambda pair: pair[0] >= rest_time, incpairs) + + # Now throw away older unnecessary increments + incpairs.sort() + i = 0 + while(i < len(incpairs)): + # Only diff type increments require later versions + if incpairs[i][1].getinctype() != "diff": break + i = i+1 + incpairs = incpairs[:i+1] + + # Return increments in reversed order + incpairs.reverse() + return map(lambda pair: pair[1], incpairs) + + def applyinc(inc, target): + """Apply increment rp inc to targetrp target""" + Log("Applying increment %s to %s" % (inc.path, target.path), 6) + inctype = inc.getinctype() + if inctype == "diff": + if not target.lstat(): + raise RestoreError("Bad increment sequence at " + inc.path) + Rdiff.patch_action(target, inc).execute() + elif inctype == "dir": + if not target.isdir(): + if target.lstat(): + raise RestoreError("File %s already exists" % target.path) + target.mkdir() + elif inctype == "missing": return + elif inctype == "snapshot": RPath.copy(inc, target) + else: raise RestoreError("Unknown inctype %s" % inctype) + RPath.copy_attribs(inc, target) + + def RestoreRecursive(rest_time, mirror_base, baseinc_tup, target_base): + """Recursive restore function. 
+ + rest_time is the time in seconds to restore to; + mirror_base is an rpath of the mirror directory corresponding + to the one to be restored; + baseinc_tup is the inc tuple (incdir, list of incs) to be + restored; + and target_base in the dsrp of the target directory. + + """ + assert isinstance(target_base, DSRPath) + collated = RORPIter.CollateIterators( + DestructiveStepping.Iterate_from(mirror_base, None), + Restore.yield_inc_tuples(baseinc_tup)) + mirror_finalizer = DestructiveStepping.Finalizer() + target_finalizer = DestructiveStepping.Finalizer() + + for mirror, inc_tup in collated: + if not inc_tup: + inclist = [] + target = target_base.new_index(mirror.index) + else: + inclist = inc_tup[1] + target = target_base.new_index(inc_tup.index) + DestructiveStepping.initialize(target, None) + Restore.RestoreFile(rest_time, mirror, inclist, target) + target_finalizer(target) + if mirror: mirror_finalizer(mirror) + target_finalizer.getresult() + mirror_finalizer.getresult() + + def yield_inc_tuples(inc_tuple): + """Iterate increment tuples starting with inc_tuple + + An increment tuple is an IndexedTuple (pair). The first will + be the rpath of a directory, and the second is a list of all + the increments associated with that directory. If there are + increments that do not correspond to a directory, the first + element will be None. All the rpaths involved correspond to + files in the increment directory. + + """ + oldindex, rpath = inc_tuple.index, inc_tuple[0] + yield inc_tuple + if not rpath or not rpath.isdir(): return + + inc_list_dict = {} # Index tuple lists by index + dirlist = rpath.listdir() + + def affirm_dict_indexed(index): + """Make sure the inc_list_dict has given index""" + if not inc_list_dict.has_key(index): + inc_list_dict[index] = [None, []] + + def add_to_dict(filename): + """Add filename to the inc tuple dictionary""" + rp = rpath.append(filename) + if rp.isincfile(): + basename = rp.getincbase_str() + affirm_dict_indexed(basename) + inc_list_dict[basename][1].append(rp) + elif rp.isdir(): + affirm_dict_indexed(filename) + inc_list_dict[filename][0] = rp + + def list2tuple(index): + """Return inc_tuple version of dictionary entry by index""" + inclist = inc_list_dict[index] + if not inclist[1]: return None # no increments, so ignore + return IndexedTuple(oldindex + (index,), inclist) + + for filename in dirlist: add_to_dict(filename) + keys = inc_list_dict.keys() + keys.sort() + for index in keys: + new_inc_tuple = list2tuple(index) + if not new_inc_tuple: continue + elif new_inc_tuple[0]: # corresponds to directory + for i in Restore.yield_inc_tuples(new_inc_tuple): yield i + else: yield new_inc_tuple + +MakeStatic(Restore) diff --git a/rdiff-backup/rdiff_backup/rlist.py b/rdiff-backup/rdiff_backup/rlist.py new file mode 100644 index 0000000..c0f8ee9 --- /dev/null +++ b/rdiff-backup/rdiff_backup/rlist.py @@ -0,0 +1,240 @@ +from __future__ import generators +import marshal, sha, types +execfile("iterfile.py") + +####################################################################### +# +# rlist - Define the CachingIter, and sig/diff/patch ops on iterators +# + +class CachingIter: + """Cache parts of an iter using a list + + Turn an iter into something that you can prepend elements into, + and also read from without apparently changing the state. 
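+
+	Rough usage sketch (illustrative only):
+
+		ci = CachingIter(iter([1, 2, 3]))
+		assert ci.peek() == 1		# look ahead without consuming
+		ci.push(0)			# prepend an element
+		assert ci.next() == 0 and ci.next() == 1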
+ + """ + def __init__(self, iter_or_list): + if type(iter_or_list) is types.ListType: + self.iter = iter(iter_or_list) + else: self.iter = iter_or_list + self.next = self.iter.next + self.head = [] + + def __iter__(self): return self + + def _next(self): + """Take elements from the head list + + When there are elements waiting before the main iterator, this + is the next function. If not, iter.next returns to being next. + + """ + head = self.head + a = head[0] + del head[0] + if not head: self.next = self.iter.next + return a + + def nextrange(self, m): + """Return next m elements in list""" + l = head[:m] + del head[:m] + for i in xrange(m - len(l)): l.append(self.iter.next()) + return l + + def peek(self): + """Return next element without removing it from iterator""" + n = self.next() + self.push(n) + return n + + def push(self, elem): + """Insert an element into the iterator at the beginning""" + if not self.head: self.next = self._next + self.head.insert(0, elem) + + def pushrange(self, elem_list): + """Insert list of multiple elements at the beginning""" + if not self.head: self.next = self._next + self.head[:0] = elem_list + + def cache(self, m): + """Move next m elements from iter to internal list + + If m is None, append the entire rest of the iterator. + + """ + h, it = self.head, self.iter + if m is None: + for i in it: h.append(i) + else: + for i in xrange(m): h.append(it.next()) + + def __getitem__(self, key): + """Support a[i:j] style notation. Non destructive""" + if type(key) is types.SliceType: + if key.stop > len(self.head): self.cache(key.stop - len(self.head)) + return self.head[key.start, key.stop] + else: + if key >= len(self.head): self.cache(key + 1 - len(self.head)) + return self.head[key] + + + +class RListDelta: + """Note a difference from one iterator (A) to another (B) + + The min, max pairs are indicies which stand for the half-open + interval (min, max], and elemlist is a list of all the elements in + A which fall within this interval. + + These are produced by the function RList.Deltas(...) + + """ + def __init__(self, (min, max), elemlist): + self.min, self.max = min, max + self.elemlist = elemlist + + + +class RList: + """Tools for signatures, diffing, and patching an iterator + + This class requires that the iterators involved are yielding + objects that have .index and .data attributes. Two objects with + the same .data attribute are supposed to be equivalent. The + iterator must also yield the objects in increasing order with + respect to the .index attribute. 
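+
+	A toy element type meeting these requirements (illustrative only,
+	not part of this module):
+
+		class Rec:
+			def __init__(self, index, data):
+				self.index, self.data = index, data
+
+		sigs = list(RList.Signatures(map(Rec, range(5), "abcde")))
+		# one signature block here: [(4, <SHA digest built from the five .data fields>)]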
+ + """ + blocksize = 100 + + def Signatures(iter): + """Return iterator of signatures from stream of pairs + + Each signature is an ordered pair (last index sig applies to, + SHA digest of data) + + """ + i, s = 0, sha.new() + for iter_elem in iter: + s.update(marshal.dumps(iter_elem.data)) + i = i+1 + if i == RList.blocksize: + yield (iter_elem.index, s.digest()) + i, s = 0, sha.new() + if i != 0: yield (iter_elem.index, s.digest()) + + def sig_one_block(iter_or_list): + """Return the digest portion of a signature on given list""" + s = sha.new() + for iter_elem in iter_or_list: s.update(marshal.dumps(iter_elem.data)) + return s.digest() + + def Deltas(remote_sigs, iter): + """Return iterator of Delta objects that bring iter to remote""" + def get_before(index, iter): + """Return elements in iter whose index is before or equal index + iter needs to be pushable + """ + l = [] + while 1: + try: iter_elem = iter.next() + except StopIteration: return l + if iter_elem.index > index: break + l.append(iter_elem) + iter.push(iter_elem) + return l + + if not isinstance(iter, CachingIter): iter = CachingIter(iter) + oldindex = None + for (rs_index, rs_digest) in remote_sigs: + l = get_before(rs_index, iter) + if rs_digest != RList.sig_one_block(l): + yield RListDelta((oldindex, rs_index), l) + oldindex = rs_index + + def patch_once(basis, delta): + """Apply one delta to basis to return original iterator + + This returns original iterator up to and including the max range + of delta, then stop. basis should be pushable. + + """ + # Return elements of basis until start of delta range + for basis_elem in basis: + if basis_elem.index > delta.min: + basis.push(basis_elem) + break + yield basis_elem + + # Yield elements of delta... + for elem in delta.elemlist: yield elem + + # Finally, discard basis until end of delta range + for basis_elem in basis: + if basis_elem.index > delta.max: + basis.push(basis_elem) + break + + def Patch(basis, deltas): + """Apply a delta stream to basis iterator, yielding original""" + if not isinstance(basis, CachingIter): basis = CachingIter(basis) + for d in deltas: + for elem in RList.patch_once(basis, d): yield elem + for elem in basis: yield elem + + def get_difference_once(basis, delta): + """From one delta, find differences from basis + + Will return pairs (basis_elem, new_elem) where basis_elem is + the element from the basis iterator and new_elem is the + element from the other iterator. If either is missing None + will take its place. If both are present iff two have the + same index. 
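+
+		A small sketch (illustrative only, reusing the toy Rec type from
+		the class docstring above):
+
+			delta = RListDelta((None, 4), [Rec(2, "new")])
+			diffs = list(RList.Dissimilar(CachingIter([]), iter([delta])))
+			# diffs holds one pair (None, rec): the basis had nothing at index 2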
+ + """ + # Discard any elements of basis before delta starts + for basis_elem in basis: + if basis_elem.index > delta.min: + basis.push(basis_elem) + break + + # In range compare each one by one + di, boverflow, doverflow = 0, None, None + while 1: + # Set indicies and data, or mark if at end of range already + try: + basis_elem = basis.next() + if basis_elem.index > delta.max: + basis.push(basis_elem) + boverflow = 1 + except StopIteration: boverflow = 1 + if di >= len(delta.elemlist): doverflow = 1 + else: delta_elem = delta.elemlist[di] + + if boverflow and doverflow: break + elif boverflow: + yield (None, delta_elem) + di = di+1 + elif doverflow: yield (basis_elem, None) + + # Now can assume that everything is in range + elif basis_elem.index > delta_elem.index: + yield (None, delta_elem) + basis.push(basis_elem) + di = di+1 + elif basis_elem.index == delta_elem.index: + if basis_elem.data != delta_elem.data: + yield (basis_elem, delta_elem) + di = di+1 + else: yield (basis_elem, None) + + def Dissimilar(basis, deltas): + """Return iter of differences from delta iter and basis iter""" + if not isinstance(basis, CachingIter): basis = CachingIter(basis) + for d in deltas: + for triple in RList.get_difference_once(basis, d): yield triple + +MakeStatic(RList) diff --git a/rdiff-backup/rdiff_backup/robust.py b/rdiff-backup/rdiff_backup/robust.py new file mode 100644 index 0000000..c23ff6a --- /dev/null +++ b/rdiff-backup/rdiff_backup/robust.py @@ -0,0 +1,537 @@ +import tempfile +execfile("rpath.py") + +####################################################################### +# +# robust - code which prevents mirror from being corrupted, error-recovery +# +# Ideally no matter an instance of rdiff-backup gets aborted, no +# information should get lost. The target directory should be left in +# a coherent state, and later instances of rdiff-backup should clean +# things up so there is no sign that anything ever got aborted or +# failed. +# +# Thus, files should be updated in an atomic way as possible. Each +# file should be updated (and the corresponding diff files written) or +# not, and it should be clear which happened. In general, I don't +# think this is possible, since the creation of the diff files and the +# changing of updated files cannot be guarateed to happen together. +# It is possible, I think, to record various information to files +# which would allow a later process to figure out what the last +# operation was, but this would add several file operations to the +# processing of each file, and I don't think, would be a good +# tradeoff. +# +# The compromise reached here is that diff files should be created +# just before the mirror files are updated, and each file update +# should be done with a rename operation on a file in the same +# directory. Furthermore, every once in a while, rdiff-backup will +# record which file it just finished processing. If any fatal errors +# are caught, it will also record the last processed file. Future +# instances may not know exactly when the previous instance was +# aborted, but they will be able to narrow down the possibilities. + +class RobustAction: + """Represents a file operation to be accomplished later""" + def __init__(self, init_thunk, final_thunk, error_thunk): + """RobustAction initializer + + All the thunks are functions whose return value will be + ignored. init_thunk should not make any irreversible changes + but prepare for the writing of the important data. final_thunk + should be as short as possible and do the real work. 
+ error_thunk is run if there is an error in init_thunk or + final_thunk. Errors in init_thunk should be corrected by + error_thunk as if nothing had been run in the first place. + The functions take no arguments except for error_thunk, which + receives the exception as its only argument. + + """ + self.init_thunk = init_thunk + self.final_thunk = final_thunk + self.error_thunk = error_thunk + + def execute(self): + """Actually run the operation""" + try: + self.init_thunk() + self.final_thunk() + except Exception, exp: # Catch all errors + Log.exception() + self.error_thunk(exp) + raise exp + + +class Robust: + """Contains various file operations made safer using tempfiles""" + null_action = RobustAction(lambda: None, lambda: None, lambda e: None) + def chain(robust_action_list): + """Return chain tying together a number of robust actions + + The whole chain will be aborted if some error occurs in + initialization stage of any of the component actions. + + """ + ras_with_completed_inits = [] + def init(): + for ra in robust_action_list: + ras_with_completed_inits.append(ra) + ra.init_thunk() + def final(): + for ra in robust_action_list: ra.final_thunk() + def error(exp): + for ra in ras_with_completed_inits: ra.error_thunk(exp) + return RobustAction(init, final, error) + + def chain_nested(robust_action_list): + """Like chain but final actions performed in reverse order""" + ras_with_completed_inits = [] + def init(): + for ra in robust_action_list: + ras_with_completed_inits.append(ra) + ra.init_thunk() + def final(): + ralist_copy = robust_action_list[:] + ralist_copy.reverse() + for ra in ralist_copy: ra.final_thunk() + def error(exp): + for ra in ras_with_completed_inits: ra.error_thunk(exp) + return RobustAction(init, final, error) + + def make_tf_robustaction(init_thunk, tempfiles, final_renames = None): + """Shortcut RobustAction creator when only tempfiles involved + + Often the robust action will just consist of some initial + stage, renaming tempfiles in the final stage, and deleting + them if there is an error. This function makes it easier to + create RobustActions of that type. + + """ + assert type(tempfiles) is types.TupleType, tempfiles + if final_renames is None: final = lambda: None + else: + assert len(tempfiles) == len(final_renames) + def final(): # rename tempfiles to final positions + for i in range(len(tempfiles)): + final_name = final_renames[i] + if final_name: + if final_name.isdir(): # Cannot rename over directory + final_name.delete() + tempfiles[i].rename(final_name) + def error(exp): + for tf in tempfiles: tf.delete() + return RobustAction(init_thunk, final, error) + + def copy_action(rorpin, rpout): + """Return robust action copying rorpin to rpout + + The source can be a rorp or an rpath. Does not recurse. If + directories copied, then just exit (output directory not + overwritten). 
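+
+		Usage sketch (illustrative only; source and dest stand for
+		RPath objects):
+
+			Robust.chain([Robust.copy_action(source, dest),
+				      Robust.copy_attribs_action(source, dest)]).execute()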
+ + """ + tfl = [None] # Need mutable object that init and final can access + def init(): + if not (rorpin.isdir() and rpout.isdir()): # already a dir + tfl[0] = TempFileManager.new(rpout) + if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb")) + else: RPath.copy(rorpin, tf) + def final(): + if tfl[0] and tfl[0].lstat(): + if rpout.isdir(): rpout.delete() + tfl[0].rename(rpout) + return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) + + def copy_with_attribs_action(rorpin, rpout): + """Like copy_action but also copy attributes""" + tfl = [None] # Need mutable object that init and final can access + def init(): + if not (rorpin.isdir() and rpout.isdir()): # already a dir + tfl[0] = TempFileManager.new(rpout) + if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb")) + else: RPath.copy(rorpin, tfl[0]) + if tfl[0].lstat(): # Some files, like sockets, won't be created + RPathStatic.copy_attribs(rorpin, tfl[0]) + def final(): + if rorpin.isdir() and rpout.isdir(): + RPath.copy_attribs(rorpin, rpout) + elif tfl[0] and tfl[0].lstat(): + if rpout.isdir(): rpout.delete() + tfl[0].rename(rpout) + return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete()) + + def copy_attribs_action(rorpin, rpout): + """Return action which just copies attributes + + Copying attributes is already pretty atomic, so just run + normal sequence. + + """ + def final(): RPath.copy_attribs(rorpin, rpout) + return RobustAction(lambda: None, final, lambda e: None) + + def symlink_action(rpath, linktext): + """Return symlink action by moving one file over another""" + tf = TempFileManager.new(rpath) + def init(): tf.symlink(linktext) + return Robust.make_tf_robustaction(init, (tf,), (rpath,)) + + def destructive_write_action(rp, s): + """Return action writing string s to rpath rp in robust way + + This will overwrite any data currently in rp. + + """ + tf = TempFileManager.new(rp) + def init(): + fp = tf.open("wb") + fp.write(s) + assert not fp.close() + tf.setdata() + return Robust.make_tf_robustaction(init, (tf,), (rp,)) + +MakeStatic(Robust) + + +class TempFileManager: + """Manage temp files""" + + # This is a connection-specific list of temp files, to be cleaned + # up before rdiff-backup exits. + _tempfiles = [] + + # To make collisions less likely, this gets put in the file name + # and incremented whenever a new file is requested. + _tfindex = 0 + + def new(cls, rp_base, same_dir = 1): + """Return new tempfile that isn't in use. + + If same_dir, tempfile will be in same directory as rp_base. + Otherwise, use tempfile module to get filename. 
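+
+		Typical pattern (illustrative only; rp is an existing RPath and
+		fileobj any readable file object):
+
+			tf = TempFileManager.new(rp)
+			tf.write_from_fileobj(fileobj)
+			tf.rename(rp)		# move into place and drop it from the listing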
+ + """ + conn = rp_base.conn + if conn is not Globals.local_connection: + return conn.TempFileManager.new(rp_base, same_dir) + + def find_unused(conn, dir): + """Find an unused tempfile with connection conn in directory dir""" + while 1: + if cls._tfindex > 100000000: + Log("Resetting index", 2) + cls._tfindex = 0 + tf = TempFile(conn, os.path.join(dir, + "rdiff-backup.tmp.%d" % cls._tfindex)) + cls._tfindex = cls._tfindex+1 + if not tf.lstat(): return tf + + if same_dir: tf = find_unused(conn, rp_base.dirsplit()[0]) + else: tf = TempFile(conn, tempfile.mktemp()) + cls._tempfiles.append(tf) + return tf + + def remove_listing(cls, tempfile): + """Remove listing of tempfile""" + if Globals.local_connection is not tempfile.conn: + tempfile.conn.TempFileManager.remove_listing(tempfile) + elif tempfile in cls._tempfiles: cls._tempfiles.remove(tempfile) + + def delete_all(cls): + """Delete all remaining tempfiles""" + for tf in cls._tempfiles[:]: tf.delete() + +MakeClass(TempFileManager) + + +class TempFile(RPath): + """Like an RPath, but keep track of which ones are still here""" + def rename(self, rp_dest): + """Rename temp file to permanent location, possibly overwriting""" + if self.isdir() and not rp_dest.isdir(): + # Cannot move a directory directly over another file + rp_dest.delete() + if (isinstance(rp_dest, DSRPath) and rp_dest.perms_delayed + and not self.hasfullperms()): + # If we are moving to a delayed perm directory, delay + # permission change on destination. + rp_dest.chmod(self.getperms()) + self.chmod(0700) + RPathStatic.rename(self, rp_dest) + TempFileManager.remove_listing(self) + + def delete(self): + RPath.delete(self) + TempFileManager.remove_listing(self) + + +class SaveState: + """Save state in the middle of backups for resuming later""" + _last_file_sym = None # RPath of sym pointing to last file processed + _last_file_definitive_rp = None # Touch this if last file is really last + _last_checkpoint_time = 0 # time in seconds of last checkpoint + _checkpoint_rp = None # RPath of checkpoint data pickle + + def init_filenames(cls, incrementing): + """Set rpaths of markers. Assume rbdir already set. + + If incrementing, then indicate increment operation, otherwise + indicate mirror. + + """ + if not Globals.isbackup_writer: + return Globals.backup_writer.SaveState.init_filenames(incrementing) + + assert Globals.local_connection is Globals.rbdir.conn, \ + Globals.rbdir.conn + if incrementing: cls._last_file_sym = Globals.rbdir.append( + "last-file-incremented.%s.snapshot" % Time.curtimestr) + else: cls._last_file_sym = Globals.rbdir.append( + "last-file-mirrored.%s.snapshot" % Time.curtimestr) + cls._checkpoint_rp = Globals.rbdir.append( + "checkpoint-data.%s.snapshot" % Time.curtimestr) + cls._last_file_definitive_rp = Globals.rbdir.append( + "last-file-definitive.%s.snapshot" % Time.curtimestr) + + def touch_last_file(cls): + """Touch last file marker, indicating backup has begun""" + cls._last_file_sym.touch() + + def touch_last_file_definitive(cls): + """Create last-file-definitive marker + + When a backup gets aborted, there may be time to indicate the + last file successfully processed, and this should be touched. + Sometimes when the abort is hard, there may be a last file + indicated, but further files since then have been processed, + in which case this shouldn't be touched. 
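+
+		The surrounding sequence, roughly (illustrative summary only):
+
+			SaveState.init_filenames(1)	# choose marker names for this session
+			SaveState.touch_last_file()	# mark that the backup has begun
+			# ... periodically: SaveState.checkpoint_inc_backup(ITR, finalizer, last_rorp)
+			SaveState.checkpoint_remove()	# on success, clear checkpoint data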
+ + """ + cls._last_file_definitive_rp.touch() + + def record_last_file_action(cls, last_file_rorp): + """Action recording last file to be processed as symlink in rbdir + + last_file_rorp is None means that no file is known to have + been processed. + + """ + if last_file_rorp: + symtext = apply(os.path.join, + ('increments',) + last_file_rorp.index) + return Robust.symlink_action(cls._last_file_sym, symtext) + else: return RobustAction(lambda: None, cls.touch_last_file, + lambda exp: None) + + def checkpoint_inc_backup(cls, ITR, finalizer, last_file_rorp, + override = None): + """Save states of tree reducer and finalizer during inc backup + + If override is true, checkpoint even if one isn't due. + + """ + if not override and not cls.checkpoint_needed(): return + assert cls._checkpoint_rp, "_checkpoint_rp not set yet" + + cls._last_checkpoint_time = time.time() + Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) + state_string = cPickle.dumps((ITR.getstate(), finalizer.getstate())) + Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp, + state_string), + cls.record_last_file_action(last_file_rorp)]).execute() + + def checkpoint_mirror(cls, finalizer, last_file_rorp, override = None): + """For a mirror, only finalizer and last_file should be saved""" + if not override and not cls.checkpoint_needed(): return + if not cls._checkpoint_rp: + Log("Warning, _checkpoint_rp not set yet", 2) + return + + cls._last_checkpoint_time = time.time() + Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7) + state_string = cPickle.dumps(finalizer.getstate()) + Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp, + state_string), + cls.record_last_file_action(last_file_rorp)]).execute() + + def checkpoint_needed(cls): + """Returns true if another checkpoint is called for""" + return (time.time() > cls._last_checkpoint_time + + Globals.checkpoint_interval) + + def checkpoint_remove(cls): + """Remove all checkpointing data after successful operation""" + for rp in Resume.get_relevant_rps(): rp.delete() + +MakeClass(SaveState) + + +class Resume: + """Check for old aborted backups and resume if necessary""" + _session_info_list = None # List of ResumeSessionInfo's, sorted by time + def FindTime(cls, index, later_than = 0): + """For a given index, find the appropriate time to use for inc + + If it is clear which time to use (because it is determined by + definitive records, or there are no aborted backup, etc.) then + just return the appropriate time. Otherwise, if an aborted + backup was last checkpointed before the index, assume that it + didn't get there, and go for the older time. If an inc file + is already present, the function will be rerun with later time + specified. + + """ + if Time.prevtime > later_than: return Time.prevtime # usual case + + for si in cls.get_sis_covering_index(index): + if si.time > later_than: return si.time + raise SkipFileException("Index %s already covered, skipping" % + str(index)) + + def get_sis_covering_index(cls, index): + """Return sorted list of SessionInfos which may cover index + + Aborted backup may be relevant unless index is lower and we + are sure that it didn't go further. 
+ + """ + return filter(lambda session_info: + not ((session_info.last_index is None or + session_info.last_index < index) and + session_info.last_definitive), + cls._session_info_list) + + def SetSessionInfo(cls): + """Read data directory and initialize _session_info""" + silist = [] + rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps()) + times = rp_quad_dict.keys() + times.sort() + for time in times: + silist.append(cls.quad_to_si(time, rp_quad_dict[time])) + cls._session_info_list = silist + + def get_relevant_rps(cls): + """Return list of relevant rpaths in rbdata directory""" + relevant_bases = ['last-file-incremented', 'last-file-mirrored', + 'checkpoint-data', 'last-file-definitive'] + rps = map(Globals.rbdir.append, Globals.rbdir.listdir()) + return filter(lambda rp: rp.isincfile() + and rp.getincbase_str() in relevant_bases, rps) + + def group_rps_by_time(cls, rplist): + """Take list of rps return time dict {time: quadlist} + + Times in seconds are the keys, values are triples of rps + [last-file-incremented, last-file-mirrored, checkpoint-data, + last-is-definitive]. + + """ + result = {} + for rp in rplist: + time = Time.stringtotime(rp.getinctime()) + if result.has_key(time): quadlist = result[time] + else: quadlist = [None, None, None, None] + base_string = rp.getincbase_str() + if base_string == 'last-file-incremented': quadlist[0] = rp + elif base_string == 'last-file-mirrored': quadlist[1] = rp + elif base_string == 'last-file-definitive': quadlist[3] = 1 + else: + assert base_string == 'checkpoint-data' + quadlist[2] = rp + result[time] = quadlist + return result + + def quad_to_si(cls, time, quad): + """Take time, quadlist, return associated ResumeSessionInfo""" + increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad + assert not (increment_sym and mirror_sym) # both shouldn't exist + ITR, finalizer = None, None + if increment_sym: + mirror = None + last_index = cls.sym_to_index(increment_sym) + if checkpoint_rp: + ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp) + elif mirror_sym: + mirror = 1 + last_index = cls.sym_to_index(mirror_sym) + if checkpoint_rp: + finalizer = cls.unpickle_checkpoint(checkpoint_rp) + return ResumeSessionInfo(mirror, time, last_index, last_definitive, + finalizer, ITR) + + def sym_to_index(cls, sym_rp): + """Read last file sym rp, return last file index + + If sym_rp is not a sym at all, return None, indicating that no + file index was ever conclusively processed. + + """ + if not sym_rp.issym(): return None + link_components = sym_rp.readlink().split("/") + assert link_components[0] == 'increments' + return tuple(link_components[1:]) + + def unpickle_checkpoint(cls, checkpoint_rp): + """Read data from checkpoint_rp and return unpickled data + + Return value is pair finalizer state for a mirror checkpoint, + and (patch increment ITR, finalizer state) for increment + checkpoint. + + """ + fp = checkpoint_rp.open("rb") + data = fp.read() + fp.close() + return cPickle.loads(data) + + def ResumeCheck(cls): + """Return relevant ResumeSessionInfo if there's one we should resume + + Also if find RSI to resume, reset current time to old resume + time. 
+ + """ + cls.SetSessionInfo() + if not cls._session_info_list: + if Globals.resume == 1: + Log.FatalError("User specified resume, but no data on " + "previous backup found.") + else: return None + else: + si = cls._session_info_list[-1] + if (Globals.resume == 1 or + (time.time() <= (si.time + Globals.resume_window) and + not Globals.resume == 0)): + Log("Resuming aborted backup dated %s" % + Time.timetopretty(si.time), 2) + Time.setcurtime(si.time) + return si + else: + Log("Last backup dated %s was aborted, but we aren't " + "resuming it." % Time.timetopretty(si.time), 2) + return None + assert 0 + +MakeClass(Resume) + + +class ResumeSessionInfo: + """Hold information about a previously aborted session""" + def __init__(self, mirror, time, last_index, + last_definitive, finalizer_state = None, ITR_state = None): + """Class initializer + + time - starting time in seconds of backup + mirror - true if backup was a mirror, false if increment + last_index - Last confirmed index processed by backup, or None + last_definitive - True is we know last_index is really last + finalizer_state - finalizer reducer state if available + ITR_state - For increment, ITM reducer state (assume mirror if NA) + + """ + self.time = time + self.mirror = mirror + self.last_index = last_index + self.last_definitive = last_definitive + self.ITR_state, self.finalizer_state, = ITR_state, finalizer_state diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py new file mode 100644 index 0000000..5740ef8 --- /dev/null +++ b/rdiff-backup/rdiff_backup/rorpiter.py @@ -0,0 +1,248 @@ +execfile("robust.py") +from __future__ import generators +import tempfile + +####################################################################### +# +# rorpiter - Operations on Iterators of Read Only Remote Paths +# + +class RORPIterException(Exception): pass + +class RORPIter: + """Functions relating to iterators of Read Only RPaths + + The main structure will be an iterator that yields RORPaths. + Every RORPath has a "raw" form that makes it more amenable to + being turned into a file. The raw form of the iterator yields + each RORPath in the form of the tuple (index, data_dictionary, + files), where files is the number of files attached (usually 1 or + 0). After that, if a file is attached, it yields that file. 
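+
+	For example (illustrative only), a directory followed by one regular
+	file with attached data would appear in raw form roughly as:
+
+		((), {'type': 'dir', ...}, 0)
+		(('bin',), {'type': 'reg', ...}, 1)
+		<file object carrying the contents of 'bin'>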
+ + """ + def ToRaw(rorp_iter): + """Convert a rorp iterator to raw form""" + for rorp in rorp_iter: + if rorp.file: + yield (rorp.index, rorp.data, 1) + yield rorp.file + else: yield (rorp.index, rorp.data, 0) + + def FromRaw(raw_iter): + """Convert raw rorp iter back to standard form""" + for index, data, num_files in raw_iter: + rorp = RORPath(index, data) + if num_files: + assert num_files == 1, "Only one file accepted right now" + rorp.setfile(RORPIter.getnext(raw_iter)) + yield rorp + + def ToFile(rorp_iter): + """Return file version of iterator""" + return FileWrappingIter(RORPIter.ToRaw(rorp_iter)) + + def FromFile(fileobj): + """Recover rorp iterator from file interface""" + return RORPIter.FromRaw(IterWrappingFile(fileobj)) + + def IterateRPaths(base_rp): + """Return an iterator yielding RPaths with given base rp""" + yield base_rp + if base_rp.isdir(): + dirlisting = base_rp.listdir() + dirlisting.sort() + for filename in dirlisting: + for rp in RORPIter.IterateRPaths(base_rp.append(filename)): + yield rp + + def Signatures(rp_iter): + """Yield signatures of rpaths in given rp_iter""" + for rp in rp_iter: + if rp.isplaceholder(): yield rp + else: + rorp = rp.getRORPath() + if rp.isreg(): rorp.setfile(Rdiff.get_signature(rp)) + yield rorp + + def GetSignatureIter(base_rp): + """Return a signature iterator recurring over the base_rp""" + return RORPIter.Signatures(RORPIter.IterateRPaths(base_rp)) + + def CollateIterators(*rorp_iters): + """Collate RORPath iterators by index + + So it takes two or more iterators of rorps and returns an + iterator yielding tuples like (rorp1, rorp2) with the same + index. If one or the other lacks that index, it will be None + + """ + # overflow[i] means that iter[i] has been exhausted + # rorps[i] is None means that it is time to replenish it. + iter_num = len(rorp_iters) + if iter_num == 2: + return RORPIter.Collate2Iters(rorp_iters[0], rorp_iters[1]) + overflow = [None] * iter_num + rorps = overflow[:] + + def setrorps(overflow, rorps): + """Set the overflow and rorps list""" + for i in range(iter_num): + if not overflow[i] and rorps[i] is None: + try: rorps[i] = rorp_iters[i].next() + except StopIteration: + overflow[i] = 1 + rorps[i] = None + + def getleastindex(rorps): + """Return the first index in rorps, assuming rorps isn't empty""" + return min(map(lambda rorp: rorp.index, + filter(lambda x: x, rorps))) + + def yield_tuples(iter_num, overflow, rorps): + while 1: + setrorps(overflow, rorps) + if not None in overflow: break + + index = getleastindex(rorps) + yieldval = [] + for i in range(iter_num): + if rorps[i] and rorps[i].index == index: + yieldval.append(rorps[i]) + rorps[i] = None + else: yieldval.append(None) + yield IndexedTuple(index, yieldval) + return yield_tuples(iter_num, overflow, rorps) + + def Collate2Iters(riter1, riter2): + """Special case of CollateIterators with 2 arguments + + This does the same thing but is faster because it doesn't have + to consider the >2 iterator case. Profiler says speed is + important here. 
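+
+		For instance (illustrative only), collating streams with indexes
+		(1,), (2,) and (2,), (3,) yields:
+
+			IndexedTuple((1,), (a1, None))
+			IndexedTuple((2,), (a2, b2))
+			IndexedTuple((3,), (None, b3))
+
+		where aN and bN are the elements with that index from each stream.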
+ + """ + relem1, relem2 = None, None + while 1: + if not relem1: + try: relem1 = riter1.next() + except StopIteration: + if relem2: yield IndexedTuple(index2, (None, relem2)) + for relem2 in riter2: + yield IndexedTuple(relem2.index, (None, relem2)) + break + index1 = relem1.index + if not relem2: + try: relem2 = riter2.next() + except StopIteration: + if relem1: yield IndexedTuple(index1, (relem1, None)) + for relem1 in riter1: + yield IndexedTuple(relem1.index, (relem1, None)) + break + index2 = relem2.index + + if index1 < index2: + yield IndexedTuple(index1, (relem1, None)) + relem1 = None + elif index1 == index2: + yield IndexedTuple(index1, (relem1, relem2)) + relem1, relem2 = None, None + else: # index2 is less + yield IndexedTuple(index2, (None, relem2)) + relem2 = None + + def getnext(iter): + """Return the next element of an iterator, raising error if none""" + try: next = iter.next() + except StopIteration: raise RORPIterException("Unexpected end to iter") + return next + + def GetDiffIter(sig_iter, new_iter): + """Return delta iterator from sig_iter to new_iter + + The accompanying file for each will be a delta as produced by + rdiff, unless the destination file does not exist, in which + case it will be the file in its entirety. + + sig_iter may be composed of rorps, but new_iter should have + full RPaths. + + """ + collated_iter = RORPIter.CollateIterators(sig_iter, new_iter) + for rorp, rp in collated_iter: yield RORPIter.diffonce(rorp, rp) + + def diffonce(sig_rorp, new_rp): + """Return one diff rorp, based from signature rorp and orig rp""" + if sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg(): + diff_rorp = new_rp.getRORPath() + diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"), + new_rp)) + diff_rorp.set_attached_filetype('diff') + return diff_rorp + else: + # Just send over originial if diff isn't appropriate + if sig_rorp: sig_rorp.close_if_necessary() + if not new_rp: return RORPath(sig_rorp.index) + elif new_rp.isreg(): + diff_rorp = new_rp.getRORPath(1) + diff_rorp.set_attached_filetype('snapshot') + return diff_rorp + else: return new_rp.getRORPath() + + def PatchIter(base_rp, diff_iter): + """Patch the appropriate rps in basis_iter using diff_iter""" + basis_iter = RORPIter.IterateRPaths(base_rp) + collated_iter = RORPIter.CollateIterators(basis_iter, diff_iter) + for basisrp, diff_rorp in collated_iter: + RORPIter.patchonce_action(base_rp, basisrp, diff_rorp).execute() + + def patchonce_action(base_rp, basisrp, diff_rorp): + """Return action patching basisrp using diff_rorp""" + assert diff_rorp, "Missing diff index %s" % basisrp.index + if not diff_rorp.lstat(): + return RobustAction(lambda: None, basisrp.delete, lambda e: None) + + if basisrp and basisrp.isreg() and diff_rorp.isreg(): + assert diff_rorp.get_attached_filetype() == 'diff' + return Rdiff.patch_with_attribs_action(basisrp, diff_rorp) + else: # Diff contains whole file, just copy it over + if not basisrp: basisrp = base_rp.new_index(diff_rorp.index) + return Robust.copy_with_attribs_action(diff_rorp, basisrp) + +MakeStatic(RORPIter) + + + +class IndexedTuple: + """Like a tuple, but has .index + + This is used by CollateIterator above, and can be passed to the + IterTreeReducer. 
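+
+	Minimal sketch (illustrative only; rorp1 and rorp2 stand for any
+	indexed elements):
+
+		it = IndexedTuple(("usr", "bin"), (rorp1, rorp2))
+		assert it.index == ("usr", "bin") and it[0] is rorp1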
+ + """ + def __init__(self, index, sequence): + self.index = index + self.data = tuple(sequence) + + def __len__(self): return len(self.data) + + def __getitem__(self, key): + """This only works for numerical keys (faster that way)""" + return self.data[key] + + def __cmp__(self, other): + assert isinstance(other, IndexedTuple) + if self.index < other.index: return -1 + elif self.index == other.index: return 0 + else: return 1 + + def __eq__(self, other): + if isinstance(other, IndexedTuple): + return self.index == other.index and self.data == other.data + elif type(other) is types.TupleType: + return self.data == other + else: return None + + def __str__(self): + assert len(self.data) == 2 + return "(%s, %s).%s" % (str(self.data[0]), str(self.data[1]), + str(self.index)) diff --git a/rdiff-backup/rdiff_backup/rpath.py b/rdiff-backup/rdiff_backup/rpath.py new file mode 100644 index 0000000..4e6cc8f --- /dev/null +++ b/rdiff-backup/rdiff_backup/rpath.py @@ -0,0 +1,704 @@ +execfile("connection.py") +import os, stat, re, sys, shutil + +####################################################################### +# +# rpath - Wrapper class around a real path like "/usr/bin/env" +# +# The RPath and associated classes make some function calls more +# convenient (e.g. RPath.getperms()) and also make working with files +# on remote systems transparent. +# + +class RPathException(Exception): pass + +class RPathStatic: + """Contains static methods for use with RPaths""" + def copyfileobj(inputfp, outputfp): + """Copies file inputfp to outputfp in blocksize intervals""" + blocksize = Globals.blocksize + while 1: + inbuf = inputfp.read(blocksize) + if not inbuf: break + outputfp.write(inbuf) + + def cmpfileobj(fp1, fp2): + """True if file objects fp1 and fp2 contain same data""" + blocksize = Globals.blocksize + while 1: + buf1 = fp1.read(blocksize) + buf2 = fp2.read(blocksize) + if buf1 != buf2: return None + elif not buf1: return 1 + + def check_for_files(*rps): + """Make sure that all the rps exist, raise error if not""" + for rp in rps: + if not rp.lstat(): + raise RPathException("File %s does not exist" % rp.path) + + def move(rpin, rpout): + """Move rpin to rpout, renaming if possible""" + try: RPath.rename(rpin, rpout) + except os.error: + RPath.copy(rpin, rpout) + rpin.delete() + + def copy(rpin, rpout): + """Copy RPath rpin to rpout. 
Works for symlinks, dirs, etc.""" + Log("Regular copying %s to %s" % (rpin.index, rpout.path), 6) + if not rpin.lstat(): + raise RPathException, ("File %s does not exist" % rpin.index) + + if rpout.lstat(): + if rpin.isreg() or not RPath.cmp(rpin, rpout): + rpout.delete() # easier to write that compare + else: return + + if rpin.isreg(): RPath.copy_reg_file(rpin, rpout) + elif rpin.isdir(): rpout.mkdir() + elif rpin.issym(): rpout.symlink(rpin.readlink()) + elif rpin.ischardev(): + major, minor = rpin.getdevnums() + rpout.makedev("c", major, minor) + elif rpin.isblkdev(): + major, minor = rpin.getdevnums() + rpout.makedev("b", major, minor) + elif rpin.isfifo(): rpout.mkfifo() + elif rpin.issock(): Log("Found socket, ignoring", 1) + else: raise RPathException("File %s has unknown type" % rpin.path) + + def copy_reg_file(rpin, rpout): + """Copy regular file rpin to rpout, possibly avoiding connection""" + try: + if rpout.conn is rpin.conn: + rpout.conn.shutil.copyfile(rpin.path, rpout.path) + rpout.data = {'type': rpin.data['type']} + return + except AttributeError: pass + rpout.write_from_fileobj(rpin.open("rb")) + + def cmp(rpin, rpout): + """True if rpin has the same data as rpout + + cmp does not compare file ownership, permissions, or times, or + examine the contents of a directory. + + """ + RPath.check_for_files(rpin, rpout) + if rpin.isreg(): + if not rpout.isreg(): return None + fp1, fp2 = rpin.open("rb"), rpout.open("rb") + result = RPathStatic.cmpfileobj(fp1, fp2) + if fp1.close() or fp2.close(): + raise RPathException("Error closing file") + return result + elif rpin.isdir(): return rpout.isdir() + elif rpin.issym(): + return rpout.issym() and (rpin.readlink() == rpout.readlink()) + elif rpin.ischardev(): + return rpout.ischardev() and \ + (rpin.getdevnums() == rpout.getdevnums()) + elif rpin.isblkdev(): + return rpout.isblkdev() and \ + (rpin.getdevnums() == rpout.getdevnums()) + elif rpin.isfifo(): return rpout.isfifo() + elif rpin.issock(): return rpout.issock() + else: raise RPathException("File %s has unknown type" % rpin.path) + + def copy_attribs(rpin, rpout): + """Change file attributes of rpout to match rpin + + Only changes the chmoddable bits, uid/gid ownership, and + timestamps, so both must already exist. + + """ + Log("Copying attributes from %s to %s" % (rpin.index, rpout.path), 7) + RPath.check_for_files(rpin, rpout) + if rpin.issym(): return # symlinks have no valid attributes + if Globals.change_ownership: apply(rpout.chown, rpin.getuidgid()) + rpout.chmod(rpin.getperms()) + if not rpin.isdev(): rpout.setmtime(rpin.getmtime()) + + def cmp_attribs(rp1, rp2): + """True if rp1 has the same file attributes as rp2 + + Does not compare file access times. If not changing + ownership, do not check user/group id. 
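+
+		Sketch (illustrative only; rp1 and rp2 stand for two existing
+		regular-file RPaths):
+
+			if RPath.cmp_attribs(rp1, rp2) and rp1.getlen() == rp2.getlen():
+				pass	# quick_cmp_with_attribs below treats this case as equal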
+ + """ + RPath.check_for_files(rp1, rp2) + if Globals.change_ownership and rp1.getuidgid() != rp2.getuidgid(): + result = None + elif rp1.getperms() != rp2.getperms(): result = None + elif rp1.issym() and rp2.issym(): # Don't check times for some types + result = 1 + elif rp1.isblkdev() and rp2.isblkdev(): result = 1 + elif rp1.ischardev() and rp2.ischardev(): result = 1 + else: result = (rp1.getmtime() == rp2.getmtime()) + Log("Compare attribs %s and %s: %s" % (rp1.path, rp2.path, result), 7) + return result + + def copy_with_attribs(rpin, rpout): + """Copy file and then copy over attributes""" + RPath.copy(rpin, rpout) + RPath.copy_attribs(rpin, rpout) + + def quick_cmp_with_attribs(rp1, rp2): + """Quicker version of cmp_with_attribs + + Instead of reading all of each file, assume that regular files + are the same if the attributes compare. + + """ + if not RPath.cmp_attribs(rp1, rp2): return None + if rp1.isreg() and rp2.isreg() and (rp1.getlen() == rp2.getlen()): + return 1 + return RPath.cmp(rp1, rp2) + + def cmp_with_attribs(rp1, rp2): + """Combine cmp and cmp_attribs""" + return RPath.cmp_attribs(rp1, rp2) and RPath.cmp(rp1, rp2) + + def rename(rp_source, rp_dest): + """Rename rp_source to rp_dest""" + assert rp_source.conn is rp_dest.conn + Log("Renaming %s to %s" % (rp_source.path, rp_dest.path), 7) + rp_source.conn.os.rename(rp_source.path, rp_dest.path) + rp_dest.data = rp_source.data + rp_source.data = {'type': None} + + def tupled_lstat(filename): + """Like os.lstat, but return only a tuple, or None if os.error + + Later versions of os.lstat return a special lstat object, + which can confuse the pickler and cause errors in remote + operations. + + """ + try: return tuple(os.lstat(filename)) + except os.error: return None + + def cmp_recursive(rp1, rp2): + """True if rp1 and rp2 are at the base of same directories + + Includes only attributes, no file data. This function may not + be used in rdiff-backup but it comes in handy in the unit + tests. + + """ + rp1.setdata() + rp2.setdata() + dsiter1, dsiter2 = map(DestructiveStepping.Iterate_with_Finalizer, + [rp1, rp2], [1, None]) + result = Iter.equal(dsiter1, dsiter2, 1) + for i in dsiter1: pass # make sure all files processed anyway + for i in dsiter2: pass + return result + +MakeStatic(RPathStatic) + + +class RORPath(RPathStatic): + """Read Only RPath - carry information about a path + + These contain information about a file, and possible the file's + data, but do not have a connection and cannot be written to or + changed. The advantage of these objects is that they can be + communicated by encoding their index and data dictionary. 
+ + """ + def __init__(self, index, data = None): + self.index = index + if data: self.data = data + else: self.data = {'type':None} # signify empty file + self.file = None + + def __eq__(self, other): + """Signal two files equivalent""" + if not Globals.change_ownership or self.issym() and other.issym(): + # Don't take file ownership into account when comparing + data1, data2 = self.data.copy(), other.data.copy() + for d in (data1, data2): + for key in ('uid', 'gid'): + if d.has_key(key): del d[key] + return self.index == other.index and data1 == data2 + else: return self.index == other.index and self.data == other.data + + def __str__(self): + """Pretty print file statistics""" + return "Index: %s\nData: %s" % (self.index, self.data) + + def __getstate__(self): + """Return picklable state + + This is necessary in case the RORPath is carrying around a + file object, which can't/shouldn't be pickled. + + """ + return (self.index, self.data) + + def __setstate__(self, rorp_state): + """Reproduce RORPath from __getstate__ output""" + self.index, self.data = rorp_state + + def make_placeholder(self): + """Make rorp into a placeholder + + This object doesn't contain any information about the file, + but, when passed along, may show where the previous stages are + in their processing. It is the RORPath equivalent of fiber. + + """ + self.data = {'placeholder': + ("It is actually good for placeholders to use" + "up a bit of memory, so the buffers get flushed" + "more often when placeholders move through." + "See the get_dissimilar docs for more info.")} + + def isplaceholder(self): + """True if the object is a placeholder""" + return self.data.has_key('placeholder') + + def lstat(self): + """Returns type of file + + The allowable types are None if the file doesn't exist, 'reg' + for a regular file, 'dir' for a directory, 'dev' for a device + file, 'fifo' for a fifo, 'sock' for a socket, and 'sym' for a + symlink. 
+ + """ + return self.data['type'] + gettype = lstat + + def isdir(self): + """True if self is a dir""" + return self.data['type'] == 'dir' + + def isreg(self): + """True if self is a regular file""" + return self.data['type'] == 'reg' + + def issym(self): + """True if path is of a symlink""" + return self.data['type'] == 'sym' + + def isfifo(self): + """True if path is a fifo""" + return self.data['type'] == 'fifo' + + def ischardev(self): + """True if path is a character device file""" + return self.data['type'] == 'dev' and self.data['devnums'][0] == 'c' + + def isblkdev(self): + """True if path is a block device file""" + return self.data['type'] == 'dev' and self.data['devnums'][0] == 'b' + + def isdev(self): + """True if path is a device file""" + return self.data['type'] == 'dev' + + def issock(self): + """True if path is a socket""" + return self.data['type'] == 'sock' + + def getperms(self): + """Return permission block of file""" + return self.data['perms'] + + def getsize(self): + """Return length of file in bytes""" + return self.data['size'] + + def getuidgid(self): + """Return userid/groupid of file""" + return self.data['uid'], self.data['gid'] + + def getatime(self): + """Return access time in seconds""" + return self.data['atime'] + + def getmtime(self): + """Return modification time in seconds""" + return self.data['mtime'] + + def readlink(self): + """Wrapper around os.readlink()""" + return self.data['linkname'] + + def getdevnums(self): + """Return a devices major/minor numbers from dictionary""" + return self.data['devnums'][1:] + + def setfile(self, file): + """Right now just set self.file to be the already opened file""" + assert file and not self.file + def closing_hook(): self.file_already_open = None + self.file = RPathFileHook(file, closing_hook) + self.file_already_open = None + + def get_attached_filetype(self): + """If there is a file attached, say what it is + + Currently the choices are 'snapshot' meaning an exact copy of + something, and 'diff' for an rdiff style diff. + + """ + return self.data['filetype'] + + def set_attached_filetype(self, type): + """Set the type of the attached file""" + self.data['filetype'] = type + + def open(self, mode): + """Return file type object if any was given using self.setfile""" + if mode != "rb": raise RPathException("Bad mode %s" % mode) + if self.file_already_open: + raise RPathException("Attempt to open same file twice") + self.file_already_open = 1 + return self.file + + def close_if_necessary(self): + """If file is present, discard data and close""" + if self.file: + while self.file.read(Globals.blocksize): pass + assert not self.file.close(), \ + "Error closing file\ndata = %s\nindex = %s\n" % (self.data, + self.index) + self.file_already_open = None + + +class RPath(RORPath): + """Remote Path class - wrapper around a possibly non-local pathname + + This class contains a dictionary called "data" which should + contain all the information about the file sufficient for + identification (i.e. if two files have the the same (==) data + dictionary, they are the same file). + + """ + regex_chars_to_quote = re.compile("[\\\\\\\"\\$`]") + + def __init__(self, connection, base, index = (), data = None): + """RPath constructor + + connection = self.conn is the Connection the RPath will use to + make system calls, and index is the name of the rpath used for + comparison, and should be a tuple consisting of the parts of + the rpath after the base split up. 
For instance ("foo", + "bar") for "foo/bar" (no base), and ("local", "bin") for + "/usr/local/bin" if the base is "/usr". + + """ + self.conn = connection + self.index = index + self.base = base + self.path = apply(os.path.join, (base,) + self.index) + self.file = None + if data: self.data = data + else: self.setdata() + + def __str__(self): + return "Path: %s\nIndex: %s\nData: %s" % (self.path, self.index, + self.data) + + def __getstate__(self): + """Return picklable state + + The connection must be local because we can't pickle a + connection. Data and any attached file also won't be saved. + + """ + assert self.conn is Globals.local_connection + return (self.index, self.base, self.data) + + def __setstate__(self, rpath_state): + """Reproduce RPath from __getstate__ output""" + self.index, self.base, self.data = rpath_state + + def setdata(self): + """Create the data dictionary""" + statblock = self.conn.RPathStatic.tupled_lstat(self.path) + if statblock is None: + self.data = {'type':None} + return + data = {} + mode = statblock[stat.ST_MODE] + + if stat.S_ISREG(mode): + type = 'reg' + data['size'] = statblock[stat.ST_SIZE] + elif stat.S_ISDIR(mode): type = 'dir' + elif stat.S_ISCHR(mode): + type = 'dev' + data['devnums'] = ('c',) + self._getdevnums() + elif stat.S_ISBLK(mode): + type = 'dev' + data['devnums'] = ('b',) + self._getdevnums() + elif stat.S_ISFIFO(mode): type = 'fifo' + elif stat.S_ISLNK(mode): + type = 'sym' + data['linkname'] = self.conn.os.readlink(self.path) + elif stat.S_ISSOCK(mode): type = 'sock' + else: raise RPathException("Unknown type for %s" % self.path) + data['type'] = type + data['perms'] = stat.S_IMODE(mode) + data['uid'] = statblock[stat.ST_UID] + data['gid'] = statblock[stat.ST_GID] + + if not (type == 'sym' or type == 'dev'): + # mtimes on symlinks and dev files don't work consistently + data['mtime'] = long(statblock[stat.ST_MTIME]) + + if Globals.preserve_atime and not type == 'sym': + data['atime'] = long(statblock[stat.ST_ATIME]) + self.data = data + + def check_consistency(self): + """Raise an error if consistency of rp broken + + This is useful for debugging when the cache and disk get out + of sync and you need to find out where it happened. 
+ + """ + temptype = self.data['type'] + self.setdata() + assert temptype == self.data['type'], \ + "\nName: %s\nOld: %s --> New: %s\n" % \ + (self.path, temptype, self.data['type']) + + def _getdevnums(self): + """Return tuple for special file (major, minor)""" + assert self.conn is Globals.local_connection + if Globals.exclude_device_files: + # No point in finding numbers because it will be excluded anyway + return () + s = os.lstat(self.path).st_rdev + return (s >> 8, s & 0xff) + + def chmod(self, permissions): + """Wrapper around os.chmod""" + self.conn.os.chmod(self.path, permissions) + self.data['perms'] = permissions + + def settime(self, accesstime, modtime): + """Change file modification times""" + Log("Setting time of %s to %d" % (self.path, modtime), 7) + self.conn.os.utime(self.path, (accesstime, modtime)) + self.data['atime'] = accesstime + self.data['mtime'] = modtime + + def setmtime(self, modtime): + """Set only modtime (access time to present)""" + Log("Setting time of %s to %d" % (self.path, modtime), 7) + self.conn.os.utime(self.path, (time.time(), modtime)) + self.data['mtime'] = modtime + + def chown(self, uid, gid): + """Set file's uid and gid""" + self.conn.os.chown(self.path, uid, gid) + self.data['uid'] = uid + self.data['gid'] = gid + + def mkdir(self): + Log("Making directory " + self.path, 6) + self.conn.os.mkdir(self.path) + self.setdata() + + def rmdir(self): + Log("Removing directory " + self.path, 6) + self.conn.os.rmdir(self.path) + self.data = {'type': None} + + def listdir(self): + """Return list of string paths returned by os.listdir""" + return self.conn.os.listdir(self.path) + + def symlink(self, linktext): + """Make symlink at self.path pointing to linktext""" + self.conn.os.symlink(linktext, self.path) + self.setdata() + assert self.issym() + + def mkfifo(self): + """Make a fifo at self.path""" + self.conn.os.mkfifo(self.path) + self.setdata() + assert self.isfifo() + + def touch(self): + """Make sure file at self.path exists""" + Log("Touching " + self.path, 7) + self.conn.open(self.path, "w").close() + self.setdata() + assert self.isreg() + + def hasfullperms(self): + """Return true if current process has full permissions on the file""" + if self.isowner(): return self.getperms() % 01000 >= 0700 + elif self.isgroup(): return self.getperms() % 0100 >= 070 + else: return self.getperms() % 010 >= 07 + + def readable(self): + """Return true if current process has read permissions on the file""" + if self.isowner(): return self.getperms() % 01000 >= 0400 + elif self.isgroup(): return self.getperms() % 0100 >= 040 + else: return self.getperms() % 010 >= 04 + + def executable(self): + """Return true if current process has execute permissions""" + if self.isowner(): return self.getperms() % 0200 >= 0100 + elif self.isgroup(): return self.getperms() % 020 >= 010 + else: return self.getperms() % 02 >= 01 + + def isowner(self): + """Return true if current process is owner of rp or root""" + uid = self.conn.Globals.get('process_uid') + return uid == 0 or uid == self.data['uid'] + + def isgroup(self): + """Return true if current process is in group of rp""" + return self.conn.Globals.get('process_gid') == self.data['gid'] + + def delete(self): + """Delete file at self.path + + The destructive stepping allows this function to delete + directories even if they have files and we lack permissions. 
+ + """ + Log("Deleting %s" % self.path, 7) + self.setdata() + if not self.lstat(): return # must have been deleted in meantime + elif self.isdir(): + def helper(dsrp, base_init_output, branch_reduction): + if dsrp.isdir(): dsrp.rmdir() + else: dsrp.delete() + dsiter = DestructiveStepping.Iterate_from(self, None) + itm = IterTreeReducer(lambda x: None, lambda x,y: None, None, + helper) + for dsrp in dsiter: itm(dsrp) + itm.getresult() + else: self.conn.os.unlink(self.path) + self.setdata() + + def quote(self): + """Return quoted self.path for use with os.system()""" + return '"%s"' % self.regex_chars_to_quote.sub( + lambda m: "\\"+m.group(0), self.path) + + def normalize(self): + """Return RPath canonical version of self.path + + This just means that redundant /'s will be removed, including + the trailing one, even for directories. ".." components will + be retained. + + """ + newpath = "/".join(filter(lambda x: x and x != ".", + self.path.split("/"))) + if self.path[0] == "/": newpath = "/" + newpath + elif not newpath: newpath = "." + return self.__class__(self.conn, newpath, ()) + + def dirsplit(self): + """Returns a tuple of strings (dirname, basename) + + Basename is never '' unless self is root, so it is unlike + os.path.basename. If path is just above root (so dirname is + root), then dirname is ''. In all other cases dirname is not + the empty string. Also, dirsplit depends on the format of + self, so basename could be ".." and dirname could be a + subdirectory. For an atomic relative path, dirname will be + '.'. + + """ + normed = self.normalize() + if normed.path.find("/") == -1: return (".", normed.path) + comps = normed.path.split("/") + return "/".join(comps[:-1]), comps[-1] + + def append(self, ext): + """Return new RPath with same connection by adjoing ext""" + return self.__class__(self.conn, self.base, self.index + (ext,)) + + def new_index(self, index): + """Return similar RPath but with new index""" + return self.__class__(self.conn, self.base, index) + + def open(self, mode): + """Return open file. Supports modes "w" and "r".""" + return self.conn.open(self.path, mode) + + def write_from_fileobj(self, fp): + """Reads fp and writes to self.path. 
Closes both when done"""
+		Log("Writing file object to " + self.path, 7)
+		assert not self.lstat(), "File %s already exists" % self.path
+		outfp = self.open("wb")
+		RPath.copyfileobj(fp, outfp)
+		if fp.close() or outfp.close():
+			raise RPathException("Error closing file")
+		self.setdata()
+
+	def isincfile(self):
+		"""Return true if path looks like an increment file"""
+		dotsplit = self.path.split(".")
+		if len(dotsplit) < 3: return None
+		timestring, ext = dotsplit[-2:]
+		if Time.stringtotime(timestring) is None: return None
+		return (ext == "snapshot" or ext == "dir" or
+				ext == "missing" or ext == "diff")
+
+	def getinctype(self):
+		"""Return type of an increment file"""
+		return self.path.split(".")[-1]
+
+	def getinctime(self):
+		"""Return timestring of an increment file"""
+		return self.path.split(".")[-2]
+
+	def getincbase(self):
+		"""Return the base filename of an increment file in rp form"""
+		if self.index:
+			return self.__class__(self.conn, self.base, self.index[:-1] +
+				((".".join(self.index[-1].split(".")[:-2])),))
+		else: return self.__class__(self.conn,
+				".".join(self.base.split(".")[:-2]), ())
+
+	def getincbase_str(self):
+		"""Return the base filename string of an increment file"""
+		return self.getincbase().dirsplit()[1]
+
+	def makedev(self, type, major, minor):
+		"""Make a special file with specified type, and major/minor nums"""
+		cmdlist = ['mknod', self.path, type, str(major), str(minor)]
+		if self.conn.os.spawnvp(os.P_WAIT, 'mknod', cmdlist) != 0:
+			raise RPathException("Error running %s" % cmdlist)
+		if type == 'c': datatype = 'chr'
+		elif type == 'b': datatype = 'blk'
+		else: raise RPathException
+		self.data = {'type': datatype, 'devnums': (type, major, minor)}
+
+	def getRORPath(self, include_contents = None):
+		"""Return read only version of self"""
+		rorp = RORPath(self.index, self.data)
+		if include_contents: rorp.setfile(self.open("rb"))
+		return rorp
+
+
+class RPathFileHook:
+	"""Look like a file, but add closing hook"""
+	def __init__(self, file, closing_thunk):
+		self.file = file
+		self.closing_thunk = closing_thunk
+
+	def read(self, length = -1): return self.file.read(length)
+	def write(self, buf): return self.file.write(buf)
+
+	def close(self):
+		"""Close file and then run closing thunk"""
+		result = self.file.close()
+		self.closing_thunk()
+		return result
diff --git a/rdiff-backup/rdiff_backup/static.py b/rdiff-backup/rdiff_backup/static.py
new file mode 100644
index 0000000..2e97cd0
--- /dev/null
+++ b/rdiff-backup/rdiff_backup/static.py
@@ -0,0 +1,30 @@
+execfile("globals.py")
+
+#######################################################################
+#
+# static - MakeStatic and MakeClass
+#
+# These functions are used to make all the instance methods in a class
+# into static or class methods.
+#
+
+class StaticMethodsError(Exception):
+	pass
+
+def MakeStatic(cls):
+	"""Turn instance methods into static ones
+
+	All methods of the given class whose names don't begin with _
+	are rebound in place as static methods of that class.
+
+	"""
+	for name in dir(cls):
+		if name[0] != "_":
+			cls.__dict__[name] = staticmethod(cls.__dict__[name])
+
+
+def MakeClass(cls):
+	"""Turn instance methods into classmethods. Ignore _ like above"""
+	for name in dir(cls):
+		if name[0] != "_":
+			cls.__dict__[name] = classmethod(cls.__dict__[name])
-- 
cgit v1.2.1
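
A minimal, self-contained sketch of the increment-file naming convention
("<base>.<timestring>.<ext>") that isincfile, getinctype, getinctime and
getincbase above rely on.  The sample filename is made up, and the regular
expression is only a rough stand-in for Time.stringtotime, which is defined
elsewhere in the package.

import re

def split_increment_name(path):
	"""Return (base, timestring, ext), or None if path is not an increment"""
	dotsplit = path.split(".")
	if len(dotsplit) < 3: return None
	timestring, ext = dotsplit[-2:]
	# crude substitute for Time.stringtotime: accept ISO-8601-like strings
	if not re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", timestring):
		return None
	if ext not in ("snapshot", "dir", "missing", "diff"): return None
	return (".".join(dotsplit[:-2]), timestring, ext)

# Hypothetical example:
#   split_increment_name("local.2002-03-21T07:22:43-05:00.diff")
#   => ("local", "2002-03-21T07:22:43-05:00", "diff")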
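
A hypothetical local-connection sketch of the copy/compare helpers defined in
rpath.py above.  It assumes the rdiff-backup modules have been loaded through
their usual execfile() chain (so RPath, RPathStatic and Globals are all in the
global namespace), and both paths are invented for the example.

src = RPath(Globals.local_connection, "/tmp/example-src")
dest = RPath(Globals.local_connection, "/tmp/example-dest")

if src.lstat():				# setdata() has filled in src.data
	# copy file contents, then permissions/mtime (and uid/gid if enabled)
	RPath.copy_with_attribs(src, dest)
	assert RPath.cmp_with_attribs(src, dest)
	dest.delete()			# remove the copy again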
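
A short worked example of the modulo arithmetic that hasfullperms, readable
and executable in rpath.py above use to test one permission class at a time.
The mode value is made up, and the literals are Python 2 octals as in the
code above (0o754 etc. in Python 3).

perms = 0754					# rwxr-xr--
assert perms % 01000 >= 0700	# owner bits are rwx (hasfullperms, owner branch)
assert perms % 0100 >= 040		# group bits include read (readable, group branch)
assert not perms % 0100 >= 070	# ...but the group bits are not rwx
assert not perms % 02 >= 01		# "other" has no execute bit (executable, else branch)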
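
A made-up example of what MakeStatic in static.py above does to a class.  The
Adder class is hypothetical; the call pattern mirrors how RPathStatic earlier
in this patch is turned into a bag of static methods, and it relies on
staticmethod being honored on classic classes (Python 2.2+).

class Adder:
	def add(x, y):				# written without self, like RPathStatic's methods
		"""Return x plus y"""
		return x + y
	def _internal(self): pass	# leading underscore, so MakeStatic leaves it alone

MakeStatic(Adder)

assert Adder.add(2, 3) == 5		# callable directly on the class
assert Adder().add(2, 3) == 5	# and on instances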