author     ben <ben@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109>  2002-03-21 07:22:43 +0000
committer  ben <ben@2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109>  2002-03-21 07:22:43 +0000
commit     8c37a5bdfdd46d5cfad6e9d67925ddef9ca382bf (patch)
tree       8f19be83962ef31d8ad58429d575c6f17d89c0ea /rdiff-backup/src
parent     8259a0d8a9ad1396a93cd6320943dc33446ac6ed (diff)
download   rdiff-backup-8c37a5bdfdd46d5cfad6e9d67925ddef9ca382bf.tar.gz
First checkin
git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@2 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
Diffstat (limited to 'rdiff-backup/src')
-rwxr-xr-x  rdiff-backup/src/Make                      37
-rw-r--r--  rdiff-backup/src/connection.py            467
-rw-r--r--  rdiff-backup/src/destructive_stepping.py  250
-rw-r--r--  rdiff-backup/src/filelist.py              106
-rw-r--r--  rdiff-backup/src/globals.py               172
-rw-r--r--  rdiff-backup/src/header.py                 18
-rw-r--r--  rdiff-backup/src/highlevel.py             288
-rw-r--r--  rdiff-backup/src/increment.py             180
-rw-r--r--  rdiff-backup/src/iterfile.py              235
-rw-r--r--  rdiff-backup/src/lazy.py                  343
-rw-r--r--  rdiff-backup/src/log.py                   142
-rwxr-xr-x  rdiff-backup/src/main.py                  401
-rw-r--r--  rdiff-backup/src/manage.py                 99
-rw-r--r--  rdiff-backup/src/rdiff.py                 175
-rw-r--r--  rdiff-backup/src/restore.py               158
-rw-r--r--  rdiff-backup/src/rlist.py                 240
-rw-r--r--  rdiff-backup/src/robust.py                537
-rw-r--r--  rdiff-backup/src/rorpiter.py              248
-rw-r--r--  rdiff-backup/src/rpath.py                 704
-rw-r--r--  rdiff-backup/src/setconnections.py        205
-rw-r--r--  rdiff-backup/src/static.py                 30
-rw-r--r--  rdiff-backup/src/ttime.py                 129
22 files changed, 5164 insertions, 0 deletions
diff --git a/rdiff-backup/src/Make b/rdiff-backup/src/Make
new file mode 100755
index 0000000..cadf9ea
--- /dev/null
+++ b/rdiff-backup/src/Make
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+"""Read component files of rdiff-backup, and glue them together after
+removing unnecessary bits."""
+
+import os
+
+def mystrip(filename):
+ """Open filename, read input, strip appropriately, and return contents"""
+ fp = open(filename, "r")
+ lines = fp.readlines()
+ fp.close()
+
+ i = 0
+ while(lines[i][:60] !=
+ "############################################################"):
+ i = i+1
+
+ return "".join(lines[i:]).strip() + "\n\n\n"
+
+
+
+files = ["globals.py", "static.py", "lazy.py", "log.py", "ttime.py",
+ "iterfile.py", "rlist.py", "rdiff.py", "connection.py",
+ "rpath.py", "robust.py", "rorpiter.py",
+ "destructive_stepping.py", "increment.py", "restore.py",
+ "manage.py", "filelist.py", "highlevel.py",
+ "setconnections.py", "main.py"]
+
+os.system("cp header.py rdiff-backup")
+
+outfp = open("rdiff-backup", "a")
+for file in files:
+ outfp.write(mystrip(file))
+outfp.close()
+
+os.system("chmod 755 rdiff-backup")
diff --git a/rdiff-backup/src/connection.py b/rdiff-backup/src/connection.py
new file mode 100644
index 0000000..83fc874
--- /dev/null
+++ b/rdiff-backup/src/connection.py
@@ -0,0 +1,467 @@
+execfile("rdiff.py")
+import types, os, tempfile, cPickle, shutil, traceback
+
+#######################################################################
+#
+# connection - Code that deals with remote execution
+#
+
+class ConnectionError(Exception):
+ pass
+
+class ConnectionQuit(Exception):
+ pass
+
+
+class Connection:
+ """Connection class - represent remote execution
+
+ The idea is that, if c is an instance of this class, c.foo will
+ return the object on the remote side. For functions, c.foo will
+ return a function that, when called, executes foo on the remote
+ side, sending over the arguments and sending back the result.
+
+ """
+ def __repr__(self): return self.__str__()
+
+
+class LocalConnection(Connection):
+ """Local connection
+
+ This is a dummy connection class, so that LC.foo just evaluates to
+ foo using global scope.
+
+ """
+ def __init__(self):
+ """This prevents two instances of LocalConnection"""
+ assert not Globals.local_connection
+ self.conn_number = 0 # changed by SetConnections for server
+
+ def __getattr__(self, name):
+ try: return globals()[name]
+ except KeyError:
+ builtins = globals()["__builtins__"]
+ try:
+ if type(builtins) is types.ModuleType:
+ return builtins.__dict__[name]
+ else: return builtins[name]
+ except KeyError: raise NameError, name
+
+ def __setattr__(self, name, value):
+ globals()[name] = value
+
+ def __delattr__(self, name):
+ del globals()[name]
+
+ def __str__(self): return "LocalConnection"
+
+ def reval(self, function_string, *args):
+ return apply(eval(function_string), args)
+
+ def quit(self): pass
+
+Globals.local_connection = LocalConnection()
+Globals.connections.append(Globals.local_connection)
+# Following changed by server in SetConnections
+Globals.connection_dict[0] = Globals.local_connection
+
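Usage sketch (illustrative, relying only on the class above): attribute lookups on the local connection fall through to globals and builtins, so both of the following evaluate pow(2, 8) locally:

    lc = Globals.local_connection
    assert lc.pow(2, 8) == 256             # found in __builtins__ via __getattr__
    assert lc.reval("pow", 2, 8) == 256    # same call through the reval interface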
+
+class ConnectionRequest:
+ """Simple wrapper around a PipeConnection request"""
+ def __init__(self, function_string, num_args):
+ self.function_string = function_string
+ self.num_args = num_args
+
+ def __str__(self):
+ return "ConnectionRequest: %s with %d arguments" % \
+ (self.function_string, self.num_args)
+
+
+class LowLevelPipeConnection(Connection):
+ """Routines for just sending objects from one side of pipe to another
+
+ Each thing sent down the pipe is paired with a request number,
+ currently limited to be between 0 and 255. The size of each thing
+ should be less than 2^56.
+
+ Each thing also has a type, indicated by one of the following
+ characters:
+
+ o - generic object
+ i - iterator/generator of RORPs
+ f - file object
+ b - string
+ q - quit signal
+ t - TempFile
+ R - RPath
+ r - RORPath only
+ c - PipeConnection object
+
+ """
+ def __init__(self, inpipe, outpipe):
+ """inpipe is a file-type open for reading, outpipe for writing"""
+ self.inpipe = inpipe
+ self.outpipe = outpipe
+
+ def __str__(self):
+ """Return string version
+
+ This is actually an important function, because otherwise
+ requests to represent this object would result in "__str__"
+ being executed on the other side of the connection.
+
+ """
+ return "LowLevelPipeConnection"
+
+ def _put(self, obj, req_num):
+ """Put an object into the pipe (will send raw if string)"""
+ Log.conn("sending", obj, req_num)
+ if type(obj) is types.StringType: self._putbuf(obj, req_num)
+ elif isinstance(obj, Connection): self._putconn(obj, req_num)
+ elif isinstance(obj, TempFile): self._puttempfile(obj, req_num)
+ elif isinstance(obj, RPath): self._putrpath(obj, req_num)
+ elif isinstance(obj, RORPath): self._putrorpath(obj, req_num)
+ elif ((hasattr(obj, "read") or hasattr(obj, "write"))
+ and hasattr(obj, "close")): self._putfile(obj, req_num)
+ elif hasattr(obj, "next"): self._putiter(obj, req_num)
+ else: self._putobj(obj, req_num)
+
+ def _putobj(self, obj, req_num):
+ """Send a generic python obj down the outpipe"""
+ self._write("o", cPickle.dumps(obj, 1), req_num)
+
+ def _putbuf(self, buf, req_num):
+ """Send buffer buf down the outpipe"""
+ self._write("b", buf, req_num)
+
+ def _putfile(self, fp, req_num):
+ """Send a file to the client using virtual files"""
+ self._write("f", str(VirtualFile.new(fp)), req_num)
+
+ def _putiter(self, iterator, req_num):
+ """Put an iterator through the pipe"""
+ self._write("i", str(VirtualFile.new(RORPIter.ToFile(iterator))),
+ req_num)
+
+ def _puttempfile(self, tempfile, req_num):
+ """Put a tempfile into pipe. See _putrpath"""
+ tf_repr = (tempfile.conn.conn_number, tempfile.base,
+ tempfile.index, tempfile.data)
+ self._write("t", cPickle.dumps(tf_repr, 1), req_num)
+
+ def _putrpath(self, rpath, req_num):
+ """Put an rpath into the pipe
+
+ The rpath's connection will be encoded as its conn_number. It
+ and the other information is put in a tuple.
+
+ """
+ rpath_repr = (rpath.conn.conn_number, rpath.base,
+ rpath.index, rpath.data)
+ self._write("R", cPickle.dumps(rpath_repr, 1), req_num)
+
+ def _putrorpath(self, rorpath, req_num):
+ """Put an rorpath into the pipe
+
+ This is only necessary because if there is a .file attached,
+ it must be excluded from the pickling
+
+ """
+ rorpath_repr = (rorpath.index, rorpath.data)
+ self._write("r", cPickle.dumps(rorpath_repr, 1), req_num)
+
+ def _putconn(self, pipeconn, req_num):
+ """Put a connection into the pipe
+
+        A pipe connection is represented simply by the number (in
+        string form) of the connection it is *connected to*.
+
+ """
+ self._write("c", str(pipeconn.conn_number), req_num)
+
+ def _putquit(self):
+ """Send a string that takes down server"""
+ self._write("q", "", 255)
+
+ def _write(self, headerchar, data, req_num):
+ """Write header and then data to the pipe"""
+ self.outpipe.write(headerchar + chr(req_num) + self._l2s(len(data)))
+ self.outpipe.write(data)
+ self.outpipe.flush()
+
+ def _read(self, length):
+ """Read length bytes from inpipe, returning result"""
+ return self.inpipe.read(length)
+
+ def _s2l(self, s):
+ """Convert string to long int"""
+ assert len(s) == 7
+ l = 0L
+ for i in range(7): l = l*256 + ord(s[i])
+ return l
+
+ def _l2s(self, l):
+ """Convert long int to string"""
+ s = ""
+ for i in range(7):
+ l, remainder = divmod(l, 256)
+ s = chr(remainder) + s
+        assert l == 0     # verify the value fit in 7 bytes
+ return s
+
+ def _get(self):
+ """Read an object from the pipe and return (req_num, value)"""
+ header_string = self.inpipe.read(9)
+ assert len(header_string) == 9, \
+ "Error reading from pipe (problem probably originated remotely)"
+ try:
+ format_string, req_num, length = (header_string[0],
+ ord(header_string[1]),
+ self._s2l(header_string[2:]))
+ except IndexError: raise ConnectionError()
+ if format_string == "o": result = cPickle.loads(self._read(length))
+ elif format_string == "b": result = self._read(length)
+ elif format_string == "f":
+ result = VirtualFile(self, int(self._read(length)))
+ elif format_string == "i":
+ result = RORPIter.FromFile(BufferedRead(
+ VirtualFile(self, int(self._read(length)))))
+ elif format_string == "t":
+ result = self._gettempfile(self._read(length))
+ elif format_string == "r":
+ result = self._getrorpath(self._read(length))
+ elif format_string == "R": result = self._getrpath(self._read(length))
+ elif format_string == "c":
+ result = Globals.connection_dict[int(self._read(length))]
+ else:
+ assert format_string == "q", header_string
+ raise ConnectionQuit("Received quit signal")
+ Log.conn("received", result, req_num)
+ return (req_num, result)
+
+ def _getrorpath(self, raw_rorpath_buf):
+ """Reconstruct RORPath object from raw data"""
+ index, data = cPickle.loads(raw_rorpath_buf)
+ return RORPath(index, data)
+
+ def _gettempfile(self, raw_tf_buf):
+ """Return TempFile object indicated by raw_tf_buf"""
+ conn_number, base, index, data = cPickle.loads(raw_tf_buf)
+ return TempFile(Globals.connection_dict[conn_number],
+ base, index, data)
+
+ def _getrpath(self, raw_rpath_buf):
+ """Return RPath object indicated by raw_rpath_buf"""
+ conn_number, base, index, data = cPickle.loads(raw_rpath_buf)
+ return RPath(Globals.connection_dict[conn_number], base, index, data)
+
+ def _close(self):
+ """Close the pipes associated with the connection"""
+ self.outpipe.close()
+ self.inpipe.close()
+
+
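To make the framing concrete, here is a round trip of the 9-byte header (1 type character + 1 request-number byte + 7-byte big-endian length), written as a standalone modern Python 3 sketch with bytes in place of Python 2 strings:

    import struct

    def pack_frame(type_char, req_num, payload):
        assert len(payload) < 1 << 56                    # must fit in 7 bytes
        length7 = struct.pack(">Q", len(payload))[1:]    # drop the high byte
        return type_char + bytes([req_num]) + length7 + payload

    def unpack_header(header):
        assert len(header) == 9
        length = struct.unpack(">Q", b"\x00" + header[2:])[0]
        return chr(header[0]), header[1], length

    frame = pack_frame(b"b", 17, b"hello")          # a raw-buffer ("b") frame
    assert unpack_header(frame[:9]) == ("b", 17, 5)
    assert frame[9:] == b"hello"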
+class PipeConnection(LowLevelPipeConnection):
+ """Provide server and client functions for a Pipe Connection
+
+    Both sides act as modules that allow for remote execution.  For
+ instance, self.conn.pow(2,8) will execute the operation on the
+ server side.
+
+ The only difference between the client and server is that the
+ client makes the first request, and the server listens first.
+
+ """
+ def __init__(self, inpipe, outpipe, conn_number = 0):
+ """Init PipeConnection
+
+ conn_number should be a unique (to the session) integer to
+ identify the connection. For instance, all connections to the
+ client have conn_number 0. Other connections can use this
+ number to route commands to the correct process.
+
+ """
+ LowLevelPipeConnection.__init__(self, inpipe, outpipe)
+ self.conn_number = conn_number
+ self.unused_request_numbers = {}
+ for i in range(256): self.unused_request_numbers[i] = None
+
+ def __str__(self): return "PipeConnection %d" % self.conn_number
+
+ def get_response(self, desired_req_num):
+ """Read from pipe, responding to requests until req_num.
+
+ Sometimes after a request is sent, the other side will make
+ another request before responding to the original one. In
+ that case, respond to the request. But return once the right
+ response is given.
+
+ """
+ while 1:
+ try: req_num, object = self._get()
+ except ConnectionQuit:
+ self._put("quitting", self.get_new_req_num())
+ return
+ if req_num == desired_req_num: return object
+ else:
+ assert isinstance(object, ConnectionRequest)
+ self.answer_request(object, req_num)
+
+ def answer_request(self, request, req_num):
+ """Put the object requested by request down the pipe"""
+ del self.unused_request_numbers[req_num]
+ argument_list = []
+ for i in range(request.num_args):
+ arg_req_num, arg = self._get()
+ assert arg_req_num == req_num
+ argument_list.append(arg)
+ try: result = apply(eval(request.function_string), argument_list)
+ except: result = self.extract_exception()
+ self._put(result, req_num)
+ self.unused_request_numbers[req_num] = None
+
+ def extract_exception(self):
+ """Return active exception"""
+ Log("Sending back exception: \n" +
+ "".join(traceback.format_tb(sys.exc_info()[2])), 2)
+ return sys.exc_info()[1]
+
+ def Server(self):
+ """Start server's read eval return loop"""
+ Globals.server = 1
+ Globals.connections.append(self)
+ Log("Starting server", 6)
+ self.get_response(-1)
+
+ def reval(self, function_string, *args):
+ """Execute command on remote side
+
+ The first argument should be a string that evaluates to a
+ function, like "pow", and the remaining are arguments to that
+ function.
+
+ """
+ req_num = self.get_new_req_num()
+ self._put(ConnectionRequest(function_string, len(args)), req_num)
+ for arg in args: self._put(arg, req_num)
+ result = self.get_response(req_num)
+ self.unused_request_numbers[req_num] = None
+ if isinstance(result, Exception): raise result
+ else: return result
+
+ def get_new_req_num(self):
+ """Allot a new request number and return it"""
+ if not self.unused_request_numbers:
+ raise ConnectionError("Exhaused possible connection numbers")
+ req_num = self.unused_request_numbers.keys()[0]
+ del self.unused_request_numbers[req_num]
+ return req_num
+
+ def quit(self):
+ """Close the associated pipes and tell server side to quit"""
+ assert not Globals.server
+ self._putquit()
+ self._get()
+ self._close()
+
+ def __getattr__(self, name):
+ """Intercept attributes to allow for . invocation"""
+ return EmulateCallable(self, name)
+
+
+class RedirectedConnection(Connection):
+ """Represent a connection more than one move away
+
+ For instance, suppose things are connected like this: S1---C---S2.
+ If Server1 wants something done by Server2, it will have to go
+ through the Client. So on S1's side, S2 will be represented by a
+ RedirectedConnection.
+
+ """
+ def __init__(self, conn_number, routing_number = 0):
+ """RedirectedConnection initializer
+
+ Returns a RedirectedConnection object for the given
+ conn_number, where commands are routed through the connection
+ with the given routing_number. 0 is the client, so the
+ default shouldn't have to be changed.
+
+ """
+ self.conn_number = conn_number
+ self.routing_number = routing_number
+ self.routing_conn = Globals.connection_dict[routing_number]
+
+ def __str__(self):
+ return "RedirectedConnection %d,%d" % (self.conn_number,
+ self.routing_number)
+
+ def __getattr__(self, name):
+ return EmulateCallable(self.routing_conn,
+ "Globals.get_dict_val('connection_dict', %d).%s"
+ % (self.conn_number, name))
+
+
+class EmulateCallable:
+ """This is used by PipeConnection in calls like conn.os.chmod(foo)"""
+ def __init__(self, connection, name):
+ self.connection = connection
+ self.name = name
+ def __call__(self, *args):
+ return apply(self.connection.reval, (self.name,) + args)
+ def __getattr__(self, attr_name):
+ return EmulateCallable(self.connection,
+ "%s.%s" % (self.name, attr_name))
+
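The dotted-name chaining above is easiest to see in isolation. In this sketch, FakeConn is hypothetical and evaluates locally where a real PipeConnection would send a ConnectionRequest (Python 3 restatement, illustration only):

    import os

    class MiniEmulateCallable:
        def __init__(self, connection, name):
            self.connection, self.name = connection, name
        def __call__(self, *args):
            return self.connection.reval(self.name, *args)
        def __getattr__(self, attr_name):
            return MiniEmulateCallable(self.connection,
                                       "%s.%s" % (self.name, attr_name))

    class FakeConn:
        def reval(self, function_string, *args):
            print("would send request:", function_string, args)
            return eval(function_string)(*args)

    print(MiniEmulateCallable(FakeConn(), "os").path.join("a", "b"))  # a/b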
+
+class VirtualFile:
+ """When the client asks for a file over the connection, it gets this
+
+ The returned instance then forwards requests over the connection.
+    The class's dictionary is used by the server to associate each
+    open file with a unique file number.
+
+ """
+ #### The following are used by the server
+ vfiles = {}
+ counter = 0
+
+ def getbyid(cls, id):
+ return cls.vfiles[id]
+ getbyid = classmethod(getbyid)
+
+ def readfromid(cls, id, length):
+ return cls.vfiles[id].read(length)
+ readfromid = classmethod(readfromid)
+
+ def writetoid(cls, id, buffer):
+ return cls.vfiles[id].write(buffer)
+ writetoid = classmethod(writetoid)
+
+ def closebyid(cls, id):
+ fp = cls.vfiles[id]
+ del cls.vfiles[id]
+ return fp.close()
+ closebyid = classmethod(closebyid)
+
+ def new(cls, fileobj):
+ """Associate a new VirtualFile with a read fileobject, return id"""
+ count = cls.counter
+ cls.vfiles[count] = fileobj
+ cls.counter = count + 1
+ return count
+ new = classmethod(new)
+
+
+ #### And these are used by the client
+ def __init__(self, connection, id):
+ self.connection = connection
+ self.id = id
+
+ def read(self, length = -1):
+ return self.connection.VirtualFile.readfromid(self.id, length)
+
+ def write(self, buf):
+ return self.connection.VirtualFile.writetoid(self.id, buf)
+
+ def close(self):
+ return self.connection.VirtualFile.closebyid(self.id)
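A compressed sketch of the VirtualFile round trip (io.BytesIO stands in for a real file, and the direct dictionary lookup stands in for the request that would normally cross the connection):

    import io

    vfiles, counter = {}, 0                  # server-side table, as in VirtualFile
    fileobj = io.BytesIO(b"some payload")
    file_id, counter = counter, counter + 1
    vfiles[file_id] = fileobj                # VirtualFile.new: id goes out in an "f" frame

    # Client side: VirtualFile(conn, file_id).read(n) becomes a
    # VirtualFile.readfromid(file_id, n) request executed on the server.
    assert vfiles[file_id].read(4) == b"some"
    vfiles.pop(file_id).close()              # closebyid: look up, drop, close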
diff --git a/rdiff-backup/src/destructive_stepping.py b/rdiff-backup/src/destructive_stepping.py
new file mode 100644
index 0000000..80d274e
--- /dev/null
+++ b/rdiff-backup/src/destructive_stepping.py
@@ -0,0 +1,250 @@
+from __future__ import generators
+execfile("rorpiter.py")
+
+#######################################################################
+#
+# destructive-stepping - Deal with side effects from traversing trees
+#
+
+class DSRPath(RPath):
+ """Destructive Stepping RPath
+
+ Sometimes when we traverse the directory tree, even when we just
+ want to read files, we have to change things, like the permissions
+ of a file or directory in order to read it, or the file's access
+ times. This class is like an RPath, but the permission and time
+ modifications are delayed, so that they can be done at the very
+ end when they won't be disturbed later.
+
+ """
+ def __init__(self, *args):
+ self.perms_delayed = self.times_delayed = None
+ RPath.__init__(self, *args)
+
+ def __getstate__(self):
+ """Return picklable state. See RPath __getstate__."""
+ assert self.conn is Globals.local_connection # Can't pickle a conn
+ pickle_dict = {}
+ for attrib in ['index', 'data', 'perms_delayed', 'times_delayed',
+ 'newperms', 'newtimes', 'path', 'base']:
+ if self.__dict__.has_key(attrib):
+ pickle_dict[attrib] = self.__dict__[attrib]
+ return pickle_dict
+
+ def __setstate__(self, pickle_dict):
+ """Set state from object produced by getstate"""
+ self.conn = Globals.local_connection
+ for attrib in pickle_dict.keys():
+ self.__dict__[attrib] = pickle_dict[attrib]
+
+ def delay_perm_writes(self):
+ """Signal that permission writing should be delayed until the end"""
+ self.perms_delayed = 1
+ self.newperms = None
+
+ def delay_time_changes(self):
+ """Signal that time changes should also be delayed until the end"""
+ self.times_delayed = 1
+ self.newtimes = None
+
+ def chmod(self, permissions):
+ """Change permissions, delaying if self.perms_delayed is set"""
+ if self.perms_delayed:
+ self.newperms = 1
+ self.data['perms'] = permissions
+ else: RPath.chmod(self, permissions)
+
+ def chmod_bypass(self, permissions):
+ """Change permissions without updating the data dictionary"""
+ self.conn.os.chmod(self.path, permissions)
+ self.perms_delayed = self.newperms = 1
+
+ def remember_times(self):
+ """Mark times as changed so they can be restored later"""
+ self.times_delayed = self.newtimes = 1
+
+ def settime(self, accesstime, modtime):
+ """Change times, delaying if self.times_delayed is set"""
+ if self.times_delayed:
+ self.newtimes = 1
+ self.data['atime'] = accesstime
+ self.data['mtime'] = modtime
+ else: RPath.settime(self, accesstime, modtime)
+
+ def settime_bypass(self, accesstime, modtime):
+ """Change times without updating data dictionary"""
+ self.conn.os.utime(self.path, (accesstime, modtime))
+
+ def setmtime(self, modtime):
+ """Change mtime, delaying if self.times_delayed is set"""
+ if self.times_delayed:
+ self.newtimes = 1
+ self.data['mtime'] = modtime
+ else: RPath.setmtime(self, modtime)
+
+ def setmtime_bypass(self, modtime):
+ """Change mtime without updating data dictionary"""
+ self.conn.os.utime(self.path, (time.time(), modtime))
+
+ def restoretimes(self):
+ """Write times in self.data back to file"""
+ RPath.settime(self, self.data['atime'], self.data['mtime'])
+
+ def restoreperms(self):
+ """Write permissions in self.data back to file"""
+ RPath.chmod(self, self.data['perms'])
+
+ def write_changes(self):
+ """Write saved up permission/time changes"""
+ if not self.lstat(): return # File has been deleted in meantime
+
+ if self.perms_delayed and self.newperms:
+ self.conn.os.chmod(self.path, self.getperms())
+ if self.times_delayed:
+ if self.data.has_key('atime'):
+ self.settime_bypass(self.getatime(), self.getmtime())
+ elif self.newtimes and self.data.has_key('mtime'):
+ self.setmtime_bypass(self.getmtime())
+
+
+class DestructiveStepping:
+ """Destructive stepping"""
+ def initialize(dsrpath, source):
+ """Change permissions of dsrpath, possibly delay writes
+
+ Abort if we need to access something and can't. If the file
+        is on the source partition, just log a warning and return true.
+        Return false if everything is good to go.
+
+ """
+ if not source or Globals.change_source_perms:
+ dsrpath.delay_perm_writes()
+
+ def warn(err):
+ Log("Received error '%s' when dealing with file %s, skipping..."
+ % (err, dsrpath.path), 1)
+
+ def abort():
+ Log.FatalError("Missing access to file %s - aborting." %
+ dsrpath.path)
+
+ def try_chmod(perms):
+ """Try to change the perms. If fail, return error."""
+ try: dsrpath.chmod_bypass(perms)
+ except os.error, err: return err
+ return None
+
+ if dsrpath.isreg() and not dsrpath.readable():
+ if source:
+ if Globals.change_source_perms and dsrpath.isowner():
+ err = try_chmod(0400)
+ if err:
+ warn(err)
+ return 1
+ else:
+ warn("No read permissions")
+ return 1
+ elif not Globals.change_mirror_perms or try_chmod(0600): abort()
+ elif dsrpath.isdir():
+ if source and (not dsrpath.readable() or not dsrpath.executable()):
+ if Globals.change_source_perms and dsrpath.isowner():
+ err = try_chmod(0500)
+ if err:
+ warn(err)
+ return 1
+ else:
+ warn("No read or exec permissions")
+ return 1
+ elif not source and not dsrpath.hasfullperms():
+ if Globals.change_mirror_perms: try_chmod(0700)
+
+ # Permissions above; now try to preserve access times if necessary
+ if (source and (Globals.preserve_atime or
+ Globals.change_source_perms) or
+ not source):
+ # These are the circumstances under which we will have to
+ # touch up a file's times after we are done with it
+ dsrpath.remember_times()
+ return None
+
+ def Finalizer(initial_state = None):
+ """Return a finalizer that can work on an iterator of dsrpaths
+
+ The reason we have to use an IterTreeReducer is that some files
+ should be updated immediately, but for directories we sometimes
+ need to update all the files in the directory before finally
+ coming back to it.
+
+ """
+ return IterTreeReducer(lambda x: None, lambda x,y: None, None,
+ lambda dsrpath, x, y: dsrpath.write_changes(),
+ initial_state)
+
+ def isexcluded(dsrp, source):
+ """Return true if given DSRPath is excluded/ignored
+
+ If source = 1, treat as source file, otherwise treat as
+ destination file.
+
+ """
+ if Globals.exclude_device_files and dsrp.isdev(): return 1
+
+ if source: exclude_regexps = Globals.exclude_regexps
+ else: exclude_regexps = Globals.exclude_mirror_regexps
+
+ for regexp in exclude_regexps:
+ if regexp.match(dsrp.path):
+ Log("Excluding %s" % dsrp.path, 6)
+ return 1
+ return None
+
+ def Iterate_from(baserp, source, starting_index = None):
+ """Iterate dsrps from baserp, skipping any matching exclude_regexps
+
+        Includes only dsrps with indices greater than starting_index
+ if starting_index is not None.
+
+ """
+ def helper_starting_from(dsrpath):
+ """Like helper, but only start iterating after starting_index"""
+ if dsrpath.index > starting_index:
+ # Past starting_index, revert to normal helper
+ for dsrp in helper(dsrpath): yield dsrp
+ elif dsrpath.index == starting_index[:len(dsrpath.index)]:
+ # May encounter starting index on this branch
+ if (not DestructiveStepping.isexcluded(dsrpath, source) and
+ not DestructiveStepping.initialize(dsrpath, source)):
+ if dsrpath.isdir():
+ dir_listing = dsrpath.listdir()
+ dir_listing.sort()
+ for filename in dir_listing:
+ for dsrp in helper_starting_from(
+ dsrpath.append(filename)):
+ yield dsrp
+
+ def helper(dsrpath):
+ if (not DestructiveStepping.isexcluded(dsrpath, source) and
+ not DestructiveStepping.initialize(dsrpath, source)):
+ yield dsrpath
+ if dsrpath.isdir():
+ dir_listing = dsrpath.listdir()
+ dir_listing.sort()
+ for filename in dir_listing:
+ for dsrp in helper(dsrpath.append(filename)):
+ yield dsrp
+
+ base_dsrpath = DSRPath(baserp.conn, baserp.base,
+ baserp.index, baserp.data)
+ if starting_index is None: return helper(base_dsrpath)
+ else: return helper_starting_from(base_dsrpath)
+
+ def Iterate_with_Finalizer(baserp, source):
+ """Like Iterate_from, but finalize each dsrp afterwards"""
+ finalize = DestructiveStepping.Finalizer()
+ for dsrp in DestructiveStepping.Iterate_from(baserp, source):
+ yield dsrp
+ finalize(dsrp)
+ finalize.getresult()
+
+
+MakeStatic(DestructiveStepping)
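The essence of the scheme above in a self-contained sketch (hypothetical paths, modern Python 3): open up an unreadable directory just long enough to traverse it, and restore the original bits only after the whole subtree has been processed; this is why the real code defers the restore to an IterTreeReducer-based finalizer.

    import os, stat

    def walk_with_restore(path):
        # Temporarily chmod a blocked directory, then restore it after its
        # children are done (the DSRPath.chmod_bypass / write_changes idea).
        st = os.lstat(path)
        saved_mode = None
        if stat.S_ISDIR(st.st_mode) and not os.access(path, os.R_OK | os.X_OK):
            saved_mode = stat.S_IMODE(st.st_mode)
            os.chmod(path, 0o700)
        try:
            yield path
            if stat.S_ISDIR(st.st_mode):
                for name in sorted(os.listdir(path)):
                    for sub in walk_with_restore(os.path.join(path, name)):
                        yield sub
        finally:
            if saved_mode is not None:
                os.chmod(path, saved_mode)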
diff --git a/rdiff-backup/src/filelist.py b/rdiff-backup/src/filelist.py
new file mode 100644
index 0000000..7a660c3
--- /dev/null
+++ b/rdiff-backup/src/filelist.py
@@ -0,0 +1,106 @@
+from __future__ import generators
+execfile("manage.py")
+
+#######################################################################
+#
+# filelist - Some routines that help with operations over files listed
+# in standard input instead of over whole directories.
+#
+
+class FilelistError(Exception): pass
+
+class Filelist:
+ """Many of these methods have analogs in highlevel.py"""
+ def File2Iter(fp, baserp):
+ """Convert file obj with one pathname per line into rpiter
+
+ Closes fp when done. Given files are added to baserp.
+
+ """
+ while 1:
+ line = fp.readline()
+ if not line: break
+ if line[-1] == "\n": line = line[:-1] # strip trailing newline
+ if not line: continue # skip blank lines
+ elif line[0] == "/": raise FilelistError(
+ "Read in absolute file name %s." % line)
+ yield baserp.append(line)
+ assert not fp.close(), "Error closing filelist fp"
+
+ def Mirror(src_rpath, dest_rpath, rpiter):
+ """Copy files in fileiter from src_rpath to dest_rpath"""
+ sigiter = dest_rpath.conn.Filelist.get_sigs(dest_rpath, rpiter)
+ diffiter = Filelist.get_diffs(src_rpath, sigiter)
+ dest_rpath.conn.Filelist.patch(dest_rpath, diffiter)
+ dest_rpath.setdata()
+
+    def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath, rpiter):
+ """Mirror + put increment in tree based at inc_rpath"""
+ sigiter = dest_rpath.conn.Filelist.get_sigs(dest_rpath, rpiter)
+ diffiter = Filelist.get_diffs(src_rpath, sigiter)
+ dest_rpath.conn.Filelist.patch_and_increment(dest_rpath, diffiter,
+ inc_rpath)
+ dest_rpath.setdata()
+
+ def get_sigs(dest_rpbase, rpiter):
+ """Get signatures of file analogs in rpiter
+
+ This is meant to be run on the destination side. Only the
+        extension part of the rps in rpiter will be used; the base is
+ ignored.
+
+ """
+ def dest_iter(src_iter):
+ for src_rp in src_iter: yield dest_rpbase.new_index(src_rp.index)
+ return RORPIter.Signatures(dest_iter())
+
+ def get_diffs(src_rpbase, sigiter):
+ """Get diffs based on sigiter and files in src_rpbase
+
+ This should be run on the local side.
+
+ """
+ for sig_rorp in sigiter:
+ new_rp = src_rpbase.new_index(sig_rorp.index)
+ yield RORPIter.diffonce(sig_rorp, new_rp)
+
+ def patch(dest_rpbase, diffiter):
+ """Process diffs in diffiter and update files in dest_rbpase.
+
+ Run remotely.
+
+ """
+ for diff_rorp in diffiter:
+ basisrp = dest_rpbase.new_index(diff_rorp.index)
+ if basisrp.lstat(): Filelist.make_subdirs(basisrp)
+ Log("Processing %s" % basisrp.path, 7)
+ RORPIter.patchonce(dest_rpbase, basisrp, diff_rorp)
+
+ def patch_and_increment(dest_rpbase, diffiter, inc_rpbase):
+ """Apply diffs in diffiter to dest_rpbase, and increment to inc_rpbase
+
+ Also to be run remotely.
+
+ """
+ for diff_rorp in diffiter:
+ basisrp = dest_rpbase.new_index(diff_rorp.index)
+ if diff_rorp.lstat(): Filelist.make_subdirs(basisrp)
+ Log("Processing %s" % basisrp.path, 7)
+ # XXX This isn't done yet...
+
+ def make_subdirs(rpath):
+ """Make sure that all the directories under the rpath exist
+
+        This function doesn't try to get the permissions right on the
+        underlying directories; it just does the minimum to make sure
+        the file can be created.
+
+ """
+ dirname = rpath.dirsplit()[0]
+ if dirname == '.' or dirname == '': return
+ dir_rp = RPath(rpath.conn, dirname)
+ Filelist.make_subdirs(dir_rp)
+ if not dir_rp.lstat(): dir_rp.mkdir()
+
+
+MakeStatic(Filelist)
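The line-parsing contract of File2Iter, restated as a minimal standalone sketch (hypothetical input data):

    import io

    def file2names(fp):
        # One relative pathname per line; blank lines skipped, absolute rejected.
        for line in fp:
            name = line.rstrip("\n")
            if not name:
                continue
            if name.startswith("/"):
                raise ValueError("Read in absolute file name %s." % name)
            yield name

    assert list(file2names(io.StringIO("a/b\n\nc\n"))) == ["a/b", "c"]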
diff --git a/rdiff-backup/src/globals.py b/rdiff-backup/src/globals.py
new file mode 100644
index 0000000..d9cd64a
--- /dev/null
+++ b/rdiff-backup/src/globals.py
@@ -0,0 +1,172 @@
+import re, os
+
+#######################################################################
+#
+# globals - aggregate some configuration options
+#
+
+class Globals:
+
+ # The current version of rdiff-backup
+ version = "0.6.0"
+
+ # This determines how many bytes to read at a time when copying
+ blocksize = 32768
+
+ # This is used by the BufferedRead class to determine how many
+ # bytes to request from the underlying file per read(). Larger
+ # values may save on connection overhead and latency.
+ conn_bufsize = 4096
+
+ # True if script is running as a server
+ server = None
+
+ # uid and gid of the owner of the rdiff-backup process. This can
+ # vary depending on the connection.
+ process_uid = os.getuid()
+ process_gid = os.getgid()
+
+ # If true, when copying attributes, also change target's uid/gid
+ change_ownership = None
+
+ # If true, change the permissions of unwriteable mirror files
+ # (such as directories) so that they can be written, and then
+ # change them back.
+ change_mirror_perms = 1
+
+ # If true, temporarily change permissions of unreadable files in
+ # the source directory to make sure we can read all files.
+ change_source_perms = None
+
+ # If true, try to reset the atimes of the source partition.
+ preserve_atime = None
+
+ # This is a list of compiled regular expressions. If one of them
+ # matches a file in the source area, do not process that file.
+ exclude_regexps = []
+
+ # Another list of compiled regexps; this time the file is excluded
+ # if it matches something in the destination area.
+ exclude_mirror_regexps = []
+
+ # If this is true, rdiff-backup will exclude any dev files it
+ # sees, in the same way it excludes files matching the exclude
+ # regexps.
+ exclude_device_files = None
+
+ # This will be set as soon as the LocalConnection class loads
+ local_connection = None
+
+ # If this is true, instead of processing whole directory, just
+ # examine files read in from standard input.
+ include_from_stdin = None
+
+ # All connections should be added to the following list, so
+ # further global changes can be propagated to the remote systems.
+ # The first element should be Globals.local_connection. For a
+ # server, the second is the connection to the client.
+ connections = []
+
+ # Each process should have a connection number unique to the
+ # session. The client has connection number 0.
+ connection_number = 0
+
+ # Dictionary pairing connection numbers with connections. Set in
+ # SetConnections for all connections.
+ connection_dict = {}
+
+ # True if the script is the end that reads the source directory
+ # for backups. It is true for purely local sessions.
+ isbackup_reader = None
+
+ # Connection of the real backup reader (for which isbackup_reader
+ # is true)
+ backup_reader = None
+
+ # True if the script is the end that writes to the increment and
+ # mirror directories. True for purely local sessions.
+ isbackup_writer = None
+
+ # Connection of the backup writer
+ backup_writer = None
+
+ # This list is used by the set function below. When a new
+ # connection is created with init_connection, its Globals class
+ # will match this one for all the variables mentioned in this
+ # list.
+ changed_settings = []
+
+ # rdiff-backup will try to checkpoint its state every
+ # checkpoint_interval seconds. Then when resuming, at most this
+ # amount of time is lost.
+ checkpoint_interval = 20
+
+ # The RPath of the rdiff-backup-data directory.
+ rbdir = None
+
+ # Indicates if a resume or a lack of resume is forced. This
+ # should be None for the default. 0 means don't resume, and 1
+ # means resume.
+ resume = None
+
+ # If there has been an aborted backup fewer than this many seconds
+ # ago, attempt to resume it where it left off instead of starting
+ # a new one.
+ resume_window = 7200
+
+ # This string is used when recognizing and creating time strings.
+ # If the time_separator is ":", then W3 datetime strings like
+ # 2001-12-07T04:22:01-07:00 are produced. It can be set to "_" to
+ # make filenames that don't contain colons, which aren't allowed
+    # under MS Windows NT.
+ time_separator = ":"
+
+ def get(cls, name):
+ """Return the value of something in this class"""
+ return cls.__dict__[name]
+ get = classmethod(get)
+
+ def set(cls, name, val):
+ """Set the value of something in this class
+
+ Use this instead of writing the values directly if the setting
+ matters to remote sides. This function updates the
+ changed_settings list, so other connections know to copy the
+ changes.
+
+ """
+ cls.changed_settings.append(name)
+ cls.__dict__[name] = val
+ set = classmethod(set)
+
+ def set_integer(cls, name, val):
+ """Like set, but make sure val is an integer"""
+ try: intval = int(val)
+ except ValueError:
+ Log.FatalError("Variable %s must be set to an integer -\n"
+ "received %s instead." % (name, val))
+ cls.set(name, intval)
+ set_integer = classmethod(set_integer)
+
+ def get_dict_val(cls, name, key):
+ """Return val from dictionary in this class"""
+ return cls.__dict__[name][key]
+ get_dict_val = classmethod(get_dict_val)
+
+ def set_dict_val(cls, name, key, val):
+ """Set value for dictionary in this class"""
+ cls.__dict__[name][key] = val
+ set_dict_val = classmethod(set_dict_val)
+
+ def add_regexp(cls, regstr, mirror=None):
+ """Add a regular expression to the exclude list"""
+ for conn in Globals.connections:
+ conn.Globals.add_regexp_local(regstr, mirror)
+ add_regexp = classmethod(add_regexp)
+
+ def add_regexp_local(cls, regstr, mirror):
+ """Add the regex only to the local Globals class"""
+ compiled = re.compile(regstr)
+ if mirror: Globals.exclude_mirror_regexps.append(compiled)
+ else: Globals.exclude_regexps.append(compiled)
+ add_regexp_local = classmethod(add_regexp_local)
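The set/changed_settings pattern is small enough to restate standalone (MiniGlobals is hypothetical; the real class relies on SetConnections to replay the recorded names to each remote Globals):

    class MiniGlobals:
        blocksize = 32768
        changed_settings = []

        @classmethod
        def set(cls, name, val):
            cls.changed_settings.append(name)   # remember what to replicate
            setattr(cls, name, val)

    MiniGlobals.set("blocksize", 65536)
    assert MiniGlobals.blocksize == 65536
    assert MiniGlobals.changed_settings == ["blocksize"]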
diff --git a/rdiff-backup/src/header.py b/rdiff-backup/src/header.py
new file mode 100644
index 0000000..31b3ff0
--- /dev/null
+++ b/rdiff-backup/src/header.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+#
+# rdiff-backup -- Mirror files while keeping incremental changes
+# Version 0.6.0 released March 14, 2002
+# Copyright (C) 2001 Ben Escoto <bescoto@stanford.edu>
+#
+# This program is licensed under the GNU General Public License (GPL).
+# Distributions of rdiff-backup usually include a copy of the GPL in a
+# file called COPYING. The GPL is also available online at
+# http://www.gnu.org/copyleft/gpl.html.
+#
+# Please send mail to me or the mailing list if you find bugs or have
+# any suggestions.
+
+from __future__ import nested_scopes, generators
+import os, stat, time, sys, getopt, re, cPickle, types, shutil, sha, marshal, traceback, popen2, tempfile
+
+
diff --git a/rdiff-backup/src/highlevel.py b/rdiff-backup/src/highlevel.py
new file mode 100644
index 0000000..55fe007
--- /dev/null
+++ b/rdiff-backup/src/highlevel.py
@@ -0,0 +1,288 @@
+from __future__ import generators
+execfile("filelist.py")
+
+#######################################################################
+#
+# highlevel - High level functions for mirroring, mirror & inc, etc.
+#
+
+class SkipFileException(Exception):
+ """Signal that the current file should be skipped but then continue
+
+    This exception will often be raised when there is a problem reading
+ an individual file, but it makes sense for the rest of the backup
+ to keep going.
+
+ """
+ pass
+
+
+class HighLevel:
+ """High level static functions
+
+ The design of some of these functions is represented on the
+ accompanying diagram.
+
+ """
+ def Mirror(src_rpath, dest_rpath, checkpoint = 1, session_info = None):
+ """Turn dest_rpath into a copy of src_rpath
+
+ Checkpoint true means to checkpoint periodically, otherwise
+ not. If session_info is given, try to resume Mirroring from
+ that point.
+
+ """
+ SourceS = src_rpath.conn.HLSourceStruct
+ DestS = dest_rpath.conn.HLDestinationStruct
+
+ SourceS.set_session_info(session_info)
+ DestS.set_session_info(session_info)
+ src_init_dsiter = SourceS.split_initial_dsiter(src_rpath)
+ dest_sigiter = DestS.get_sigs(dest_rpath, src_init_dsiter)
+ diffiter = SourceS.get_diffs_and_finalize(dest_sigiter)
+ DestS.patch_and_finalize(dest_rpath, diffiter, checkpoint)
+
+ dest_rpath.setdata()
+
+ def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath,
+ session_info = None):
+ """Mirror + put increments in tree based at inc_rpath"""
+ SourceS = src_rpath.conn.HLSourceStruct
+ DestS = dest_rpath.conn.HLDestinationStruct
+
+ SourceS.set_session_info(session_info)
+ DestS.set_session_info(session_info)
+ if not session_info: dest_rpath.conn.SaveState.touch_last_file()
+ src_init_dsiter = SourceS.split_initial_dsiter(src_rpath)
+ dest_sigiter = DestS.get_sigs(dest_rpath, src_init_dsiter)
+ diffiter = SourceS.get_diffs_and_finalize(dest_sigiter)
+ DestS.patch_increment_and_finalize(dest_rpath, diffiter, inc_rpath)
+
+ dest_rpath.setdata()
+ inc_rpath.setdata()
+
+ def Restore(rest_time, mirror_base, baseinc_tup, target_base):
+ """Like Restore.RestoreRecursive but check arguments"""
+ if not isinstance(target_base, DSRPath):
+ target_base = DSRPath(target_base.conn, target_base.base,
+ target_base.index, target_base.data)
+ Restore.RestoreRecursive(rest_time, mirror_base,
+ baseinc_tup, target_base)
+
+MakeStatic(HighLevel)
+
+
+class HLSourceStruct:
+ """Hold info used by HL on the source side"""
+ _session_info = None # set to si if resuming
+ def set_session_info(cls, session_info):
+ cls._session_info = session_info
+
+ def iterate_from(cls, rpath):
+ """Supply more aruments to DestructiveStepping.Iterate_from"""
+ if cls._session_info:
+ return DestructiveStepping.Iterate_from(rpath, 1,
+ cls._session_info.last_index)
+ else: return DestructiveStepping.Iterate_from(rpath, 1)
+
+ def split_initial_dsiter(cls, rpath):
+ """Set iterators of all dsrps from rpath, returning one"""
+ dsiter = cls.iterate_from(rpath)
+ initial_dsiter1, cls.initial_dsiter2 = Iter.multiplex(dsiter, 2)
+ return initial_dsiter1
+
+ def get_diffs_and_finalize(cls, sigiter):
+ """Return diffs and finalize any dsrp changes remaining
+
+        Return a rorp iterator of diffs (with file data attached) for
+        the dissimilar files.  This is the last operation run on the
+        local filestream, so finalize dsrp writes.
+
+ """
+ collated = RORPIter.CollateIterators(cls.initial_dsiter2, sigiter)
+ finalizer = DestructiveStepping.Finalizer()
+ def diffs():
+ for dsrp, dest_sig in collated:
+ try:
+ if dest_sig:
+ if dest_sig.isplaceholder(): yield dest_sig
+ else: yield RORPIter.diffonce(dest_sig, dsrp)
+ if dsrp: finalizer(dsrp)
+ except (IOError, OSError, RdiffException):
+ Log.exception()
+ Log("Error processing %s, skipping" %
+ str(dest_sig.index), 2)
+ finalizer.getresult()
+ return diffs()
+
+MakeClass(HLSourceStruct)
+
+
+class HLDestinationStruct:
+ """Hold info used by HL on the destination side"""
+ _session_info = None # set to si if resuming
+ def set_session_info(cls, session_info):
+ cls._session_info = session_info
+
+ def iterate_from(cls, rpath):
+ """Supply more arguments to DestructiveStepping.Iterate_from"""
+ if cls._session_info:
+ return DestructiveStepping.Iterate_from(rpath, None,
+ cls._session_info.last_index)
+ else: return DestructiveStepping.Iterate_from(rpath, None)
+
+ def split_initial_dsiter(cls, rpath):
+ """Set initial_dsiters (iteration of all dsrps from rpath)"""
+ dsiter = cls.iterate_from(rpath)
+ result, cls.initial_dsiter2 = Iter.multiplex(dsiter, 2)
+ return result
+
+ def get_dissimilar(cls, baserp, src_init_iter, dest_init_iter):
+ """Get dissimilars
+
+ Returns an iterator which enumerates the dsrps which are
+ different on the source and destination ends. The dsrps do
+ not necessarily exist on the destination end.
+
+ Also, to prevent the system from getting backed up on the
+ remote end, if we don't get enough dissimilars, stick in a
+ placeholder every so often, like fiber. The more
+ placeholders, the more bandwidth used, but if there aren't
+ enough, lots of memory will be used because files will be
+ accumulating on the source side. How much will accumulate
+ will depend on the Globals.conn_bufsize value.
+
+ """
+ collated = RORPIter.CollateIterators(src_init_iter, dest_init_iter)
+ def generate_dissimilar():
+ counter = 0
+ for src_rorp, dest_dsrp in collated:
+ if not dest_dsrp:
+ dsrp = DSRPath(baserp.conn, baserp.base, src_rorp.index)
+ if dsrp.lstat():
+ Log("Warning: Found unexpected destination file %s."
+ % dsrp.path, 2)
+ if DestructiveStepping.isexcluded(dsrp, None): continue
+ counter = 0
+ yield dsrp
+ elif not src_rorp or not src_rorp == dest_dsrp:
+ counter = 0
+ yield dest_dsrp
+                else: # source and destination both exist and are the same
+ if counter == 20:
+ placeholder = RORPath(src_rorp.index)
+ placeholder.make_placeholder()
+ counter = 0
+ yield placeholder
+ else: counter += 1
+ return generate_dissimilar()
+
+ def get_sigs(cls, baserp, src_init_iter):
+ """Return signatures of all dissimilar files"""
+ dest_iters1 = cls.split_initial_dsiter(baserp)
+ dissimilars = cls.get_dissimilar(baserp, src_init_iter, dest_iters1)
+ return RORPIter.Signatures(dissimilars)
+
+ def get_dsrp(cls, dest_rpath, index):
+ """Return initialized dsrp based on dest_rpath with given index"""
+ dsrp = DSRPath(dest_rpath.conn, dest_rpath.base, index)
+ DestructiveStepping.initialize(dsrp, None)
+ return dsrp
+
+ def get_finalizer(cls):
+ """Return finalizer, starting from session info if necessary"""
+ init_state = cls._session_info and cls._session_info.finalizer_state
+ return DestructiveStepping.Finalizer(init_state)
+
+ def get_ITR(cls, inc_rpath):
+ """Return ITR, starting from state if necessary"""
+ init_state = cls._session_info and cls._session_info.ITR_state
+ return Inc.make_patch_increment_ITR(inc_rpath, init_state)
+
+ def patch_and_finalize(cls, dest_rpath, diffs, checkpoint = 1):
+ """Apply diffs and finalize"""
+ collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2)
+ finalizer = cls.get_finalizer()
+ dsrp = None
+
+ def error_checked():
+ """Inner writing loop, check this for errors"""
+ indexed_tuple = collated.next()
+ Log("Processing %s" % str(indexed_tuple), 7)
+ diff_rorp, dsrp = indexed_tuple
+ if not dsrp:
+ dsrp = cls.get_dsrp(dest_rpath, diff_rorp.index)
+ DestructiveStepping.initialize(dsrp, None)
+ if diff_rorp and not diff_rorp.isplaceholder():
+ RORPIter.patchonce_action(None, dsrp, diff_rorp).execute()
+ finalizer(dsrp)
+ return dsrp
+
+ try:
+ while 1:
+ try: dsrp = cls.check_skip_error(error_checked)
+ except StopIteration: break
+ if checkpoint: SaveState.checkpoint_mirror(finalizer, dsrp)
+ except: cls.handle_last_error(dsrp, finalizer)
+ finalizer.getresult()
+ if checkpoint: SaveState.checkpoint_remove()
+
+ def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath):
+ """Apply diffs, write increment if necessary, and finalize"""
+ collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2)
+ finalizer, ITR = cls.get_finalizer(), cls.get_ITR(inc_rpath)
+ dsrp = None
+
+ def error_checked():
+ """Inner writing loop, catch variety of errors from this"""
+ indexed_tuple = collated.next()
+ Log("Processing %s" % str(indexed_tuple), 7)
+ diff_rorp, dsrp = indexed_tuple
+ if not dsrp:
+ dsrp = cls.get_dsrp(dest_rpath, indexed_tuple.index)
+ DestructiveStepping.initialize(dsrp, None)
+ indexed_tuple = IndexedTuple(indexed_tuple.index,
+ (diff_rorp, dsrp))
+ if diff_rorp and diff_rorp.isplaceholder():
+ indexed_tuple = IndexedTuple(indexed_tuple.index,
+ (None, dsrp))
+ ITR(indexed_tuple)
+ finalizer(dsrp)
+ return dsrp
+
+ try:
+ while 1:
+ try: dsrp = cls.check_skip_error(error_checked)
+ except StopIteration: break
+ SaveState.checkpoint_inc_backup(ITR, finalizer, dsrp)
+ except: cls.handle_last_error(dsrp, finalizer, ITR)
+ ITR.getresult()
+ finalizer.getresult()
+ SaveState.checkpoint_remove()
+
+ def check_skip_error(cls, thunk):
+ """Run thunk, catch certain errors skip files"""
+ try: return thunk()
+ except (IOError, OSError, SkipFileException), exp:
+ Log.exception()
+ if (not isinstance(exp, IOError) or
+ (isinstance(exp, IOError) and
+ (exp[0] in [2, # Means that a file is missing
+ 5, # Reported by docv (see list)
+ 13, # Permission denied IOError
+ 26] # Requested by Campbell (see list) -
+ # happens on some NT systems
+ ))):
+ Log("Skipping file", 2)
+ return None
+ else: raise
+
+ def handle_last_error(cls, dsrp, finalizer, ITR = None):
+ """If catch fatal error, try to checkpoint before exiting"""
+ Log.exception(1)
+ if ITR: SaveState.checkpoint_inc_backup(ITR, finalizer, dsrp, 1)
+ else: SaveState.checkpoint_mirror(finalizer, dsrp, 1)
+ SaveState.touch_last_file_definitive()
+ raise
+
+MakeClass(HLDestinationStruct)
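The placeholder trick described in get_dissimilar is easy to isolate: during long runs of identical files, emit a cheap marker every 20 files so the remote end keeps draining the pipe. A simplified standalone sketch (the missing-destination branch is omitted):

    def dissimilar_with_placeholders(pairs, interval=20):
        counter = 0
        for src, dest in pairs:
            if src != dest:
                counter = 0
                yield dest                 # genuinely dissimilar: reset and emit
            elif counter == interval:
                counter = 0
                yield "placeholder"        # stands in for the RORPath placeholder
            else:
                counter += 1

    out = list(dissimilar_with_placeholders([(n, n) for n in range(45)]))
    assert out == ["placeholder", "placeholder"]   # two markers for 45 identical files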
diff --git a/rdiff-backup/src/increment.py b/rdiff-backup/src/increment.py
new file mode 100644
index 0000000..4ed6a39
--- /dev/null
+++ b/rdiff-backup/src/increment.py
@@ -0,0 +1,180 @@
+execfile("destructive_stepping.py")
+
+#######################################################################
+#
+# increment - Provides Inc class, which writes increment files
+#
+# This code is what writes files ending in .diff, .snapshot, etc.
+#
+
+class Inc:
+ """Class containing increment functions"""
+ def Increment_action(new, mirror, incpref):
+ """Main file incrementing function, returns RobustAction
+
+ new is the file on the active partition,
+ mirror is the mirrored file from the last backup,
+ incpref is the prefix of the increment file.
+
+ This function basically moves mirror -> incpref.
+
+ """
+ if not (new and new.lstat() or mirror.lstat()):
+ return Robust.null_action # Files deleted in meantime, do nothing
+
+ Log("Incrementing mirror file " + mirror.path, 5)
+ if ((new and new.isdir()) or mirror.isdir()) and not incpref.isdir():
+ incpref.mkdir()
+
+ if not mirror.lstat(): return Inc.makemissing_action(incpref)
+ elif mirror.isdir(): return Inc.makedir_action(mirror, incpref)
+ elif new.isreg() and mirror.isreg():
+ return Inc.makediff_action(new, mirror, incpref)
+ else: return Inc.makesnapshot_action(mirror, incpref)
+
+ def Increment(new, mirror, incpref):
+ Inc.Increment_action(new, mirror, incpref).execute()
+
+ def makemissing_action(incpref):
+ """Signify that mirror file was missing"""
+ return RobustAction(lambda: None,
+ Inc.get_inc_ext(incpref, "missing").touch,
+ lambda exp: None)
+
+ def makesnapshot_action(mirror, incpref):
+ """Copy mirror to incfile, since new is quite different"""
+ snapshotrp = Inc.get_inc_ext(incpref, "snapshot")
+ return Robust.copy_with_attribs_action(mirror, snapshotrp)
+
+ def makediff_action(new, mirror, incpref):
+ """Make incfile which is a diff new -> mirror"""
+ diff = Inc.get_inc_ext(incpref, "diff")
+ return Robust.chain([Rdiff.write_delta_action(new, mirror, diff),
+ Robust.copy_attribs_action(mirror, diff)])
+
+ def makedir_action(mirrordir, incpref):
+ """Make file indicating directory mirrordir has changed"""
+ dirsign = Inc.get_inc_ext(incpref, "dir")
+ def final():
+ dirsign.touch()
+ RPath.copy_attribs(mirrordir, dirsign)
+ return RobustAction(lambda: None, final, dirsign.delete)
+
+ def get_inc_ext(rp, typestr):
+ """Return RPath/DSRPath like rp but with inc/time extension
+
+ If the file exists, then probably a previous backup has been
+ aborted. We then keep asking FindTime to get a time later
+ than the one that already has an inc file.
+
+ """
+ def get_newinc(timestr):
+ """Get new increment rp with given time suffix"""
+ addtostr = lambda s: "%s.%s.%s" % (s, timestr, typestr)
+ if rp.index:
+ return rp.__class__(rp.conn, rp.base, rp.index[:-1] +
+ (addtostr(rp.index[-1]),))
+ else: return rp.__class__(rp.conn, addtostr(rp.base), rp.index)
+
+ inctime = 0
+ while 1:
+ inctime = Resume.FindTime(rp.index, inctime)
+ incrp = get_newinc(Time.timetostring(inctime))
+ if not incrp.lstat(): return incrp
+
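For illustration, the addtostr pattern above yields increment names like the following (hypothetical path; the time suffix comes from Time.timetostring, shown here with time_separator = "_"):

    base = "rdiff-backup-data/increments/foo"
    timestr = "2002-03-21T07_22_43-07_00"
    for typestr in ("diff", "snapshot", "dir", "missing"):
        print("%s.%s.%s" % (base, timestr, typestr))
    # rdiff-backup-data/increments/foo.2002-03-21T07_22_43-07_00.diff, etc.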
+ def make_patch_increment_ITR(inc_rpath, initial_state = None):
+ """Return IterTreeReducer that patches and increments
+
+ This has to be an ITR because directories that have files in
+ them changed are flagged with an increment marker. There are
+ four possibilities as to the order:
+
+ 1. Normal file -> Normal file: right away
+ 2. Directory -> Directory: wait until files in the directory
+ are processed, as we won't know whether to add a marker
+ until the end.
+ 3. Normal file -> Directory: right away, so later files will
+ have a directory to go into.
+ 4. Directory -> Normal file: Wait until the end, so we can
+ process all the files in the directory.
+
+ """
+ def base_init(indexed_tuple):
+ """Patch if appropriate, return (a,b) tuple
+
+            a is true if we found a directory and thus didn't take action
+
+ if a is false, b is true if some changes were made
+
+ if a is true, b is the rp of a temporary file used to hold
+ the diff_rorp's data (for dir -> normal file change), and
+ false if none was necessary.
+
+ """
+ diff_rorp, dsrp = indexed_tuple
+ incpref = inc_rpath.new_index(indexed_tuple.index)
+ if dsrp.isdir(): return init_dir(dsrp, diff_rorp, incpref)
+ else: return init_non_dir(dsrp, diff_rorp, incpref)
+
+ def init_dir(dsrp, diff_rorp, incpref):
+ """Initial processing of a directory
+
+ Make the corresponding directory right away, but wait
+ until the end to write the replacement. However, if the
+ diff_rorp contains data, we must write it locally before
+ continuing, or else that data will be lost in the stream.
+
+ """
+ if not (incpref.lstat() and incpref.isdir()): incpref.mkdir()
+ if diff_rorp and diff_rorp.isreg() and diff_rorp.file:
+ tf = TempFileManager.new(dsrp)
+ RPathStatic.copy_with_attribs(diff_rorp, tf)
+ tf.set_attached_filetype(diff_rorp.get_attached_filetype())
+ return (1, tf)
+ else: return (1, None)
+
+ def init_non_dir(dsrp, diff_rorp, incpref):
+ """Initial processing of non-directory
+
+            If a reverse diff is called for, it is generated by first
+            applying the forward diff to a temporary file.
+
+ """
+ if diff_rorp:
+ if dsrp.isreg() and diff_rorp.isreg():
+ tf = TempFileManager.new(dsrp)
+ def init_thunk():
+ Rdiff.patch_with_attribs_action(dsrp, diff_rorp,
+ tf).execute()
+ Inc.Increment_action(tf, dsrp, incpref).execute()
+ Robust.make_tf_robustaction(init_thunk, (tf,),
+ (dsrp,)).execute()
+ else:
+ Robust.chain([Inc.Increment_action(diff_rorp, dsrp,
+ incpref),
+ RORPIter.patchonce_action(
+ None, dsrp, diff_rorp)]).execute()
+ return (None, 1)
+ return (None, None)
+
+ def base_final(base_tuple, base_init_tuple, changed):
+ """Patch directory if not done, return true iff made change"""
+ if base_init_tuple[0]: # was directory
+ diff_rorp, dsrp = base_tuple
+ if changed or diff_rorp:
+ if base_init_tuple[1]: diff_rorp = base_init_tuple[1]
+ Inc.Increment(diff_rorp, dsrp,
+ inc_rpath.new_index(base_tuple.index))
+ if diff_rorp:
+ RORPIter.patchonce_action(None, dsrp,
+ diff_rorp).execute()
+ if isinstance(diff_rorp, TempFile): diff_rorp.delete()
+ return 1
+ return None
+ else: # changed iff base_init_tuple says it was
+ return base_init_tuple[1]
+
+ return IterTreeReducer(base_init, lambda x,y: x or y, None,
+ base_final, initial_state)
+
+MakeStatic(Inc)
diff --git a/rdiff-backup/src/iterfile.py b/rdiff-backup/src/iterfile.py
new file mode 100644
index 0000000..21629b2
--- /dev/null
+++ b/rdiff-backup/src/iterfile.py
@@ -0,0 +1,235 @@
+execfile("ttime.py")
+import cPickle
+
+#######################################################################
+#
+# iterfile - Convert an iterator to a file object and vice-versa
+#
+
+class IterFileException(Exception): pass
+
+class UnwrapFile:
+ """Contains some basic methods for parsing a file containing an iter"""
+ def __init__(self, file):
+ self.file = file
+
+ def _s2l(self, s):
+ """Convert string to long int"""
+ assert len(s) == 7
+ l = 0L
+ for i in range(7): l = l*256 + ord(s[i])
+ return l
+
+ def _get(self):
+ """Return pair (type, data) next in line on the file
+
+ type is a single character which is either "o" for object, "f"
+ for file, "c" for a continution of a file, or None if no more
+ data can be read. Data is either the file's data, if type is
+ "c" or "f", or the actual object if the type is "o".
+
+ """
+ header = self.file.read(8)
+ if not header: return None, None
+ assert len(header) == 8, "Header is only %d bytes" % len(header)
+ type, length = header[0], self._s2l(header[1:])
+ buf = self.file.read(length)
+ if type == "o": return type, cPickle.loads(buf)
+ else: return type, buf
+
+
+class IterWrappingFile(UnwrapFile):
+ """An iterator generated from a file.
+
+ Initialize with a file type object, and then it will return the
+ elements of the file in order.
+
+ """
+ def __init__(self, file):
+ UnwrapFile.__init__(self, file)
+ self.currently_in_file = None
+
+ def __iter__(self): return self
+
+ def next(self):
+ if self.currently_in_file:
+ self.currently_in_file.close() # no error checking by this point
+ type, data = self._get()
+ if not type: raise StopIteration
+ if type == "o": return data
+ elif type == "f":
+ file = IterVirtualFile(self, data)
+ if data: self.currently_in_file = file
+ else: self.currently_in_file = None
+ return file
+ else: raise IterFileException("Bad file type %s" % type)
+
+
+class IterVirtualFile(UnwrapFile):
+ """Another version of a pretend file
+
+ This is returned by IterWrappingFile when a file is embedded in
+ the main file that the IterWrappingFile is based around.
+
+ """
+ def __init__(self, iwf, initial_data):
+ """Initializer
+
+ initial_data is the data from the first block of the file.
+ iwf is the iter wrapping file that spawned this
+ IterVirtualFile.
+
+ """
+ UnwrapFile.__init__(self, iwf.file)
+ self.iwf = iwf
+ self.bufferlist = [initial_data]
+ self.bufferlen = len(initial_data)
+ self.closed = None
+
+ def check_consistency(self):
+ l = len("".join(self.bufferlist))
+ assert l == self.bufferlen, \
+ "Length of IVF bufferlist doesn't match (%s, %s)" % \
+ (l, self.bufferlen)
+
+ def read(self, length):
+ assert not self.closed
+ if self.iwf.currently_in_file:
+ while length >= self.bufferlen:
+ if not self.addtobuffer(): break
+
+ real_len = min(length, self.bufferlen)
+ combined_buffer = "".join(self.bufferlist)
+ assert len(combined_buffer) == self.bufferlen, \
+ (len(combined_buffer), self.bufferlen)
+ self.bufferlist = [combined_buffer[real_len:]]
+ self.bufferlen = self.bufferlen - real_len
+ return combined_buffer[:real_len]
+
+ def addtobuffer(self):
+ """Read a chunk from the file and add it to the buffer"""
+ assert self.iwf.currently_in_file
+ type, data = self._get()
+ assert type == "c", "Type is %s instead of c" % type
+ if data:
+ self.bufferlen = self.bufferlen + len(data)
+ self.bufferlist.append(data)
+ return 1
+ else:
+ self.iwf.currently_in_file = None
+ return None
+
+ def close(self):
+ """Currently just reads whats left and discards it"""
+ while self.iwf.currently_in_file:
+ self.addtobuffer()
+ self.bufferlist = []
+ self.bufferlen = 0
+ self.closed = 1
+
+
+class FileWrappingIter:
+ """A file interface wrapping around an iterator
+
+ This is initialized with an iterator, and then converts it into a
+ stream of characters. The object will evaluate as little of the
+ iterator as is necessary to provide the requested bytes.
+
+    The actual file is a sequence of marshalled objects, each preceded
+    by 8 bytes which identify the type of the following object and
+    specify its length.  File objects are not marshalled, but their
+ data is written in chunks of Globals.blocksize, and the following
+ blocks can identify themselves as continuations.
+
+ """
+ def __init__(self, iter):
+ """Initialize with iter"""
+ self.iter = iter
+ self.bufferlist = []
+ self.bufferlen = 0L
+ self.currently_in_file = None
+ self.closed = None
+
+ def read(self, length):
+ """Return next length bytes in file"""
+ assert not self.closed
+ while self.bufferlen < length:
+ if not self.addtobuffer(): break
+
+ combined_buffer = "".join(self.bufferlist)
+ assert len(combined_buffer) == self.bufferlen
+ real_len = min(self.bufferlen, length)
+ self.bufferlen = self.bufferlen - real_len
+ self.bufferlist = [combined_buffer[real_len:]]
+ return combined_buffer[:real_len]
+
+ def addtobuffer(self):
+ """Updates self.bufferlist and self.bufferlen, adding on a chunk
+
+ Returns None if we have reached the end of the iterator,
+ otherwise return true.
+
+ """
+ if self.currently_in_file:
+ buf = "c" + self.addfromfile()
+ else:
+ try: currentobj = self.iter.next()
+ except StopIteration: return None
+ if hasattr(currentobj, "read") and hasattr(currentobj, "close"):
+ self.currently_in_file = currentobj
+ buf = "f" + self.addfromfile()
+ else:
+ pickle = cPickle.dumps(currentobj, 1)
+ buf = "o" + self._l2s(len(pickle)) + pickle
+
+ self.bufferlist.append(buf)
+ self.bufferlen = self.bufferlen + len(buf)
+ return 1
+
+ def addfromfile(self):
+ """Read a chunk from the current file and return it"""
+ buf = self.currently_in_file.read(Globals.blocksize)
+ if not buf:
+ assert not self.currently_in_file.close()
+ self.currently_in_file = None
+ return self._l2s(len(buf)) + buf
+
+ def _l2s(self, l):
+ """Convert long int to string of 7 characters"""
+ s = ""
+ for i in range(7):
+ l, remainder = divmod(l, 256)
+ s = chr(remainder) + s
+ assert l == 0, "Length does not fit in 7 bytes"
+ return s
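+ # Editor's note (not in the original source): _l2s is the inverse of
+ # UnwrapFile._s2l; e.g. _l2s(258) == "\0"*5 + "\x01\x02", a big-endian
+ # 7-byte length, so every header occupies exactly 8 bytes.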
+
+ def close(self): self.closed = 1
+
+
+class BufferedRead:
+ """Buffer the .read() calls to the given file
+
+ This is used to lessen overhead and latency when a file is sent
+ over a connection.
+
+ """
+ def __init__(self, file):
+ self.file = file
+ self.buffer = ""
+ self.bufsize = Globals.conn_bufsize
+
+ def read(self, l = -1):
+ if l < 0: # Read as much as possible
+ result = self.buffer + self.file.read()
+ self.buffer = ""
+ return result
+
+ if len(self.buffer) < l: # Try to make buffer as long as l
+ self.buffer += self.file.read(max(self.bufsize,
+ l - len(self.buffer)))
+ actual_size = min(l, len(self.buffer))
+ result = self.buffer[:actual_size]
+ self.buffer = self.buffer[actual_size:]
+ return result
+
+ def close(self): return self.file.close()
diff --git a/rdiff-backup/src/lazy.py b/rdiff-backup/src/lazy.py
new file mode 100644
index 0000000..28e92c3
--- /dev/null
+++ b/rdiff-backup/src/lazy.py
@@ -0,0 +1,343 @@
+from __future__ import generators
+execfile("static.py")
+import os, stat, types
+
+#######################################################################
+#
+# lazy - Define some lazy data structures and functions acting on them
+#
+
+class Iter:
+ """Hold static methods for the manipulation of lazy iterators"""
+
+ def filter(predicate, iterator):
+ """Like filter in a lazy functional programming language"""
+ for i in iterator:
+ if predicate(i): yield i
+
+ def map(function, iterator):
+ """Like map in a lazy functional programming language"""
+ for i in iterator: yield function(i)
+
+ def foreach(function, iterator):
+ """Run function on each element in iterator"""
+ for i in iterator: function(i)
+
+ def cat(*iters):
+ """Lazily concatenate iterators"""
+ for iter in iters:
+ for i in iter: yield i
+
+ def cat2(iter_of_iters):
+ """Lazily concatenate iterators, iterated by big iterator"""
+ for iter in iter_of_iters:
+ for i in iter: yield i
+
+ def empty(iter):
+ """True if iterator has length 0"""
+ for i in iter: return None
+ return 1
+
+ def equal(iter1, iter2, verbose = None, operator = lambda x, y: x == y):
+ """True if iterator 1 has same elements as iterator 2
+
+ Use equality operator, or == if it is unspecified.
+
+ """
+ for i1 in iter1:
+ try: i2 = iter2.next()
+ except StopIteration:
+ if verbose: print "End when i1 = %s" % i1
+ return None
+ if not operator(i1, i2):
+ if verbose: print "%s not equal to %s" % (i1, i2)
+ return None
+ try: i2 = iter2.next()
+ except StopIteration: return 1
+ if verbose: print "End when i2 = %s" % i2
+ return None
+
+ def Or(iter):
+ """True if any element in iterator is true. Short circuiting"""
+ i = None
+ for i in iter:
+ if i: return i
+ return i
+
+ def And(iter):
+ """True if all elements in iterator are true. Short circuiting"""
+ i = 1
+ for i in iter:
+ if not i: return i
+ return i
+
+ def len(iter):
+ """Return length of iterator"""
+ i = 0
+ while 1:
+ try: iter.next()
+ except StopIteration: return i
+ i = i+1
+
+ def foldr(f, default, iter):
+ """foldr the "fundamental list recursion operator"?"""
+ try: next = iter.next()
+ except StopIteration: return default
+ return f(next, Iter.foldr(f, default, iter))
+
+ def foldl(f, default, iter):
+ """the fundamental list iteration operator.."""
+ while 1:
+ try: next = iter.next()
+ except StopIteration: return default
+ default = f(default, next)
+
+ def multiplex(iter, num_of_forks, final_func = None, closing_func = None):
+ """Split a single iterater into a number of streams
+
+ The return value will be a tuple of length num_of_forks, each
+ of which will be an iterator like iter. final_func is the
+ function that will be called on each element in iter just as
+ it is being removed from the buffer. closing_func is called
+ when all the streams are finished.
+
+ """
+ if num_of_forks == 2 and not final_func and not closing_func:
+ im2 = IterMultiplex2(iter)
+ return (im2.yielda(), im2.yieldb())
+ if not final_func: final_func = lambda i: None
+ if not closing_func: closing_func = lambda: None
+
+ # buffer is a list of elements that some iterators need and others
+ # don't
+ buffer = []
+
+ # buffer[forkposition[i]] is the next element yielded by iterator
+ # i. If it is -1, yield from the original iter
+ starting_forkposition = [-1] * num_of_forks
+ forkposition = starting_forkposition[:]
+ called_closing_func = [None]
+
+ def get_next(fork_num):
+ """Return the next element requested by fork_num"""
+ if forkposition[fork_num] == -1:
+ try: buffer.insert(0, iter.next())
+ except StopIteration:
+ # call closing_func if necessary
+ if (forkposition == starting_forkposition and
+ not called_closing_func[0]):
+ closing_func()
+ called_closing_func[0] = 1
+ raise StopIteration
+ for i in range(num_of_forks): forkposition[i] += 1
+
+ return_val = buffer[forkposition[fork_num]]
+ forkposition[fork_num] -= 1
+
+ blen = len(buffer)
+ if not (blen-1) in forkposition:
+ # Last position in buffer no longer needed
+ assert forkposition[fork_num] == blen-2
+ final_func(buffer[blen-1])
+ del buffer[blen-1]
+ return return_val
+
+ def make_iterator(fork_num):
+ while(1): yield get_next(fork_num)
+
+ return tuple(map(make_iterator, range(num_of_forks)))
+
+MakeStatic(Iter)
+
+
+class IterMultiplex2:
+ """Multiplex an iterator into 2 parts
+
+ This is a special optimized case of the Iter.multiplex function,
+ used when there is no closing_func or final_func, and we only want
+ to split it into 2. Profiling shows this to be a time-sensitive class.
+
+ """
+ def __init__(self, iter):
+ self.a_leading_by = 0 # How many places a is ahead of b
+ self.buffer = []
+ self.iter = iter
+
+ def yielda(self):
+ """Return first iterator"""
+ buf, iter = self.buffer, self.iter
+ while(1):
+ if self.a_leading_by >= 0: # a is in front, add new element
+ elem = iter.next() # exception will be passed
+ buf.append(elem)
+ else: elem = buf.pop(0) # b is in front, subtract an element
+ self.a_leading_by += 1
+ yield elem
+
+ def yieldb(self):
+ """Return second iterator"""
+ buf, iter = self.buffer, self.iter
+ while(1):
+ if self.a_leading_by <= 0: # b is in front, add new element
+ elem = iter.next() # exception will be passed
+ buf.append(elem)
+ else: elem = buf.pop(0) # a is in front, subtract an element
+ self.a_leading_by -= 1
+ yield elem
+
+
+class IterTreeReducer:
+ """Tree style reducer object for iterator
+
+ The indices of a RORPIter form a tree-like structure. This class
+ can be used on each element of an iter in sequence and the result
+ will be as if the corresponding tree was reduced. This tries to
+ bridge the gap between the tree nature of directories, and the
+ iterator nature of the connection between hosts and the temporal
+ order in which the files are processed.
+
+ The elements of the iterator are required to have a tuple-style
+ .index, called "indexed elem" below.
+
+ """
+ def __init__(self, base_init, branch_reducer,
+ branch_base, base_final, initial_state = None):
+ """ITR initializer
+
+ base_init is a function of one argument, an indexed elem. It
+ is called immediately on any elem in the iterator. It should
+ return some value type A.
+
+ branch_reducer and branch_base are used to combine the values of
+ a number of reduced branches, in the way that a linked list of
+ values of type C can be folded to form a value of type B.
+
+ base_final is called when leaving a tree. It takes three
+ arguments: the indexed elem, the output (type A) of base_init,
+ and the output (type B) of branch_reducer on all the branches;
+ it returns a value of type C.
+
+ """
+ self.base_init = base_init
+ self.branch_reducer = branch_reducer
+ self.base_final = base_final
+ self.branch_base = branch_base
+
+ if initial_state: self.setstate(initial_state)
+ else:
+ self.state = IterTreeReducerState(branch_base)
+ self.subreducer = None
+
+ def setstate(self, state):
+ """Update with new state, recursive if necessary"""
+ self.state = state
+ if state.substate: self.subreducer = self.newinstance(state.substate)
+ else: self.subreducer = None
+
+ def getstate(self): return self.state
+
+ def getresult(self):
+ """Return results of calculation"""
+ if not self.state.calculated: self.calculate_final_val()
+ return self.state.final_val
+
+ def intree(self, index):
+ """Return true if index is still in current tree"""
+ return self.state.base_index == index[:len(self.state.base_index)]
+
+ def newinstance(self, state = None):
+ """Return reducer of same type as self
+
+ If state is None, sets substate of self.state, otherwise
+ assume this is already set.
+
+ """
+ new = self.__class__(self.base_init, self.branch_reducer,
+ self.branch_base, self.base_final, state)
+ if state is None: self.state.substate = new.state
+ return new
+
+ def process_w_subreducer(self, indexed_elem):
+ """Give object to subreducer, if necessary update branch_val"""
+ if not self.subreducer:
+ self.subreducer = self.newinstance()
+ if not self.subreducer(indexed_elem):
+ self.state.branch_val = self.branch_reducer(self.state.branch_val,
+ self.subreducer.getresult())
+ self.subreducer = self.newinstance()
+ assert self.subreducer(indexed_elem)
+
+ def calculate_final_val(self):
+ """Set final value"""
+ if self.subreducer:
+ self.state.branch_val = self.branch_reducer(self.state.branch_val,
+ self.subreducer.getresult())
+ if self.state.current_index is None:
+ # No input, set None as default value
+ self.state.final_val = None
+ else:
+ self.state.final_val = self.base_final(self.state.base_elem,
+ self.state.base_init_val,
+ self.state.branch_val)
+ self.state.calculated = 1
+
+ def __call__(self, indexed_elem):
+ """Process elem, current position in iterator
+
+ Returns true if elem successfully processed, false if elem is
+ not in the current tree and thus the final result is
+ available.
+
+ """
+ index = indexed_elem.index
+ assert type(index) is types.TupleType
+
+ if self.state.current_index is None: # must be at base
+ self.state.base_init_val = self.base_init(indexed_elem)
+ # Do most crash-prone op first, so we don't leave inconsistent
+ self.state.current_index = index
+ self.state.base_index = index
+ self.state.base_elem = indexed_elem
+ return 1
+ elif not index > self.state.current_index:
+ Log("Warning: oldindex %s >= newindex %s" %
+ (self.state.current_index, index), 2)
+
+ if not self.intree(index):
+ self.calculate_final_val()
+ return None
+ else:
+ self.process_w_subreducer(indexed_elem)
+ self.state.current_index = index
+ return 1
+
+
+class IterTreeReducerState:
+ """Holds the state for IterTreeReducers
+
+ An IterTreeReducer cannot be pickled directly because it holds
+ some anonymous functions. This class contains the relevant data
+ that is likely to be picklable, so the ITR can be saved and loaded
+ if the associated functions are known.
+
+ """
+ def __init__(self, branch_base):
+ """ITRS initializer
+
+ Class variables:
+ self.current_index - last index processing started on, or None
+ self.base_index - index of first element processed
+ self.base_elem - first element processed
+ self.branch_val - default branch reducing value
+
+ self.calculated - true iff the final value has been calculated
+ self.base_init_val - return value of base_init function
+ self.final_val - Final value once it's calculated
+ self.substate - IterTreeReducerState when subreducer active
+
+ """
+ self.current_index = None
+ self.calculated = None
+ self.branch_val = branch_base
+ self.substate = None
+
diff --git a/rdiff-backup/src/log.py b/rdiff-backup/src/log.py
new file mode 100644
index 0000000..5416fd2
--- /dev/null
+++ b/rdiff-backup/src/log.py
@@ -0,0 +1,142 @@
+import time, sys
+execfile("lazy.py")
+
+#######################################################################
+#
+# log - Manage logging
+#
+
+class LoggerError(Exception): pass
+
+class Logger:
+ """All functions which deal with logging"""
+ def __init__(self):
+ self.log_file_open = None
+ self.log_file_local = None
+ self.verbosity = self.term_verbosity = 3
+ # termverbset is true if the term_verbosity has been explicitly set
+ self.termverbset = None
+
+ def setverbosity(self, verbosity_string):
+ """Set verbosity levels. Takes a number string"""
+ try: self.verbosity = int(verbosity_string)
+ except ValueError:
+ Log.FatalError("Verbosity must be a number, received '%s' "
+ "instead." % verbosity_string)
+ if not self.termverbset: self.term_verbosity = self.verbosity
+
+ def setterm_verbosity(self, termverb_string):
+ """Set verbosity to terminal. Takes a number string"""
+ try: self.term_verbosity = int(termverb_string)
+ except ValueError:
+ Log.FatalError("Terminal verbosity must be a number, received "
+ "'%s' insteaxd." % termverb_string)
+ self.termverbset = 1
+
+ def open_logfile(self, rpath):
+ """Inform all connections of an open logfile.
+
+ rpath.conn will write to the file, and the others will pass
+ write commands off to it.
+
+ """
+ for conn in Globals.connections:
+ conn.Log.open_logfile_allconn(rpath.conn)
+ rpath.conn.Log.open_logfile_local(rpath)
+
+ def open_logfile_allconn(self, log_file_conn):
+ """Run on all connections to signal log file is open"""
+ self.log_file_open = 1
+ self.log_file_conn = log_file_conn
+
+ def open_logfile_local(self, rpath):
+ """Open logfile locally - should only be run on one connection"""
+ assert self.log_file_conn is Globals.local_connection
+ self.log_file_local = 1
+ self.logrp = rpath
+ self.logfp = rpath.open("a")
+
+ def close_logfile(self):
+ """Close logfile and inform all connections"""
+ if self.log_file_open:
+ for conn in Globals.connections:
+ conn.Log.close_logfile_allconn()
+ self.log_file_conn.Log.close_logfile_local()
+
+ def close_logfile_allconn(self):
+ """Run on every connection"""
+ self.log_file_open = None
+
+ def close_logfile_local(self):
+ """Run by logging connection - close logfile"""
+ assert self.log_file_conn is Globals.local_connection
+ assert not self.logfp.close()
+
+ def format(self, message, verbosity):
+ """Format the message, possibly adding date information"""
+ if verbosity < 9: return message + "\n"
+ else: return "%s %s\n" % (time.asctime(time.localtime(time.time())),
+ message)
+
+ def __call__(self, message, verbosity):
+ """Log message that has verbosity importance"""
+ if verbosity <= self.verbosity: self.log_to_file(message)
+ if verbosity <= self.term_verbosity:
+ self.log_to_term(message, verbosity)
+
+ def log_to_file(self, message):
+ """Write the message to the log file, if possible"""
+ if self.log_file_open:
+ if self.log_file_local:
+ self.logfp.write(self.format(message, self.verbosity))
+ else: self.log_file_conn.Log.log_to_file(message)
+
+ def log_to_term(self, message, verbosity):
+ """Write message to stdout/stderr"""
+ if verbosity <= 2 or Globals.server: termfp = sys.stderr
+ else: termfp = sys.stdout
+ termfp.write(self.format(message, self.term_verbosity))
+
+ def conn(self, direction, result, req_num):
+ """Log some data on the connection
+
+ The main worry with this function is that something in here
+ will create more network traffic, which will spiral to
+ infinite regress. So, for instance, logging must only be done
+ to the terminal, because otherwise the log file may be remote.
+
+ """
+ if self.term_verbosity < 9: return
+ if type(result) is types.StringType: result_repr = repr(result)
+ else: result_repr = str(result)
+ if Globals.server: conn_str = "Server"
+ else: conn_str = "Client"
+ self.log_to_term("%s %s (%d): %s" %
+ (conn_str, direction, req_num, result_repr), 9)
+
+ def FatalError(self, message):
+ self("Fatal Error: " + message, 1)
+ Globals.Main.cleanup()
+ sys.exit(1)
+
+ def exception(self, only_terminal = 0):
+ """Log an exception and traceback at verbosity 2
+
+ If only_terminal is 0, log normally. If it is 1, then only
+ log to disk if log file is local (self.log_file_open = 1). If
+ it is 2, don't log to disk at all.
+
+ """
+ assert only_terminal in (0, 1, 2)
+ if (only_terminal == 0 or
+ (only_terminal == 1 and self.log_file_open)):
+ logging_func = self.__call__
+ else: logging_func = self.log_to_term
+
+ exc_info = sys.exc_info()
+ logging_func("Exception %s raised of class %s" %
+ (exc_info[1], exc_info[0]), 2)
+ logging_func("".join(traceback.format_tb(exc_info[2])), 2)
+
+
+Log = Logger()
diff --git a/rdiff-backup/src/main.py b/rdiff-backup/src/main.py
new file mode 100755
index 0000000..24455f6
--- /dev/null
+++ b/rdiff-backup/src/main.py
@@ -0,0 +1,401 @@
+#!/usr/bin/python
+
+execfile("highlevel.py")
+import getopt, sys, re
+
+#######################################################################
+#
+# main - Start here: Read arguments, set global settings, etc.
+#
+
+class Main:
+ def __init__(self):
+ self.action = None
+ self.remote_cmd, self.remote_schema = None, None
+ self.force = None
+ self.exclude_regstrs = ["/proc"]
+ self.exclude_mirror_regstrs = []
+
+ def parse_cmdlineoptions(self):
+ """Parse argument list and set global preferences"""
+ try: optlist, self.args = getopt.getopt(sys.argv[1:], "blmv:Vs",
+ ["backup-mode", "version", "verbosity=", "exclude=",
+ "exclude-mirror=", "server", "test-server",
+ "remote-cmd=", "mirror-only", "force",
+ "change-source-perms", "list-increments",
+ "remove-older-than=", "remote-schema=",
+ "include-from-stdin", "terminal-verbosity=",
+ "exclude-device-files", "resume", "no-resume",
+ "resume-window=", "windows-time-format",
+ "checkpoint-interval="])
+ except getopt.error:
+ self.commandline_error("Error parsing commandline options")
+
+ for opt, arg in optlist:
+ if opt == "-b" or opt == "--backup-mode": self.action = "backup"
+ elif opt == "--change-source-perms":
+ Globals.set('change_source_perms', 1)
+ elif opt == "--checkpoint-interval":
+ Globals.set_integer('checkpoint_interval', arg)
+ elif opt == "--exclude": self.exclude_regstrs.append(arg)
+ elif opt == "--exclude-device-files":
+ Globals.set('exclude_device_files', 1)
+ elif opt == "--exclude-mirror":
+ self.exclude_mirror_regstrs.append(arg)
+ elif opt == "--force": self.force = 1
+ elif opt == "--include-from-stdin": Globals.include_from_stdin = 1
+ elif opt == "-l" or opt == "--list-increments":
+ self.action = "list-increments"
+ elif opt == "-m" or opt == "--mirror-only": self.action = "mirror"
+ elif opt == '--no-resume': Globals.resume = 0
+ elif opt == "--remote-cmd": self.remote_cmd = arg
+ elif opt == "--remote-schema": self.remote_schema = arg
+ elif opt == "--remove-older-than":
+ self.remove_older_than_string = arg
+ self.action = "remove-older-than"
+ elif opt == '--resume': Globals.resume = 1
+ elif opt == '--resume-window':
+ Globals.set_integer('resume_window', arg)
+ elif opt == "-s" or opt == "--server": self.action = "server"
+ elif opt == "--terminal-verbosity":
+ Log.setterm_verbosity(arg)
+ elif opt == "--test-server": self.action = "test-server"
+ elif opt == "-V" or opt == "--version":
+ print "rdiff-backup " + Globals.version
+ sys.exit(0)
+ elif opt == "-v" or opt == "--verbosity":
+ Log.setverbosity(arg)
+ elif opt == '--windows-time-format':
+ Globals.set('time_separator', "_")
+ else: Log.FatalError("Unknown option %s" % opt)
+
+ def set_action(self):
+ """Check arguments and try to set self.action"""
+ l = len(self.args)
+ if not self.action:
+ if l == 0: self.commandline_error("No arguments given")
+ elif l == 1: self.action = "restore"
+ elif l == 2:
+ if RPath(Globals.local_connection, self.args[0]).isincfile():
+ self.action = "restore"
+ else: self.action = "backup"
+ else: self.commandline_error("Too many arguments given")
+
+ if l == 0 and self.action != "server" and self.action != "test-server":
+ self.commandline_error("No arguments given")
+ if l > 0 and self.action == "server":
+ self.commandline_error("Too many arguments given")
+ if l < 2 and (self.action == "backup" or self.action == "mirror"):
+ self.commandline_error("Two arguments are required "
+ "(source, destination).")
+ if l == 2 and (self.action == "list-increments" or
+ self.action == "remove-older-than"):
+ self.commandline_error("Only use one argument, "
+ "the root of the backup directory")
+ if l > 2: self.commandline_error("Too many arguments given")
+
+ def commandline_error(self, message):
+ sys.stderr.write("Error: %s\n" % message)
+ sys.stderr.write("See the rdiff-backup manual page for instructions\n")
+ sys.exit(1)
+
+ def misc_setup(self, rps):
+ """Set default change ownership flag, umask, excludes"""
+ if ((len(rps) == 2 and rps[1].conn.os.getuid() == 0) or
+ (len(rps) < 2 and os.getuid() == 0)):
+ # Allow change_ownership if destination connection is root
+ for conn in Globals.connections:
+ conn.Globals.set('change_ownership', 1)
+ for rp in rps: rp.setdata() # Update with userinfo
+
+ os.umask(077)
+ for regex_string in self.exclude_regstrs:
+ Globals.add_regexp(regex_string, None)
+ for regex_string in self.exclude_mirror_regstrs:
+ Globals.add_regexp(regex_string, 1)
+
+ def take_action(self, rps):
+ """Do whatever self.action says"""
+ if self.action == "server":
+ PipeConnection(sys.stdin, sys.stdout).Server()
+ elif self.action == "backup": self.Backup(rps[0], rps[1])
+ elif self.action == "restore": apply(self.Restore, rps)
+ elif self.action == "mirror": self.Mirror(rps[0], rps[1])
+ elif self.action == "test-server":
+ SetConnections.TestConnections()
+ elif self.action == "list-increments":
+ self.ListIncrements(rps[0])
+ elif self.action == "remove-older-than":
+ self.RemoveOlderThan(rps[0])
+ else: raise AssertionError("Unknown action " + self.action)
+
+ def cleanup(self):
+ """Do any last minute cleaning before exiting"""
+ Log("Cleaning up", 6)
+ Log.close_logfile()
+ if not Globals.server: SetConnections.CloseConnections()
+
+ def Main(self):
+ """Start everything up!"""
+ self.parse_cmdlineoptions()
+ self.set_action()
+ rps = SetConnections.InitRPs(self.args,
+ self.remote_schema, self.remote_cmd)
+ self.misc_setup(rps)
+ self.take_action(rps)
+ self.cleanup()
+
+
+ def Mirror(self, src_rp, dest_rp):
+ """Turn dest_path into a copy of src_path"""
+ Log("Mirroring %s to %s" % (src_rp.path, dest_rp.path), 5)
+ self.mirror_check_paths(src_rp, dest_rp)
+ HighLevel.Mirror(src_rp, dest_rp, None) # No checkpointing - no rbdir
+
+ def mirror_check_paths(self, rpin, rpout):
+ """Check paths and return rpin, rpout"""
+ if not rpin.lstat():
+ Log.FatalError("Source directory %s does not exist" % rpin.path)
+ if rpout.lstat() and not self.force:
+ Log.FatalError(
+"""Destination %s exists so continuing could mess it up. Run
+rdiff-backup with the --force option if you want to mirror anyway.""" %
+ rpout.path)
+
+
+ def Backup(self, rpin, rpout):
+ """Backup, possibly incrementally, src_path to dest_path."""
+ SetConnections.BackupInitConnections(rpin.conn, rpout.conn)
+ self.backup_init_dirs(rpin, rpout)
+ Time.setcurtime()
+ RSI = Resume.ResumeCheck()
+ if self.prevtime:
+ Time.setprevtime(self.prevtime)
+ SaveState.init_filenames(1)
+ HighLevel.Mirror_and_increment(rpin, rpout, self.incdir, RSI)
+ else:
+ SaveState.init_filenames(None)
+ HighLevel.Mirror(rpin, rpout, 1, RSI)
+ self.backup_touch_curmirror(rpin, rpout)
+
+ def backup_init_dirs(self, rpin, rpout):
+ """Make sure rpin and rpout are valid, init data dir and logging"""
+ if rpout.lstat() and not rpout.isdir():
+ if not self.force:
+ Log.FatalError("Destination %s exists and is not a "
+ "directory" % rpout.path)
+ else:
+ Log("Deleting %s" % rpout.path, 3)
+ rpout.delete()
+
+ if not rpin.lstat():
+ Log.FatalError("Source directory %s does not exist" % rpin.path)
+ elif not rpin.isdir():
+ Log.FatalError("Source %s is not a directory" % rpin.path)
+
+ self.datadir = rpout.append("rdiff-backup-data")
+ SetConnections.UpdateGlobal('rbdir', self.datadir)
+ self.incdir = RPath(rpout.conn, os.path.join(self.datadir.path,
+ "increments"))
+ self.prevtime = self.backup_get_mirrortime()
+
+ if rpout.lstat() and not self.datadir.lstat() and not self.force:
+ Log.FatalError(
+"""Destination directory %s exists, but does not look like a
+rdiff-backup directory. Running rdiff-backup like this could mess up
+what is currently in it. If you want to overwrite it, run
+rdiff-backup with the --force option.""" % rpout.path)
+
+ if not rpout.lstat():
+ try: rpout.mkdir()
+ except os.error:
+ Log.FatalError("Unable to create directory %s" % rpout.path)
+ if not self.datadir.lstat(): self.datadir.mkdir()
+ Globals.add_regexp(self.datadir.path, 1)
+ Globals.add_regexp(rpin.append("rdiff-backup-data").path, None)
+ if Log.verbosity > 0:
+ Log.open_logfile(self.datadir.append("backup.log"))
+ self.backup_warn_if_infinite_regress(rpin, rpout)
+
+ def backup_warn_if_infinite_regress(self, rpin, rpout):
+ """Warn user if destination area contained in source area"""
+ if rpout.conn is rpin.conn: # it's meaningful to compare paths
+ if ((len(rpout.path) > len(rpin.path)+1 and
+ rpout.path[:len(rpin.path)] == rpin.path and
+ rpout.path[len(rpin.path)] == '/') or
+ (rpin.path == "." and rpout.path[0] != '/' and
+ rpout.path[:2] != '..')):
+ # Just a few heuristics, we don't have to get every case
+ if not DestructiveStepping.isexcluded(rpout, 1):
+ Log(
+"""Warning: The destination directory '%s' may be contained in the
+source directory '%s'. This could cause an infinite regress. You
+may need to use the --exclude option.""" % (rpout.path, rpin.path), 2)
+
+ def backup_get_mirrorrps(self):
+ """Return list of current_mirror rps"""
+ if not self.datadir.isdir(): return []
+ mirrorfiles = filter(lambda f: f.startswith("current_mirror."),
+ self.datadir.listdir())
+ mirrorrps = map(lambda x: self.datadir.append(x), mirrorfiles)
+ return filter(lambda rp: rp.isincfile(), mirrorrps)
+
+ def backup_get_mirrortime(self):
+ """Return time in seconds of previous mirror, or None if cannot"""
+ mirrorrps = self.backup_get_mirrorrps()
+ if not mirrorrps: return None
+ if len(mirrorrps) > 1:
+ Log(
+"""Warning: duplicate current_mirror files found. Perhaps something
+went wrong during your last backup? Using """ + mirrorrps[-1].path, 2)
+
+ timestr = mirrorrps[-1].getinctime()
+ return Time.stringtotime(timestr)
+
+ def backup_touch_curmirror(self, rpin, rpout):
+ """Make a file like current_mirror.time.snapshot to record time
+
+ Also updates rpout so mod times don't get messed up.
+
+ """
+ map(RPath.delete, self.backup_get_mirrorrps())
+ mirrorrp = self.datadir.append("current_mirror.%s.%s" %
+ (Time.curtimestr, "snapshot"))
+ Log("Touching mirror marker %s" % mirrorrp.path, 6)
+ mirrorrp.touch()
+ RPath.copy_attribs(rpin, rpout)
+
+
+ def Restore(self, src_rp, dest_rp = None):
+ """Main restoring function - take src_path to dest_path"""
+ Log("Starting Restore", 5)
+ rpin, rpout = self.restore_check_paths(src_rp, dest_rp)
+ inc_tup = self.restore_get_inctup(rpin)
+ mirror_base = self.restore_get_mirror(rpin)
+ rtime = Time.stringtotime(rpin.getinctime())
+ Log.open_logfile(self.datadir.append("restore.log"))
+ HighLevel.Restore(rtime, mirror_base, inc_tup, rpout)
+
+ def restore_check_paths(self, rpin, rpout):
+ """Check paths and return pair of corresponding rps"""
+ if not rpin.lstat():
+ Log.FatalError("Increment file %s does not exist" % src_path)
+ if not rpin.isincfile():
+ Log.FatalError("""File %s does not look like an increment file.
+
+Try restoring from an increment file (the filenames look like
+"foobar.2001-09-01T04:49:04-07:00.diff").""")
+
+ if not rpout: rpout = RPath(Globals.local_connection,
+ rpin.getincbase_str())
+ if rpout.lstat():
+ Log.FatalError("Restore target %s already exists. "
+ "Will not overwrite." % rpout.path)
+ return rpin, rpout
+
+ def restore_get_inctup(self, rpin):
+ """Return increment tuple (incrp, list of incs)"""
+ rpin_dir = rpin.dirsplit()[0]
+ if not rpin_dir: rpin_dir = "/"
+ rpin_dir_rp = RPath(rpin.conn, rpin_dir)
+ incbase = rpin.getincbase()
+ incbasename = incbase.dirsplit()[1]
+ inclist = filter(lambda rp: rp.isincfile() and
+ rp.getincbase_str() == incbasename,
+ map(rpin_dir_rp.append, rpin_dir_rp.listdir()))
+ return IndexedTuple((), (incbase, inclist))
+
+ def restore_get_mirror(self, rpin):
+ """Return mirror file and set the data dir
+
+ The idea here is to keep walking up the path until we find
+ something named "rdiff-backup-data", then use that as a
+ reference to locate the mirror file. This could fail if the
+ increment file is pointed to in a funny way, using symlinks or
+ somesuch.
+
+ """
+ pathcomps = os.path.join(rpin.conn.os.getcwd(),
+ rpin.getincbase().path).split("/")
+ for i in range(1, len(pathcomps)):
+ datadirrp = RPath(rpin.conn, "/".join(pathcomps[:i+1]))
+ if pathcomps[i] == "rdiff-backup-data" and datadirrp.isdir():
+ break
+ else: Log.FatalError("Unable to find rdiff-backup-data dir")
+
+ self.datadir = datadirrp
+ Globals.add_regexp(self.datadir.path, 1)
+ rootrp = RPath(rpin.conn, "/".join(pathcomps[:i]))
+ if not rootrp.lstat():
+ Log.FatalError("Root of mirror area %s does not exist" %
+ rootrp.path)
+ else: Log("Using root mirror %s" % rootrp.path, 6)
+
+ from_datadir = pathcomps[i+1:]
+ if len(from_datadir) == 1: result = rootrp
+ elif len(from_datadir) > 1:
+ result = RPath(rootrp.conn, apply(os.path.join,
+ [rootrp.path] + from_datadir[1:]))
+ else: raise RestoreError("Problem finding mirror file")
+
+ Log("Using mirror file %s" % result.path, 6)
+ return result
+
+
+ def ListIncrements(self, rootrp):
+ """Print out a summary of the increments and their times"""
+ datadir = self.li_getdatadir(rootrp,
+ """Unable to open rdiff-backup-data dir.
+
+The argument to rdiff-backup -l or rdiff-backup --list-increments
+should be the root of the target backup directory, of which
+rdiff-backup-data is a subdirectory. So, if you ran
+
+rdiff-backup /home/foo /mnt/back/bar
+
+earlier, try:
+
+rdiff-backup -l /mnt/back/bar
+""")
+ print Manage.describe_root_incs(datadir)
+
+ def li_getdatadir(self, rootrp, errormsg):
+ """Return data dir if can find it, otherwise use errormsg"""
+ datadir = rootrp.append("rdiff-backup-data")
+ if not datadir.lstat() or not datadir.isdir():
+ Log.FatalError(errormsg)
+ return datadir
+
+
+ def RemoveOlderThan(self, rootrp):
+ """Remove all increment files older than a certain time"""
+ datadir = self.li_getdatadir(rootrp,
+ """Unable to open rdiff-backup-data dir.
+
+Try finding the increments first using --list-increments.""")
+ time = self.rot_get_earliest_time()
+ timep = Time.timetopretty(time)
+ Log("Deleting increment(s) before %s" % timep, 4)
+ incobjs = filter(lambda x: x.time < time, Manage.get_incobjs(datadir))
+ incobjs_time = ", ".join(map(IncObj.pretty_time, incobjs))
+ if not incobjs:
+ Log.FatalError("No increments older than %s found" % timep)
+ elif len(incobjs) > 1 and not self.force:
+ Log.FatalError("Found %d relevant increments, dated %s.\n"
+ "If you want to delete multiple increments in this way, "
+ "use the --force." % (len(incobjs), incobjs_time))
+
+ Log("Deleting increment%sat %s" % (len(incobjs) == 1 and " " or "s ",
+ incobjs_time), 3)
+ Manage.delete_earlier_than(datadir, time)
+
+ def rot_get_earliest_time(self):
+ """Return earliest time in seconds that will not be deleted"""
+ seconds = Time.intstringtoseconds(self.remove_older_than_string)
+ return time.time() - seconds
+
+
+
+if __name__ == "__main__":
+ Globals.Main = Main()
+ Globals.Main.Main()
diff --git a/rdiff-backup/src/manage.py b/rdiff-backup/src/manage.py
new file mode 100644
index 0000000..c0f4a85
--- /dev/null
+++ b/rdiff-backup/src/manage.py
@@ -0,0 +1,99 @@
+execfile("restore.py")
+
+#######################################################################
+#
+# manage - list, delete, and otherwise manage increments
+#
+
+class ManageException(Exception): pass
+
+class Manage:
+ def get_incobjs(datadir):
+ """Return Increments objects given the rdiff-backup data directory"""
+ return map(IncObj, Manage.find_incrps_with_base(datadir, "increments"))
+
+ def find_incrps_with_base(dir_rp, basename):
+ """Return list of incfiles with given basename in dir_rp"""
+ rps = map(dir_rp.append, dir_rp.listdir())
+ incrps = filter(RPath.isincfile, rps)
+ result = filter(lambda rp: rp.getincbase_str() == basename, incrps)
+ Log("find_incrps_with_base: found %d incs" % len(result), 6)
+ return result
+
+ def describe_root_incs(datadir):
+ """Return a string describing all the the root increments"""
+ result = []
+ currentrps = Manage.find_incrps_with_base(datadir, "current_mirror")
+ if not currentrps:
+ Log("Warning: no current mirror marker found", 1)
+ elif len(currentrps) > 1:
+ Log("Warning: multiple mirror markers found", 1)
+ for rp in currentrps:
+ result.append("Found mirror marker %s" % rp.path)
+ result.append("Indicating latest mirror taken at %s" %
+ Time.stringtopretty(rp.getinctime()))
+ result.append("---------------------------------------------"
+ "-------------")
+
+ # Sort so they are in reverse order by time
+ time_w_incobjs = map(lambda io: (-io.time, io),
+ Manage.get_incobjs(datadir))
+ time_w_incobjs.sort()
+ incobjs = map(lambda x: x[1], time_w_incobjs)
+ result.append("Found %d increments:" % len(incobjs))
+ result.append("\n------------------------------------------\n".join(
+ map(IncObj.full_description, incobjs)))
+ return "\n".join(result)
+
+ def delete_earlier_than(baserp, time):
+ """Deleting increments older than time in directory baserp
+
+ time is in seconds. It will then delete any empty directories
+ in the tree. To process the entire backup area, the
+ rdiff-backup-data directory should be the root of the tree.
+
+ """
+ def yield_files(rp):
+ yield rp
+ if rp.isdir():
+ for filename in rp.listdir():
+ for sub_rp in yield_files(rp.append(filename)):
+ yield sub_rp
+
+ for rp in yield_files(baserp):
+ if ((rp.isincfile() and
+ Time.stringtotime(rp.getinctime()) < time) or
+ (rp.isdir() and not rp.listdir())):
+ Log("Deleting increment file %s" % rp.path, 5)
+ rp.delete()
+
+MakeStatic(Manage)
+
+
+class IncObj:
+ """Increment object - represent a completed increment"""
+ def __init__(self, incrp):
+ """IncObj initializer
+
+ incrp is an RPath of a path like increments.TIMESTR.dir
+ standing for the root of the increment.
+
+ """
+ if not incrp.isincfile():
+ raise ManageException("%s is not an inc file" % incrp.path)
+ self.incrp = incrp
+ self.time = Time.stringtotime(incrp.getinctime())
+
+ def getbaserp(self):
+ """Return rp of the incrp without extensions"""
+ return self.incrp.getincbase()
+
+ def pretty_time(self):
+ """Return a formatted version of inc's time"""
+ return Time.timetopretty(self.time)
+
+ def full_description(self):
+ """Return string describing increment"""
+ s = ["Increment file %s" % self.incrp.path,
+ "Date: %s" % self.pretty_time()]
+ return "\n".join(s)
diff --git a/rdiff-backup/src/rdiff.py b/rdiff-backup/src/rdiff.py
new file mode 100644
index 0000000..c27d4f2
--- /dev/null
+++ b/rdiff-backup/src/rdiff.py
@@ -0,0 +1,175 @@
+execfile("rlist.py")
+import os, popen2
+
+#######################################################################
+#
+# rdiff - Invoke rdiff utility to make signatures, deltas, or patch
+#
+
+class RdiffException(Exception): pass
+
+class Rdiff:
+ """Contains static methods for rdiff operations
+
+ All these operations should be done in a relatively safe manner
+ using RobustAction and the like.
+
+ """
+ def get_signature(rp):
+ """Take signature of rpin file and return in file object"""
+ Log("Getting signature of %s" % rp.path, 7)
+ return rp.conn.RdiffPopen(['rdiff', 'signature', rp.path])
+
+ def get_delta_sigfileobj(sig_fileobj, rp_new):
+ """Like get_delta but signature is in a file object"""
+ sig_tf = TempFileManager.new(rp_new, None)
+ sig_tf.write_from_fileobj(sig_fileobj)
+ rdiff_popen_obj = Rdiff.get_delta(sig_tf, rp_new)
+ rdiff_popen_obj.set_thunk(sig_tf.delete)
+ return rdiff_popen_obj
+
+ def get_delta(rp_signature, rp_new):
+ """Take signature rp and new rp, return delta file object"""
+ assert rp_signature.conn is rp_new.conn
+ Log("Getting delta of %s with signature %s" %
+ (rp_new.path, rp_signature.path), 7)
+ return rp_new.conn.RdiffPopen(['rdiff', 'delta',
+ rp_signature.path, rp_new.path])
+
+ def write_delta_action(basis, new, delta):
+ """Return action writing delta which brings basis to new"""
+ sig_tf = TempFileManager.new(new, None)
+ delta_tf = TempFileManager.new(delta)
+ def init():
+ Log("Writing delta %s from %s -> %s" %
+ (basis.path, new.path, delta.path), 7)
+ sig_tf.write_from_fileobj(Rdiff.get_signature(basis))
+ delta_tf.write_from_fileobj(Rdiff.get_delta(sig_tf, new))
+ sig_tf.delete()
+ return Robust.make_tf_robustaction(init, (sig_tf, delta_tf),
+ (None, delta))
+
+ def write_delta(basis, new, delta):
+ """Write rdiff delta which brings basis to new"""
+ Rdiff.write_delta_action(basis, new, delta).execute()
+
+ def patch_action(rp_basis, rp_delta, rp_out = None, out_tf = None):
+ """Return RobustAction which patches rp_basis with rp_delta
+
+ If rp_out is None, put output in rp_basis. Will use TempFile
+ out_tf if it is specified.
+
+ """
+ if not rp_out: rp_out = rp_basis
+ else: assert rp_out.conn is rp_basis.conn
+ if not (isinstance(rp_delta, RPath) and isinstance(rp_basis, RPath)
+ and rp_basis.conn is rp_delta.conn):
+ return Rdiff.patch_fileobj_action(rp_basis, rp_delta.open('rb'),
+ rp_out, out_tf)
+
+ if out_tf is None: out_tf = TempFileManager.new(rp_out)
+ def init():
+ Log("Patching %s using %s to %s via %s" %
+ (rp_basis.path, rp_delta.path, rp_out.path, out_tf.path), 7)
+ cmdlist = ["rdiff", "patch", rp_basis.path,
+ rp_delta.path, out_tf.path]
+ return_val = rp_basis.conn.os.spawnvp(os.P_WAIT, 'rdiff', cmdlist)
+ out_tf.setdata()
+ if return_val != 0 or not out_tf.lstat():
+ RdiffException("Error running %s" % cmdlist)
+ return Robust.make_tf_robustaction(init, (out_tf,), (rp_out,))
+
+ def patch_fileobj_action(rp_basis, delta_fileobj,
+ rp_out = None, out_tf = None):
+ """Like patch_action but diff is given in fileobj form
+
+ Nest a writing of a tempfile with the actual patching to
+ create a new action. We have to nest so that the tempfile
+ will be around until the patching finishes.
+
+ """
+ if not rp_out: rp_out = rp_basis
+ delta_tf = TempFileManager.new(rp_out, None)
+ def init(): delta_tf.write_from_fileobj(delta_fileobj)
+ return Robust.chain_nested([RobustAction(init, delta_tf.delete,
+ lambda exp: delta_tf.delete()),
+ Rdiff.patch_action(rp_basis, delta_tf,
+ rp_out, out_tf)])
+
+ def patch_with_attribs_action(rp_basis, rp_delta, rp_out = None):
+ """Like patch_action, but also transfers attributs from rp_delta"""
+ if not rp_out: rp_out = rp_basis
+ tf = TempFileManager.new(rp_out)
+ return Robust.chain_nested(
+ [Rdiff.patch_action(rp_basis, rp_delta, rp_out, tf),
+ Robust.copy_attribs_action(rp_delta, tf)])
+
+ def copy_action(rpin, rpout):
+ """Use rdiff to copy rpin to rpout, conserving bandwidth"""
+ if not rpin.isreg() or not rpout.isreg() or rpin.conn is rpout.conn:
+ # rdiff not applicable, fallback to regular copying
+ return Robust.copy_action(rpin, rpout)
+
+ Log("Rdiff copying %s to %s" % (rpin.path, rpout.path), 6)
+ delta_tf = TempFileManager.new(rpout, None)
+ return Robust.chain(Rdiff.write_delta_action(rpout, rpin, delta_tf),
+ Rdiff.patch_action(rpout, delta_tf),
+ RobustAction(lambda: None, delta_tf.delete,
+ lambda exp: delta_tf.delete()))
+
+MakeStatic(Rdiff)
+
+
+class RdiffPopen:
+ """Spawn process and treat stdout as file object
+
+ Instead of using popen, which evaluates arguments with the shell
+ and thus may lead to security holes (thanks to Jamie Heilman for
+ this point), use the popen2 class and discard stdin.
+
+ When closed, this object checks to make sure the process exited
+ cleanly, and executes closing_thunk.
+
+ """
+ def __init__(self, cmdlist, closing_thunk = None):
+ """RdiffFilehook initializer
+
+ fileobj is the file we are emulating
+ thunk is called with no parameters right after the file is closed
+
+ """
+ assert type(cmdlist) is types.ListType
+ self.p3obj = popen2.Popen3(cmdlist)
+ self.fileobj = self.p3obj.fromchild
+ self.closing_thunk = closing_thunk
+ self.cmdlist = cmdlist
+
+ def set_thunk(self, closing_thunk):
+ """Set closing_thunk if not already"""
+ assert not self.closing_thunk
+ self.closing_thunk = closing_thunk
+
+ def read(self, length = -1): return self.fileobj.read(length)
+
+ def close(self):
+ closeval = self.fileobj.close()
+ if self.closing_thunk: self.closing_thunk()
+ exitval = self.p3obj.poll()
+ if exitval == 0: return closeval
+ elif exitval == 256:
+ Log("Failure probably because %s couldn't be found in PATH."
+ % self.cmdlist[0], 2)
+ assert 0, "rdiff not found"
+ elif exitval == -1:
+ # There may be a race condition where a process closes
+ # but doesn't provide its exitval fast enough.
+ Log("Waiting for process to close", 8)
+ time.sleep(0.2)
+ exitval = self.p3obj.poll()
+ if exitval == 0: return closeval
+ raise RdiffException("%s exited with non-zero value %d" %
+ (self.cmdlist, exitval))
+
+
+
+
diff --git a/rdiff-backup/src/restore.py b/rdiff-backup/src/restore.py
new file mode 100644
index 0000000..1f7d24e
--- /dev/null
+++ b/rdiff-backup/src/restore.py
@@ -0,0 +1,158 @@
+from __future__ import generators
+execfile("increment.py")
+import tempfile
+
+#######################################################################
+#
+# restore - Read increment files and restore to original
+#
+
+class RestoreError(Exception): pass
+
+class Restore:
+ def RestoreFile(rest_time, rpbase, inclist, rptarget):
+ """Non-recursive restore function
+
+ rest_time is the time in seconds to restore to,
+ rpbase is the base name of the file being restored,
+ inclist is a list of rpaths containing all the relevant increments,
+ and rptarget is the rpath that will be written with the restored file.
+
+ """
+ inclist = Restore.sortincseq(rest_time, inclist)
+ if not inclist and not (rpbase and rpbase.lstat()):
+ return # no increments were applicable
+ Log("Restoring %s with increments %s to %s" %
+ (rpbase and rpbase.path,
+ Restore.inclist2str(inclist), rptarget.path), 5)
+ if not inclist or inclist[0].getinctype() == "diff":
+ assert rpbase and rpbase.lstat(), \
+ "No base to go with incs %s" % Restore.inclist2str(inclist)
+ RPath.copy_with_attribs(rpbase, rptarget)
+ for inc in inclist: Restore.applyinc(inc, rptarget)
+
+ def inclist2str(inclist):
+ """Return string version of inclist for logging"""
+ return ",".join(map(lambda x: x.path, inclist))
+
+ def sortincseq(rest_time, inclist):
+ """Sort the inc sequence, and throw away irrelevant increments"""
+ incpairs = map(lambda rp: (Time.stringtotime(rp.getinctime()), rp),
+ inclist)
+ # Only consider increments at or after the time being restored
+ incpairs = filter(lambda pair: pair[0] >= rest_time, incpairs)
+
+ # Now throw away older unnecessary increments
+ incpairs.sort()
+ i = 0
+ while(i < len(incpairs)):
+ # Only diff type increments require later versions
+ if incpairs[i][1].getinctype() != "diff": break
+ i = i+1
+ incpairs = incpairs[:i+1]
+
+ # Return increments in reversed order
+ incpairs.reverse()
+ return map(lambda pair: pair[1], incpairs)
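+ # Editor's note (not in the original source): increments dated before
+ # rest_time are dropped; scanning the survivors oldest to newest,
+ # diffs are kept up to and including the first non-diff (snapshot,
+ # dir or missing), and the result comes back newest-first, ready to
+ # be applied in order by applyinc below.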
+
+ def applyinc(inc, target):
+ """Apply increment rp inc to targetrp target"""
+ Log("Applying increment %s to %s" % (inc.path, target.path), 6)
+ inctype = inc.getinctype()
+ if inctype == "diff":
+ if not target.lstat():
+ raise RestoreError("Bad increment sequence at " + inc.path)
+ Rdiff.patch_action(target, inc).execute()
+ elif inctype == "dir":
+ if not target.isdir():
+ if target.lstat():
+ raise RestoreError("File %s already exists" % target.path)
+ target.mkdir()
+ elif inctype == "missing": return
+ elif inctype == "snapshot": RPath.copy(inc, target)
+ else: raise RestoreError("Unknown inctype %s" % inctype)
+ RPath.copy_attribs(inc, target)
+
+ def RestoreRecursive(rest_time, mirror_base, baseinc_tup, target_base):
+ """Recursive restore function.
+
+ rest_time is the time in seconds to restore to;
+ mirror_base is an rpath of the mirror directory corresponding
+ to the one to be restored;
+ baseinc_tup is the inc tuple (incdir, list of incs) to be
+ restored;
+ and target_base is the dsrp of the target directory.
+
+ """
+ assert isinstance(target_base, DSRPath)
+ collated = RORPIter.CollateIterators(
+ DestructiveStepping.Iterate_from(mirror_base, None),
+ Restore.yield_inc_tuples(baseinc_tup))
+ mirror_finalizer = DestructiveStepping.Finalizer()
+ target_finalizer = DestructiveStepping.Finalizer()
+
+ for mirror, inc_tup in collated:
+ if not inc_tup:
+ inclist = []
+ target = target_base.new_index(mirror.index)
+ else:
+ inclist = inc_tup[1]
+ target = target_base.new_index(inc_tup.index)
+ DestructiveStepping.initialize(target, None)
+ Restore.RestoreFile(rest_time, mirror, inclist, target)
+ target_finalizer(target)
+ if mirror: mirror_finalizer(mirror)
+ target_finalizer.getresult()
+ mirror_finalizer.getresult()
+
+ def yield_inc_tuples(inc_tuple):
+ """Iterate increment tuples starting with inc_tuple
+
+ An increment tuple is an IndexedTuple (pair). The first will
+ be the rpath of a directory, and the second is a list of all
+ the increments associated with that directory. If there are
+ increments that do not correspond to a directory, the first
+ element will be None. All the rpaths involved correspond to
+ files in the increment directory.
+
+ """
+ oldindex, rpath = inc_tuple.index, inc_tuple[0]
+ yield inc_tuple
+ if not rpath or not rpath.isdir(): return
+
+ inc_list_dict = {} # Index tuple lists by index
+ dirlist = rpath.listdir()
+
+ def affirm_dict_indexed(index):
+ """Make sure the inc_list_dict has given index"""
+ if not inc_list_dict.has_key(index):
+ inc_list_dict[index] = [None, []]
+
+ def add_to_dict(filename):
+ """Add filename to the inc tuple dictionary"""
+ rp = rpath.append(filename)
+ if rp.isincfile():
+ basename = rp.getincbase_str()
+ affirm_dict_indexed(basename)
+ inc_list_dict[basename][1].append(rp)
+ elif rp.isdir():
+ affirm_dict_indexed(filename)
+ inc_list_dict[filename][0] = rp
+
+ def list2tuple(index):
+ """Return inc_tuple version of dictionary entry by index"""
+ inclist = inc_list_dict[index]
+ if not inclist[1]: return None # no increments, so ignore
+ return IndexedTuple(oldindex + (index,), inclist)
+
+ for filename in dirlist: add_to_dict(filename)
+ keys = inc_list_dict.keys()
+ keys.sort()
+ for index in keys:
+ new_inc_tuple = list2tuple(index)
+ if not new_inc_tuple: continue
+ elif new_inc_tuple[0]: # corresponds to directory
+ for i in Restore.yield_inc_tuples(new_inc_tuple): yield i
+ else: yield new_inc_tuple
+
+MakeStatic(Restore)
diff --git a/rdiff-backup/src/rlist.py b/rdiff-backup/src/rlist.py
new file mode 100644
index 0000000..c0f8ee9
--- /dev/null
+++ b/rdiff-backup/src/rlist.py
@@ -0,0 +1,240 @@
+from __future__ import generators
+import marshal, sha, types
+execfile("iterfile.py")
+
+#######################################################################
+#
+# rlist - Define the CachingIter, and sig/diff/patch ops on iterators
+#
+
+class CachingIter:
+ """Cache parts of an iter using a list
+
+ Turn an iter into something that you can prepend elements into,
+ and also read from without apparently changing the state.
+
+ """
+ def __init__(self, iter_or_list):
+ if type(iter_or_list) is types.ListType:
+ self.iter = iter(iter_or_list)
+ else: self.iter = iter_or_list
+ self.next = self.iter.next
+ self.head = []
+
+ def __iter__(self): return self
+
+ def _next(self):
+ """Take elements from the head list
+
+ When there are elements waiting before the main iterator, this
+ is the next function. If not, iter.next returns to being next.
+
+ """
+ head = self.head
+ a = head[0]
+ del head[0]
+ if not head: self.next = self.iter.next
+ return a
+
+ def nextrange(self, m):
+ """Return next m elements in list"""
+ l = self.head[:m]
+ del self.head[:m]
+ if not self.head: self.next = self.iter.next # head exhausted
+ for i in xrange(m - len(l)): l.append(self.iter.next())
+ return l
+
+ def peek(self):
+ """Return next element without removing it from iterator"""
+ n = self.next()
+ self.push(n)
+ return n
+
+ def push(self, elem):
+ """Insert an element into the iterator at the beginning"""
+ if not self.head: self.next = self._next
+ self.head.insert(0, elem)
+
+ def pushrange(self, elem_list):
+ """Insert list of multiple elements at the beginning"""
+ if not self.head: self.next = self._next
+ self.head[:0] = elem_list
+
+ def cache(self, m):
+ """Move next m elements from iter to internal list
+
+ If m is None, append the entire rest of the iterator.
+
+ """
+ h, it = self.head, self.iter
+ if m is None:
+ for i in it: h.append(i)
+ else:
+ for i in xrange(m): h.append(it.next())
+
+ def __getitem__(self, key):
+ """Support a[i:j] style notation. Non destructive"""
+ if type(key) is types.SliceType:
+ if key.stop > len(self.head): self.cache(key.stop - len(self.head))
+ return self.head[key.start:key.stop]
+ else:
+ if key >= len(self.head): self.cache(key + 1 - len(self.head))
+ return self.head[key]
+
+
+
+class RListDelta:
+ """Note a difference from one iterator (A) to another (B)
+
+ The min, max pairs are indices which stand for the half-open
+ interval (min, max], and elemlist is a list of all the elements in
+ A which fall within this interval.
+
+ These are produced by the function RList.Deltas(...)
+
+ """
+ def __init__(self, (min, max), elemlist):
+ self.min, self.max = min, max
+ self.elemlist = elemlist
+
+
+
+class RList:
+ """Tools for signatures, diffing, and patching an iterator
+
+ This class requires that the iterators involved are yielding
+ objects that have .index and .data attributes. Two objects with
+ the same .data attribute are supposed to be equivalent. The
+ iterator must also yield the objects in increasing order with
+ respect to the .index attribute.
+
+ """
+ blocksize = 100
+
+ def Signatures(iter):
+ """Return iterator of signatures from stream of pairs
+
+ Each signature is an ordered pair (last index sig applies to,
+ SHA digest of data)
+
+ """
+ i, s = 0, sha.new()
+ for iter_elem in iter:
+ s.update(marshal.dumps(iter_elem.data))
+ i = i+1
+ if i == RList.blocksize:
+ yield (iter_elem.index, s.digest())
+ i, s = 0, sha.new()
+ if i != 0: yield (iter_elem.index, s.digest())
+
+ def sig_one_block(iter_or_list):
+ """Return the digest portion of a signature on given list"""
+ s = sha.new()
+ for iter_elem in iter_or_list: s.update(marshal.dumps(iter_elem.data))
+ return s.digest()
+
+ def Deltas(remote_sigs, iter):
+ """Return iterator of Delta objects that bring iter to remote"""
+ def get_before(index, iter):
+ """Return elements in iter whose index is before or equal index
+ iter needs to be pushable
+ """
+ l = []
+ while 1:
+ try: iter_elem = iter.next()
+ except StopIteration: return l
+ if iter_elem.index > index: break
+ l.append(iter_elem)
+ iter.push(iter_elem)
+ return l
+
+ if not isinstance(iter, CachingIter): iter = CachingIter(iter)
+ oldindex = None
+ for (rs_index, rs_digest) in remote_sigs:
+ l = get_before(rs_index, iter)
+ if rs_digest != RList.sig_one_block(l):
+ yield RListDelta((oldindex, rs_index), l)
+ oldindex = rs_index
+
+ def patch_once(basis, delta):
+ """Apply one delta to basis to return original iterator
+
+ This returns original iterator up to and including the max range
+ of delta, then stop. basis should be pushable.
+
+ """
+ # Return elements of basis until start of delta range
+ for basis_elem in basis:
+ if basis_elem.index > delta.min:
+ basis.push(basis_elem)
+ break
+ yield basis_elem
+
+ # Yield elements of delta...
+ for elem in delta.elemlist: yield elem
+
+ # Finally, discard basis until end of delta range
+ for basis_elem in basis:
+ if basis_elem.index > delta.max:
+ basis.push(basis_elem)
+ break
+
+ def Patch(basis, deltas):
+ """Apply a delta stream to basis iterator, yielding original"""
+ if not isinstance(basis, CachingIter): basis = CachingIter(basis)
+ for d in deltas:
+ for elem in RList.patch_once(basis, d): yield elem
+ for elem in basis: yield elem
+
+ def get_difference_once(basis, delta):
+ """From one delta, find differences from basis
+
+ Will return pairs (basis_elem, new_elem) where basis_elem is
+ the element from the basis iterator and new_elem is the
+ element from the other iterator. If either is missing None
+ will take its place. Both are present iff the two have the
+ same index.
+
+ """
+ # Discard any elements of basis before delta starts
+ for basis_elem in basis:
+ if basis_elem.index > delta.min:
+ basis.push(basis_elem)
+ break
+
+ # In range compare each one by one
+ di, boverflow, doverflow = 0, None, None
+ while 1:
+ # Set indices and data, or mark if at end of range already
+ try:
+ basis_elem = basis.next()
+ if basis_elem.index > delta.max:
+ basis.push(basis_elem)
+ boverflow = 1
+ except StopIteration: boverflow = 1
+ if di >= len(delta.elemlist): doverflow = 1
+ else: delta_elem = delta.elemlist[di]
+
+ if boverflow and doverflow: break
+ elif boverflow:
+ yield (None, delta_elem)
+ di = di+1
+ elif doverflow: yield (basis_elem, None)
+
+ # Now can assume that everything is in range
+ elif basis_elem.index > delta_elem.index:
+ yield (None, delta_elem)
+ basis.push(basis_elem)
+ di = di+1
+ elif basis_elem.index == delta_elem.index:
+ if basis_elem.data != delta_elem.data:
+ yield (basis_elem, delta_elem)
+ di = di+1
+ else: yield (basis_elem, None)
+
+ def Dissimilar(basis, deltas):
+ """Return iter of differences from delta iter and basis iter"""
+ if not isinstance(basis, CachingIter): basis = CachingIter(basis)
+ for d in deltas:
+ for pair in RList.get_difference_once(basis, d): yield pair
+
+MakeStatic(RList)
diff --git a/rdiff-backup/src/robust.py b/rdiff-backup/src/robust.py
new file mode 100644
index 0000000..c23ff6a
--- /dev/null
+++ b/rdiff-backup/src/robust.py
@@ -0,0 +1,537 @@
+import tempfile
+execfile("rpath.py")
+
+#######################################################################
+#
+# robust - code which prevents mirror from being corrupted, error-recovery
+#
+# Ideally, no matter how an instance of rdiff-backup gets aborted, no
+# information should get lost. The target directory should be left in
+# a coherent state, and later instances of rdiff-backup should clean
+# things up so there is no sign that anything ever got aborted or
+# failed.
+#
+# Thus, files should be updated in as atomic a way as possible. Each
+# file should be updated (and the corresponding diff files written) or
+# not, and it should be clear which happened. In general, I don't
+# think this is possible, since the creation of the diff files and the
+# changing of updated files cannot be guaranteed to happen together.
+# It is possible, I think, to record various information to files
+# which would allow a later process to figure out what the last
+# operation was, but this would add several file operations to the
+# processing of each file, and I don't think it would be a good
+# tradeoff.
+#
+# The compromise reached here is that diff files should be created
+# just before the mirror files are updated, and each file update
+# should be done with a rename operation on a file in the same
+# directory. Furthermore, every once in a while, rdiff-backup will
+# record which file it just finished processing. If any fatal errors
+# are caught, it will also record the last processed file. Future
+# instances may not know exactly when the previous instance was
+# aborted, but they will be able to narrow down the possibilities.
+
+class RobustAction:
+ """Represents a file operation to be accomplished later"""
+ def __init__(self, init_thunk, final_thunk, error_thunk):
+ """RobustAction initializer
+
+ All the thunks are functions whose return value will be
+ ignored. init_thunk should not make any irreversible changes
+ but prepare for the writing of the important data. final_thunk
+ should be as short as possible and do the real work.
+ error_thunk is run if there is an error in init_thunk or
+ final_thunk. After an error in init_thunk, error_thunk should
+ leave things as if nothing had been run in the first place.
+ The functions take no arguments except for error_thunk, which
+ receives the exception as its only argument.
+
+ """
+ self.init_thunk = init_thunk
+ self.final_thunk = final_thunk
+ self.error_thunk = error_thunk
+
+ def execute(self):
+ """Actually run the operation"""
+ try:
+ self.init_thunk()
+ self.final_thunk()
+ except Exception, exp: # Catch all errors
+ Log.exception()
+ self.error_thunk(exp)
+ raise exp
+
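+# A minimal usage sketch (illustrative only; tf and rp stand for a
+# hypothetical tempfile and target rpath):
+#
+#     def init(): tf.touch() # reversible preparation
+#     def final(): tf.rename(rp) # short, does the real work
+#     RobustAction(init, final, lambda exp: tf.delete()).execute()
+#
+# If init or final raises, execute() logs the exception, runs the
+# error thunk to remove the tempfile, and re-raises.
+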
+
+class Robust:
+ """Contains various file operations made safer using tempfiles"""
+ null_action = RobustAction(lambda: None, lambda: None, lambda e: None)
+ def chain(robust_action_list):
+ """Return chain tying together a number of robust actions
+
+ The whole chain will be aborted if some error occurs in the
+ initialization stage of any of the component actions.
+
+ """
+ ras_with_completed_inits = []
+ def init():
+ for ra in robust_action_list:
+ ras_with_completed_inits.append(ra)
+ ra.init_thunk()
+ def final():
+ for ra in robust_action_list: ra.final_thunk()
+ def error(exp):
+ for ra in ras_with_completed_inits: ra.error_thunk(exp)
+ return RobustAction(init, final, error)
+
+ def chain_nested(robust_action_list):
+ """Like chain but final actions performed in reverse order"""
+ ras_with_completed_inits = []
+ def init():
+ for ra in robust_action_list:
+ ras_with_completed_inits.append(ra)
+ ra.init_thunk()
+ def final():
+ ralist_copy = robust_action_list[:]
+ ralist_copy.reverse()
+ for ra in ralist_copy: ra.final_thunk()
+ def error(exp):
+ for ra in ras_with_completed_inits: ra.error_thunk(exp)
+ return RobustAction(init, final, error)
+
+ def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
+ """Shortcut RobustAction creator when only tempfiles involved
+
+ Often the robust action will just consist of some initial
+ stage, renaming tempfiles in the final stage, and deleting
+ them if there is an error. This function makes it easier to
+ create RobustActions of that type.
+
+ """
+ assert type(tempfiles) is types.TupleType, tempfiles
+ if final_renames is None: final = lambda: None
+ else:
+ assert len(tempfiles) == len(final_renames)
+ def final(): # rename tempfiles to final positions
+ for i in range(len(tempfiles)):
+ final_name = final_renames[i]
+ if final_name:
+ if final_name.isdir(): # Cannot rename over directory
+ final_name.delete()
+ tempfiles[i].rename(final_name)
+ def error(exp):
+ for tf in tempfiles: tf.delete()
+ return RobustAction(init_thunk, final, error)
+
+ def copy_action(rorpin, rpout):
+ """Return robust action copying rorpin to rpout
+
+ The source can be a rorp or an rpath. Does not recurse. If
+ directories are copied, then just exit (the output directory
+ is not overwritten).
+
+ """
+ tfl = [None] # Need mutable object that init and final can access
+ def init():
+ if not (rorpin.isdir() and rpout.isdir()): # skip if both are dirs
+ tfl[0] = TempFileManager.new(rpout)
+ if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
+ else: RPath.copy(rorpin, tfl[0])
+ def final():
+ if tfl[0] and tfl[0].lstat():
+ if rpout.isdir(): rpout.delete()
+ tfl[0].rename(rpout)
+ return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
+
+ def copy_with_attribs_action(rorpin, rpout):
+ """Like copy_action but also copy attributes"""
+ tfl = [None] # Need mutable object that init and final can access
+ def init():
+ if not (rorpin.isdir() and rpout.isdir()): # skip if both are dirs
+ tfl[0] = TempFileManager.new(rpout)
+ if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
+ else: RPath.copy(rorpin, tfl[0])
+ if tfl[0].lstat(): # Some files, like sockets, won't be created
+ RPathStatic.copy_attribs(rorpin, tfl[0])
+ def final():
+ if rorpin.isdir() and rpout.isdir():
+ RPath.copy_attribs(rorpin, rpout)
+ elif tfl[0] and tfl[0].lstat():
+ if rpout.isdir(): rpout.delete()
+ tfl[0].rename(rpout)
+ return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
+
+ def copy_attribs_action(rorpin, rpout):
+ """Return action which just copies attributes
+
+ Copying attributes is already pretty atomic, so just run
+ normal sequence.
+
+ """
+ def final(): RPath.copy_attribs(rorpin, rpout)
+ return RobustAction(lambda: None, final, lambda e: None)
+
+ def symlink_action(rpath, linktext):
+ """Return symlink action by moving one file over another"""
+ tf = TempFileManager.new(rpath)
+ def init(): tf.symlink(linktext)
+ return Robust.make_tf_robustaction(init, (tf,), (rpath,))
+
+ def destructive_write_action(rp, s):
+ """Return action writing string s to rpath rp in robust way
+
+ This will overwrite any data currently in rp.
+
+ """
+ tf = TempFileManager.new(rp)
+ def init():
+ fp = tf.open("wb")
+ fp.write(s)
+ assert not fp.close()
+ tf.setdata()
+ return Robust.make_tf_robustaction(init, (tf,), (rp,))
+
+MakeStatic(Robust)
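+
+# Usage sketch: for some RPath rp, Robust.symlink_action(rp,
+# "increments/foo").execute() writes the symlink to a tempfile in rp's
+# directory in the init stage and only renames it over rp in the final
+# stage, so an abort never leaves a half-written link behind.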
+
+
+class TempFileManager:
+ """Manage temp files"""
+
+ # This is a connection-specific list of temp files, to be cleaned
+ # up before rdiff-backup exits.
+ _tempfiles = []
+
+ # To make collisions less likely, this gets put in the file name
+ # and incremented whenever a new file is requested.
+ _tfindex = 0
+
+ def new(cls, rp_base, same_dir = 1):
+ """Return new tempfile that isn't in use.
+
+ If same_dir, tempfile will be in same directory as rp_base.
+ Otherwise, use tempfile module to get filename.
+
+ """
+ conn = rp_base.conn
+ if conn is not Globals.local_connection:
+ return conn.TempFileManager.new(rp_base, same_dir)
+
+ def find_unused(conn, dir):
+ """Find an unused tempfile with connection conn in directory dir"""
+ while 1:
+ if cls._tfindex > 100000000:
+ Log("Resetting index", 2)
+ cls._tfindex = 0
+ tf = TempFile(conn, os.path.join(dir,
+ "rdiff-backup.tmp.%d" % cls._tfindex))
+ cls._tfindex = cls._tfindex+1
+ if not tf.lstat(): return tf
+
+ if same_dir: tf = find_unused(conn, rp_base.dirsplit()[0])
+ else: tf = TempFile(conn, tempfile.mktemp())
+ cls._tempfiles.append(tf)
+ return tf
+
+ def remove_listing(cls, tempfile):
+ """Remove listing of tempfile"""
+ if Globals.local_connection is not tempfile.conn:
+ tempfile.conn.TempFileManager.remove_listing(tempfile)
+ elif tempfile in cls._tempfiles: cls._tempfiles.remove(tempfile)
+
+ def delete_all(cls):
+ """Delete all remaining tempfiles"""
+ for tf in cls._tempfiles[:]: tf.delete()
+
+MakeClass(TempFileManager)
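+
+# Usage sketch: tf = TempFileManager.new(rp) returns a TempFile named
+# like "rdiff-backup.tmp.0" in rp's directory; tf.rename(dest) or
+# tf.delete() later also removes it from the manager's listing.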
+
+
+class TempFile(RPath):
+ """Like an RPath, but keep track of which ones are still here"""
+ def rename(self, rp_dest):
+ """Rename temp file to permanent location, possibly overwriting"""
+ if self.isdir() and not rp_dest.isdir():
+ # Cannot move a directory directly over another file
+ rp_dest.delete()
+ if (isinstance(rp_dest, DSRPath) and rp_dest.perms_delayed
+ and not self.hasfullperms()):
+ # If we are moving to a delayed perm directory, delay
+ # permission change on destination.
+ rp_dest.chmod(self.getperms())
+ self.chmod(0700)
+ RPathStatic.rename(self, rp_dest)
+ TempFileManager.remove_listing(self)
+
+ def delete(self):
+ RPath.delete(self)
+ TempFileManager.remove_listing(self)
+
+
+class SaveState:
+ """Save state in the middle of backups for resuming later"""
+ _last_file_sym = None # RPath of sym pointing to last file processed
+ _last_file_definitive_rp = None # Touch this if last file is really last
+ _last_checkpoint_time = 0 # time in seconds of last checkpoint
+ _checkpoint_rp = None # RPath of checkpoint data pickle
+
+ def init_filenames(cls, incrementing):
+ """Set rpaths of markers. Assume rbdir already set.
+
+ If incrementing, then indicate increment operation, otherwise
+ indicate mirror.
+
+ """
+ if not Globals.isbackup_writer:
+ return Globals.backup_writer.SaveState.init_filenames(incrementing)
+
+ assert Globals.local_connection is Globals.rbdir.conn, \
+ Globals.rbdir.conn
+ if incrementing: cls._last_file_sym = Globals.rbdir.append(
+ "last-file-incremented.%s.snapshot" % Time.curtimestr)
+ else: cls._last_file_sym = Globals.rbdir.append(
+ "last-file-mirrored.%s.snapshot" % Time.curtimestr)
+ cls._checkpoint_rp = Globals.rbdir.append(
+ "checkpoint-data.%s.snapshot" % Time.curtimestr)
+ cls._last_file_definitive_rp = Globals.rbdir.append(
+ "last-file-definitive.%s.snapshot" % Time.curtimestr)
+
+ def touch_last_file(cls):
+ """Touch last file marker, indicating backup has begun"""
+ cls._last_file_sym.touch()
+
+ def touch_last_file_definitive(cls):
+ """Create last-file-definitive marker
+
+ When a backup gets aborted, there may be time to indicate the
+ last file successfully processed, and this should be touched.
+ Sometimes when the abort is hard, there may be a last file
+ indicated, but further files since then have been processed,
+ in which case this shouldn't be touched.
+
+ """
+ cls._last_file_definitive_rp.touch()
+
+ def record_last_file_action(cls, last_file_rorp):
+ """Action recording last file to be processed as symlink in rbdir
+
+ last_file_rorp is None means that no file is known to have
+ been processed.
+
+ """
+ if last_file_rorp:
+ symtext = apply(os.path.join,
+ ('increments',) + last_file_rorp.index)
+ return Robust.symlink_action(cls._last_file_sym, symtext)
+ else: return RobustAction(lambda: None, cls.touch_last_file,
+ lambda exp: None)
+
+ def checkpoint_inc_backup(cls, ITR, finalizer, last_file_rorp,
+ override = None):
+ """Save states of tree reducer and finalizer during inc backup
+
+ If override is true, checkpoint even if one isn't due.
+
+ """
+ if not override and not cls.checkpoint_needed(): return
+ assert cls._checkpoint_rp, "_checkpoint_rp not set yet"
+
+ cls._last_checkpoint_time = time.time()
+ Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
+ state_string = cPickle.dumps((ITR.getstate(), finalizer.getstate()))
+ Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp,
+ state_string),
+ cls.record_last_file_action(last_file_rorp)]).execute()
+
+ def checkpoint_mirror(cls, finalizer, last_file_rorp, override = None):
+ """For a mirror, only finalizer and last_file should be saved"""
+ if not override and not cls.checkpoint_needed(): return
+ if not cls._checkpoint_rp:
+ Log("Warning, _checkpoint_rp not set yet", 2)
+ return
+
+ cls._last_checkpoint_time = time.time()
+ Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
+ state_string = cPickle.dumps(finalizer.getstate())
+ Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp,
+ state_string),
+ cls.record_last_file_action(last_file_rorp)]).execute()
+
+ def checkpoint_needed(cls):
+ """Returns true if another checkpoint is called for"""
+ return (time.time() > cls._last_checkpoint_time +
+ Globals.checkpoint_interval)
+
+ def checkpoint_remove(cls):
+ """Remove all checkpointing data after successful operation"""
+ for rp in Resume.get_relevant_rps(): rp.delete()
+
+MakeClass(SaveState)
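+
+# The markers land in rbdir with increment-style names, e.g.
+# "last-file-incremented.<timestring>.snapshot" and
+# "checkpoint-data.<timestring>.snapshot", where <timestring> is the
+# session's Time.curtimestr.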
+
+
+class Resume:
+ """Check for old aborted backups and resume if necessary"""
+ _session_info_list = None # List of ResumeSessionInfo's, sorted by time
+ def FindTime(cls, index, later_than = 0):
+ """For a given index, find the appropriate time to use for inc
+
+ If it is clear which time to use (because it is determined by
+ definitive records, or there are no aborted backups, etc.) then
+ just return the appropriate time. Otherwise, if an aborted
+ backup was last checkpointed before the index, assume that it
+ didn't get there, and go for the older time. If an inc file
+ is already present, the function will be rerun with a later
+ time specified.
+
+ """
+ if Time.prevtime > later_than: return Time.prevtime # usual case
+
+ for si in cls.get_sis_covering_index(index):
+ if si.time > later_than: return si.time
+ raise SkipFileException("Index %s already covered, skipping" %
+ str(index))
+
+ def get_sis_covering_index(cls, index):
+ """Return sorted list of SessionInfos which may cover index
+
+ An aborted backup may be relevant unless its last index is
+ lower than the given index and we are sure it didn't go further.
+
+ """
+ return filter(lambda session_info:
+ not ((session_info.last_index is None or
+ session_info.last_index < index) and
+ session_info.last_definitive),
+ cls._session_info_list)
+
+ def SetSessionInfo(cls):
+ """Read data directory and initialize _session_info"""
+ silist = []
+ rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps())
+ times = rp_quad_dict.keys()
+ times.sort()
+ for time in times:
+ silist.append(cls.quad_to_si(time, rp_quad_dict[time]))
+ cls._session_info_list = silist
+
+ def get_relevant_rps(cls):
+ """Return list of relevant rpaths in rbdata directory"""
+ relevant_bases = ['last-file-incremented', 'last-file-mirrored',
+ 'checkpoint-data', 'last-file-definitive']
+ rps = map(Globals.rbdir.append, Globals.rbdir.listdir())
+ return filter(lambda rp: rp.isincfile()
+ and rp.getincbase_str() in relevant_bases, rps)
+
+ def group_rps_by_time(cls, rplist):
+ """Take list of rps return time dict {time: quadlist}
+
+ Times in seconds are the keys, values are triples of rps
+ [last-file-incremented, last-file-mirrored, checkpoint-data,
+ last-is-definitive].
+
+ """
+ result = {}
+ for rp in rplist:
+ time = Time.stringtotime(rp.getinctime())
+ if result.has_key(time): quadlist = result[time]
+ else: quadlist = [None, None, None, None]
+ base_string = rp.getincbase_str()
+ if base_string == 'last-file-incremented': quadlist[0] = rp
+ elif base_string == 'last-file-mirrored': quadlist[1] = rp
+ elif base_string == 'last-file-definitive': quadlist[3] = 1
+ else:
+ assert base_string == 'checkpoint-data'
+ quadlist[2] = rp
+ result[time] = quadlist
+ return result
+
+ def quad_to_si(cls, time, quad):
+ """Take time, quadlist, return associated ResumeSessionInfo"""
+ increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad
+ assert not (increment_sym and mirror_sym) # both shouldn't exist
+ ITR, finalizer = None, None
+ if increment_sym:
+ mirror = None
+ last_index = cls.sym_to_index(increment_sym)
+ if checkpoint_rp:
+ ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp)
+ elif mirror_sym:
+ mirror = 1
+ last_index = cls.sym_to_index(mirror_sym)
+ if checkpoint_rp:
+ finalizer = cls.unpickle_checkpoint(checkpoint_rp)
+ return ResumeSessionInfo(mirror, time, last_index, last_definitive,
+ finalizer, ITR)
+
+ def sym_to_index(cls, sym_rp):
+ """Read last file sym rp, return last file index
+
+ If sym_rp is not a sym at all, return None, indicating that no
+ file index was ever conclusively processed.
+
+ """
+ if not sym_rp.issym(): return None
+ link_components = sym_rp.readlink().split("/")
+ assert link_components[0] == 'increments'
+ return tuple(link_components[1:])
+
+ def unpickle_checkpoint(cls, checkpoint_rp):
+ """Read data from checkpoint_rp and return unpickled data
+
+ Return value is the finalizer state for a mirror checkpoint,
+ and a pair (patch increment ITR state, finalizer state) for an
+ increment checkpoint.
+
+ """
+ fp = checkpoint_rp.open("rb")
+ data = fp.read()
+ fp.close()
+ return cPickle.loads(data)
+
+ def ResumeCheck(cls):
+ """Return relevant ResumeSessionInfo if there's one we should resume
+
+ Also, if an RSI to resume is found, reset the current time to
+ the old resume time.
+
+ """
+ cls.SetSessionInfo()
+ if not cls._session_info_list:
+ if Globals.resume == 1:
+ Log.FatalError("User specified resume, but no data on "
+ "previous backup found.")
+ else: return None
+ else:
+ si = cls._session_info_list[-1]
+ if (Globals.resume == 1 or
+ (time.time() <= (si.time + Globals.resume_window) and
+ not Globals.resume == 0)):
+ Log("Resuming aborted backup dated %s" %
+ Time.timetopretty(si.time), 2)
+ Time.setcurtime(si.time)
+ return si
+ else:
+ Log("Last backup dated %s was aborted, but we aren't "
+ "resuming it." % Time.timetopretty(si.time), 2)
+ return None
+ assert 0
+
+MakeClass(Resume)
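+
+# For instance, an increment session aborted after a checkpoint at time
+# t leaves "last-file-incremented.<t>.snapshot" and
+# "checkpoint-data.<t>.snapshot" in rbdir; group_rps_by_time collects
+# them under key t as [increment_sym, None, checkpoint_rp, None], and
+# quad_to_si turns that quad into a ResumeSessionInfo.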
+
+
+class ResumeSessionInfo:
+ """Hold information about a previously aborted session"""
+ def __init__(self, mirror, time, last_index,
+ last_definitive, finalizer_state = None, ITR_state = None):
+ """Class initializer
+
+ time - starting time in seconds of backup
+ mirror - true if backup was a mirror, false if increment
+ last_index - Last confirmed index processed by backup, or None
+ last_definitive - True if we know last_index is really last
+ finalizer_state - finalizer reducer state if available
+ ITR_state - For an increment, ITR reducer state (assume mirror if None)
+
+ """
+ self.time = time
+ self.mirror = mirror
+ self.last_index = last_index
+ self.last_definitive = last_definitive
+ self.ITR_state, self.finalizer_state = ITR_state, finalizer_state
diff --git a/rdiff-backup/src/rorpiter.py b/rdiff-backup/src/rorpiter.py
new file mode 100644
index 0000000..5740ef8
--- /dev/null
+++ b/rdiff-backup/src/rorpiter.py
@@ -0,0 +1,248 @@
+execfile("robust.py")
+from __future__ import generators
+import tempfile
+
+#######################################################################
+#
+# rorpiter - Operations on Iterators of Read Only Remote Paths
+#
+
+class RORPIterException(Exception): pass
+
+class RORPIter:
+ """Functions relating to iterators of Read Only RPaths
+
+ The main structure will be an iterator that yields RORPaths.
+ Every RORPath has a "raw" form that makes it more amenable to
+ being turned into a file. The raw form of the iterator yields
+ each RORPath in the form of the tuple (index, data_dictionary,
+ files), where files is the number of files attached (usually 1 or
+ 0). After that, if a file is attached, it yields that file.
+
+ """
+ def ToRaw(rorp_iter):
+ """Convert a rorp iterator to raw form"""
+ for rorp in rorp_iter:
+ if rorp.file:
+ yield (rorp.index, rorp.data, 1)
+ yield rorp.file
+ else: yield (rorp.index, rorp.data, 0)
+
+ def FromRaw(raw_iter):
+ """Convert raw rorp iter back to standard form"""
+ for index, data, num_files in raw_iter:
+ rorp = RORPath(index, data)
+ if num_files:
+ assert num_files == 1, "Only one file accepted right now"
+ rorp.setfile(RORPIter.getnext(raw_iter))
+ yield rorp
+
+ def ToFile(rorp_iter):
+ """Return file version of iterator"""
+ return FileWrappingIter(RORPIter.ToRaw(rorp_iter))
+
+ def FromFile(fileobj):
+ """Recover rorp iterator from file interface"""
+ return RORPIter.FromRaw(IterWrappingFile(fileobj))
+
+ def IterateRPaths(base_rp):
+ """Return an iterator yielding RPaths with given base rp"""
+ yield base_rp
+ if base_rp.isdir():
+ dirlisting = base_rp.listdir()
+ dirlisting.sort()
+ for filename in dirlisting:
+ for rp in RORPIter.IterateRPaths(base_rp.append(filename)):
+ yield rp
+
+ def Signatures(rp_iter):
+ """Yield signatures of rpaths in given rp_iter"""
+ for rp in rp_iter:
+ if rp.isplaceholder(): yield rp
+ else:
+ rorp = rp.getRORPath()
+ if rp.isreg(): rorp.setfile(Rdiff.get_signature(rp))
+ yield rorp
+
+ def GetSignatureIter(base_rp):
+ """Return a signature iterator recurring over the base_rp"""
+ return RORPIter.Signatures(RORPIter.IterateRPaths(base_rp))
+
+ def CollateIterators(*rorp_iters):
+ """Collate RORPath iterators by index
+
+ So it takes two or more iterators of rorps and returns an
+ iterator yielding tuples like (rorp1, rorp2) with the same
+ index. If one or the other lacks that index, it will be None.
+
+ """
+ # overflow[i] means that iter[i] has been exhausted
+ # rorps[i] is None means that it is time to replenish it.
+ iter_num = len(rorp_iters)
+ if iter_num == 2:
+ return RORPIter.Collate2Iters(rorp_iters[0], rorp_iters[1])
+ overflow = [None] * iter_num
+ rorps = overflow[:]
+
+ def setrorps(overflow, rorps):
+ """Set the overflow and rorps list"""
+ for i in range(iter_num):
+ if not overflow[i] and rorps[i] is None:
+ try: rorps[i] = rorp_iters[i].next()
+ except StopIteration:
+ overflow[i] = 1
+ rorps[i] = None
+
+ def getleastindex(rorps):
+ """Return the first index in rorps, assuming rorps isn't empty"""
+ return min(map(lambda rorp: rorp.index,
+ filter(lambda x: x, rorps)))
+
+ def yield_tuples(iter_num, overflow, rorps):
+ while 1:
+ setrorps(overflow, rorps)
+ if not None in overflow: break
+
+ index = getleastindex(rorps)
+ yieldval = []
+ for i in range(iter_num):
+ if rorps[i] and rorps[i].index == index:
+ yieldval.append(rorps[i])
+ rorps[i] = None
+ else: yieldval.append(None)
+ yield IndexedTuple(index, yieldval)
+ return yield_tuples(iter_num, overflow, rorps)
+
+ def Collate2Iters(riter1, riter2):
+ """Special case of CollateIterators with 2 arguments
+
+ This does the same thing but is faster because it doesn't have
+ to consider the >2 iterator case. Profiler says speed is
+ important here.
+
+ """
+ relem1, relem2 = None, None
+ while 1:
+ if not relem1:
+ try: relem1 = riter1.next()
+ except StopIteration:
+ if relem2: yield IndexedTuple(index2, (None, relem2))
+ for relem2 in riter2:
+ yield IndexedTuple(relem2.index, (None, relem2))
+ break
+ index1 = relem1.index
+ if not relem2:
+ try: relem2 = riter2.next()
+ except StopIteration:
+ if relem1: yield IndexedTuple(index1, (relem1, None))
+ for relem1 in riter1:
+ yield IndexedTuple(relem1.index, (relem1, None))
+ break
+ index2 = relem2.index
+
+ if index1 < index2:
+ yield IndexedTuple(index1, (relem1, None))
+ relem1 = None
+ elif index1 == index2:
+ yield IndexedTuple(index1, (relem1, relem2))
+ relem1, relem2 = None, None
+ else: # index2 is less
+ yield IndexedTuple(index2, (None, relem2))
+ relem2 = None
+
+ def getnext(iter):
+ """Return the next element of an iterator, raising error if none"""
+ try: next = iter.next()
+ except StopIteration: raise RORPIterException("Unexpected end to iter")
+ return next
+
+ def GetDiffIter(sig_iter, new_iter):
+ """Return delta iterator from sig_iter to new_iter
+
+ The accompanying file for each will be a delta as produced by
+ rdiff, unless the destination file does not exist, in which
+ case it will be the file in its entirety.
+
+ sig_iter may be composed of rorps, but new_iter should have
+ full RPaths.
+
+ """
+ collated_iter = RORPIter.CollateIterators(sig_iter, new_iter)
+ for rorp, rp in collated_iter: yield RORPIter.diffonce(rorp, rp)
+
+ def diffonce(sig_rorp, new_rp):
+ """Return one diff rorp, based from signature rorp and orig rp"""
+ if sig_rorp and sig_rorp.isreg() and new_rp and new_rp.isreg():
+ diff_rorp = new_rp.getRORPath()
+ diff_rorp.setfile(Rdiff.get_delta_sigfileobj(sig_rorp.open("rb"),
+ new_rp))
+ diff_rorp.set_attached_filetype('diff')
+ return diff_rorp
+ else:
+ # Just send over the original if a diff isn't appropriate
+ if sig_rorp: sig_rorp.close_if_necessary()
+ if not new_rp: return RORPath(sig_rorp.index)
+ elif new_rp.isreg():
+ diff_rorp = new_rp.getRORPath(1)
+ diff_rorp.set_attached_filetype('snapshot')
+ return diff_rorp
+ else: return new_rp.getRORPath()
+
+ def PatchIter(base_rp, diff_iter):
+ """Patch the appropriate rps in basis_iter using diff_iter"""
+ basis_iter = RORPIter.IterateRPaths(base_rp)
+ collated_iter = RORPIter.CollateIterators(basis_iter, diff_iter)
+ for basisrp, diff_rorp in collated_iter:
+ RORPIter.patchonce_action(base_rp, basisrp, diff_rorp).execute()
+
+ def patchonce_action(base_rp, basisrp, diff_rorp):
+ """Return action patching basisrp using diff_rorp"""
+ assert diff_rorp, "Missing diff index %s" % basisrp.index
+ if not diff_rorp.lstat():
+ return RobustAction(lambda: None, basisrp.delete, lambda e: None)
+
+ if basisrp and basisrp.isreg() and diff_rorp.isreg():
+ assert diff_rorp.get_attached_filetype() == 'diff'
+ return Rdiff.patch_with_attribs_action(basisrp, diff_rorp)
+ else: # Diff contains whole file, just copy it over
+ if not basisrp: basisrp = base_rp.new_index(diff_rorp.index)
+ return Robust.copy_with_attribs_action(diff_rorp, basisrp)
+
+MakeStatic(RORPIter)
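+
+# Collation sketch: if iterator A yields rorps indexed () and ("b",)
+# while iterator B yields only ("b",), then CollateIterators(A, B)
+# yields IndexedTuples with index () and data (a1, None), then index
+# ("b",) and data (a2, b1), where a1, a2, b1 name the original rorps.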
+
+
+
+class IndexedTuple:
+ """Like a tuple, but has .index
+
+ This is used by CollateIterator above, and can be passed to the
+ IterTreeReducer.
+
+ """
+ def __init__(self, index, sequence):
+ self.index = index
+ self.data = tuple(sequence)
+
+ def __len__(self): return len(self.data)
+
+ def __getitem__(self, key):
+ """This only works for numerical keys (faster that way)"""
+ return self.data[key]
+
+ def __cmp__(self, other):
+ assert isinstance(other, IndexedTuple)
+ if self.index < other.index: return -1
+ elif self.index == other.index: return 0
+ else: return 1
+
+ def __eq__(self, other):
+ if isinstance(other, IndexedTuple):
+ return self.index == other.index and self.data == other.data
+ elif type(other) is types.TupleType:
+ return self.data == other
+ else: return None
+
+ def __str__(self):
+ assert len(self.data) == 2
+ return "(%s, %s).%s" % (str(self.data[0]), str(self.data[1]),
+ str(self.index))
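+
+# Note: IndexedTuple(("idx",), (1, 2))[1] is 2; __cmp__ orders by
+# index alone, while __eq__ also requires the data fields to match.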
diff --git a/rdiff-backup/src/rpath.py b/rdiff-backup/src/rpath.py
new file mode 100644
index 0000000..4e6cc8f
--- /dev/null
+++ b/rdiff-backup/src/rpath.py
@@ -0,0 +1,704 @@
+execfile("connection.py")
+import os, stat, re, sys, shutil
+
+#######################################################################
+#
+# rpath - Wrapper class around a real path like "/usr/bin/env"
+#
+# The RPath and associated classes make some function calls more
+# convenient (e.g. RPath.getperms()) and also make working with files
+# on remote systems transparent.
+#
+
+class RPathException(Exception): pass
+
+class RPathStatic:
+ """Contains static methods for use with RPaths"""
+ def copyfileobj(inputfp, outputfp):
+ """Copies file inputfp to outputfp in blocksize intervals"""
+ blocksize = Globals.blocksize
+ while 1:
+ inbuf = inputfp.read(blocksize)
+ if not inbuf: break
+ outputfp.write(inbuf)
+
+ def cmpfileobj(fp1, fp2):
+ """True if file objects fp1 and fp2 contain same data"""
+ blocksize = Globals.blocksize
+ while 1:
+ buf1 = fp1.read(blocksize)
+ buf2 = fp2.read(blocksize)
+ if buf1 != buf2: return None
+ elif not buf1: return 1
+
+ def check_for_files(*rps):
+ """Make sure that all the rps exist, raise error if not"""
+ for rp in rps:
+ if not rp.lstat():
+ raise RPathException("File %s does not exist" % rp.path)
+
+ def move(rpin, rpout):
+ """Move rpin to rpout, renaming if possible"""
+ try: RPath.rename(rpin, rpout)
+ except os.error:
+ RPath.copy(rpin, rpout)
+ rpin.delete()
+
+ def copy(rpin, rpout):
+ """Copy RPath rpin to rpout. Works for symlinks, dirs, etc."""
+ Log("Regular copying %s to %s" % (rpin.index, rpout.path), 6)
+ if not rpin.lstat():
+ raise RPathException, ("File %s does not exist" % rpin.index)
+
+ if rpout.lstat():
+ if rpin.isreg() or not RPath.cmp(rpin, rpout):
+ rpout.delete() # easier to write than compare
+ else: return
+
+ if rpin.isreg(): RPath.copy_reg_file(rpin, rpout)
+ elif rpin.isdir(): rpout.mkdir()
+ elif rpin.issym(): rpout.symlink(rpin.readlink())
+ elif rpin.ischardev():
+ major, minor = rpin.getdevnums()
+ rpout.makedev("c", major, minor)
+ elif rpin.isblkdev():
+ major, minor = rpin.getdevnums()
+ rpout.makedev("b", major, minor)
+ elif rpin.isfifo(): rpout.mkfifo()
+ elif rpin.issock(): Log("Found socket, ignoring", 1)
+ else: raise RPathException("File %s has unknown type" % rpin.path)
+
+ def copy_reg_file(rpin, rpout):
+ """Copy regular file rpin to rpout, possibly avoiding connection"""
+ try:
+ if rpout.conn is rpin.conn:
+ rpout.conn.shutil.copyfile(rpin.path, rpout.path)
+ rpout.data = {'type': rpin.data['type']}
+ return
+ except AttributeError: pass
+ rpout.write_from_fileobj(rpin.open("rb"))
+
+ def cmp(rpin, rpout):
+ """True if rpin has the same data as rpout
+
+ cmp does not compare file ownership, permissions, or times, or
+ examine the contents of a directory.
+
+ """
+ RPath.check_for_files(rpin, rpout)
+ if rpin.isreg():
+ if not rpout.isreg(): return None
+ fp1, fp2 = rpin.open("rb"), rpout.open("rb")
+ result = RPathStatic.cmpfileobj(fp1, fp2)
+ if fp1.close() or fp2.close():
+ raise RPathException("Error closing file")
+ return result
+ elif rpin.isdir(): return rpout.isdir()
+ elif rpin.issym():
+ return rpout.issym() and (rpin.readlink() == rpout.readlink())
+ elif rpin.ischardev():
+ return rpout.ischardev() and \
+ (rpin.getdevnums() == rpout.getdevnums())
+ elif rpin.isblkdev():
+ return rpout.isblkdev() and \
+ (rpin.getdevnums() == rpout.getdevnums())
+ elif rpin.isfifo(): return rpout.isfifo()
+ elif rpin.issock(): return rpout.issock()
+ else: raise RPathException("File %s has unknown type" % rpin.path)
+
+ def copy_attribs(rpin, rpout):
+ """Change file attributes of rpout to match rpin
+
+ Only changes the chmoddable bits, uid/gid ownership, and
+ timestamps, so both must already exist.
+
+ """
+ Log("Copying attributes from %s to %s" % (rpin.index, rpout.path), 7)
+ RPath.check_for_files(rpin, rpout)
+ if rpin.issym(): return # symlinks have no valid attributes
+ if Globals.change_ownership: apply(rpout.chown, rpin.getuidgid())
+ rpout.chmod(rpin.getperms())
+ if not rpin.isdev(): rpout.setmtime(rpin.getmtime())
+
+ def cmp_attribs(rp1, rp2):
+ """True if rp1 has the same file attributes as rp2
+
+ Does not compare file access times. If not changing
+ ownership, do not check user/group id.
+
+ """
+ RPath.check_for_files(rp1, rp2)
+ if Globals.change_ownership and rp1.getuidgid() != rp2.getuidgid():
+ result = None
+ elif rp1.getperms() != rp2.getperms(): result = None
+ elif rp1.issym() and rp2.issym(): # Don't check times for some types
+ result = 1
+ elif rp1.isblkdev() and rp2.isblkdev(): result = 1
+ elif rp1.ischardev() and rp2.ischardev(): result = 1
+ else: result = (rp1.getmtime() == rp2.getmtime())
+ Log("Compare attribs %s and %s: %s" % (rp1.path, rp2.path, result), 7)
+ return result
+
+ def copy_with_attribs(rpin, rpout):
+ """Copy file and then copy over attributes"""
+ RPath.copy(rpin, rpout)
+ RPath.copy_attribs(rpin, rpout)
+
+ def quick_cmp_with_attribs(rp1, rp2):
+ """Quicker version of cmp_with_attribs
+
+ Instead of reading all of each file, assume that regular files
+ are the same if the attributes compare.
+
+ """
+ if not RPath.cmp_attribs(rp1, rp2): return None
+ if rp1.isreg() and rp2.isreg() and (rp1.getsize() == rp2.getsize()):
+ return 1
+ return RPath.cmp(rp1, rp2)
+
+ def cmp_with_attribs(rp1, rp2):
+ """Combine cmp and cmp_attribs"""
+ return RPath.cmp_attribs(rp1, rp2) and RPath.cmp(rp1, rp2)
+
+ def rename(rp_source, rp_dest):
+ """Rename rp_source to rp_dest"""
+ assert rp_source.conn is rp_dest.conn
+ Log("Renaming %s to %s" % (rp_source.path, rp_dest.path), 7)
+ rp_source.conn.os.rename(rp_source.path, rp_dest.path)
+ rp_dest.data = rp_source.data
+ rp_source.data = {'type': None}
+
+ def tupled_lstat(filename):
+ """Like os.lstat, but return only a tuple, or None if os.error
+
+ Later versions of os.lstat return a special lstat object,
+ which can confuse the pickler and cause errors in remote
+ operations.
+
+ """
+ try: return tuple(os.lstat(filename))
+ except os.error: return None
+
+ def cmp_recursive(rp1, rp2):
+ """True if rp1 and rp2 are at the base of same directories
+
+ Includes only attributes, no file data. This function may not
+ be used in rdiff-backup but it comes in handy in the unit
+ tests.
+
+ """
+ rp1.setdata()
+ rp2.setdata()
+ dsiter1, dsiter2 = map(DestructiveStepping.Iterate_with_Finalizer,
+ [rp1, rp2], [1, None])
+ result = Iter.equal(dsiter1, dsiter2, 1)
+ for i in dsiter1: pass # make sure all files processed anyway
+ for i in dsiter2: pass
+ return result
+
+MakeStatic(RPathStatic)
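+
+# Typical use: RPath.copy_with_attribs(rpin, rpout) duplicates rpin's
+# contents and then its permissions, ownership (when enabled), and
+# mtime at rpout; RPath.cmp_with_attribs(rpin, rpout) should then
+# return a true value.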
+
+
+class RORPath(RPathStatic):
+ """Read Only RPath - carry information about a path
+
+ These contain information about a file, and possibly the file's
+ data, but do not have a connection and cannot be written to or
+ changed. The advantage of these objects is that they can be
+ communicated by encoding their index and data dictionary.
+
+ """
+ def __init__(self, index, data = None):
+ self.index = index
+ if data: self.data = data
+ else: self.data = {'type':None} # signify empty file
+ self.file = None
+
+ def __eq__(self, other):
+ """Signal two files equivalent"""
+ if not Globals.change_ownership or self.issym() and other.issym():
+ # Don't take file ownership into account when comparing
+ data1, data2 = self.data.copy(), other.data.copy()
+ for d in (data1, data2):
+ for key in ('uid', 'gid'):
+ if d.has_key(key): del d[key]
+ return self.index == other.index and data1 == data2
+ else: return self.index == other.index and self.data == other.data
+
+ def __str__(self):
+ """Pretty print file statistics"""
+ return "Index: %s\nData: %s" % (self.index, self.data)
+
+ def __getstate__(self):
+ """Return picklable state
+
+ This is necessary in case the RORPath is carrying around a
+ file object, which can't/shouldn't be pickled.
+
+ """
+ return (self.index, self.data)
+
+ def __setstate__(self, rorp_state):
+ """Reproduce RORPath from __getstate__ output"""
+ self.index, self.data = rorp_state
+
+ def make_placeholder(self):
+ """Make rorp into a placeholder
+
+ This object doesn't contain any information about the file,
+ but, when passed along, may show where the previous stages are
+ in their processing. It is the RORPath equivalent of fiber.
+
+ """
+ self.data = {'placeholder':
+ ("It is actually good for placeholders to use"
+ "up a bit of memory, so the buffers get flushed"
+ "more often when placeholders move through."
+ "See the get_dissimilar docs for more info.")}
+
+ def isplaceholder(self):
+ """True if the object is a placeholder"""
+ return self.data.has_key('placeholder')
+
+ def lstat(self):
+ """Returns type of file
+
+ The allowable types are None if the file doesn't exist, 'reg'
+ for a regular file, 'dir' for a directory, 'dev' for a device
+ file, 'fifo' for a fifo, 'sock' for a socket, and 'sym' for a
+ symlink.
+
+ """
+ return self.data['type']
+ gettype = lstat
+
+ def isdir(self):
+ """True if self is a dir"""
+ return self.data['type'] == 'dir'
+
+ def isreg(self):
+ """True if self is a regular file"""
+ return self.data['type'] == 'reg'
+
+ def issym(self):
+ """True if path is of a symlink"""
+ return self.data['type'] == 'sym'
+
+ def isfifo(self):
+ """True if path is a fifo"""
+ return self.data['type'] == 'fifo'
+
+ def ischardev(self):
+ """True if path is a character device file"""
+ return self.data['type'] == 'dev' and self.data['devnums'][0] == 'c'
+
+ def isblkdev(self):
+ """True if path is a block device file"""
+ return self.data['type'] == 'dev' and self.data['devnums'][0] == 'b'
+
+ def isdev(self):
+ """True if path is a device file"""
+ return self.data['type'] == 'dev'
+
+ def issock(self):
+ """True if path is a socket"""
+ return self.data['type'] == 'sock'
+
+ def getperms(self):
+ """Return permission block of file"""
+ return self.data['perms']
+
+ def getsize(self):
+ """Return length of file in bytes"""
+ return self.data['size']
+
+ def getuidgid(self):
+ """Return userid/groupid of file"""
+ return self.data['uid'], self.data['gid']
+
+ def getatime(self):
+ """Return access time in seconds"""
+ return self.data['atime']
+
+ def getmtime(self):
+ """Return modification time in seconds"""
+ return self.data['mtime']
+
+ def readlink(self):
+ """Wrapper around os.readlink()"""
+ return self.data['linkname']
+
+ def getdevnums(self):
+ """Return a devices major/minor numbers from dictionary"""
+ return self.data['devnums'][1:]
+
+ def setfile(self, file):
+ """Right now just set self.file to be the already opened file"""
+ assert file and not self.file
+ def closing_hook(): self.file_already_open = None
+ self.file = RPathFileHook(file, closing_hook)
+ self.file_already_open = None
+
+ def get_attached_filetype(self):
+ """If there is a file attached, say what it is
+
+ Currently the choices are 'snapshot' meaning an exact copy of
+ something, and 'diff' for an rdiff style diff.
+
+ """
+ return self.data['filetype']
+
+ def set_attached_filetype(self, type):
+ """Set the type of the attached file"""
+ self.data['filetype'] = type
+
+ def open(self, mode):
+ """Return file type object if any was given using self.setfile"""
+ if mode != "rb": raise RPathException("Bad mode %s" % mode)
+ if self.file_already_open:
+ raise RPathException("Attempt to open same file twice")
+ self.file_already_open = 1
+ return self.file
+
+ def close_if_necessary(self):
+ """If file is present, discard data and close"""
+ if self.file:
+ while self.file.read(Globals.blocksize): pass
+ assert not self.file.close(), \
+ "Error closing file\ndata = %s\nindex = %s\n" % (self.data,
+ self.index)
+ self.file_already_open = None
+
+
+class RPath(RORPath):
+ """Remote Path class - wrapper around a possibly non-local pathname
+
+ This class contains a dictionary called "data" which should
+ contain all the information about the file sufficient for
+ identification (i.e. if two files have the same (==) data
+ dictionary, they are the same file).
+
+ """
+ regex_chars_to_quote = re.compile("[\\\\\\\"\\$`]")
+
+ def __init__(self, connection, base, index = (), data = None):
+ """RPath constructor
+
+ connection = self.conn is the Connection the RPath will use to
+ make system calls, and index is the name of the rpath used for
+ comparison, and should be a tuple consisting of the parts of
+ the rpath after the base split up. For instance ("foo",
+ "bar") for "foo/bar" (no base), and ("local", "bin") for
+ "/usr/local/bin" if the base is "/usr".
+
+ """
+ self.conn = connection
+ self.index = index
+ self.base = base
+ self.path = apply(os.path.join, (base,) + self.index)
+ self.file = None
+ if data: self.data = data
+ else: self.setdata()
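+
+ # Example from the docstring above: for some connection conn,
+ # RPath(conn, "/usr", ("local", "bin")).path is "/usr/local/bin".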
+
+ def __str__(self):
+ return "Path: %s\nIndex: %s\nData: %s" % (self.path, self.index,
+ self.data)
+
+ def __getstate__(self):
+ """Return picklable state
+
+ The connection must be local because we can't pickle a
+ connection. Data and any attached file also won't be saved.
+
+ """
+ assert self.conn is Globals.local_connection
+ return (self.index, self.base, self.data)
+
+ def __setstate__(self, rpath_state):
+ """Reproduce RPath from __getstate__ output"""
+ self.index, self.base, self.data = rpath_state
+
+ def setdata(self):
+ """Create the data dictionary"""
+ statblock = self.conn.RPathStatic.tupled_lstat(self.path)
+ if statblock is None:
+ self.data = {'type':None}
+ return
+ data = {}
+ mode = statblock[stat.ST_MODE]
+
+ if stat.S_ISREG(mode):
+ type = 'reg'
+ data['size'] = statblock[stat.ST_SIZE]
+ elif stat.S_ISDIR(mode): type = 'dir'
+ elif stat.S_ISCHR(mode):
+ type = 'dev'
+ data['devnums'] = ('c',) + self._getdevnums()
+ elif stat.S_ISBLK(mode):
+ type = 'dev'
+ data['devnums'] = ('b',) + self._getdevnums()
+ elif stat.S_ISFIFO(mode): type = 'fifo'
+ elif stat.S_ISLNK(mode):
+ type = 'sym'
+ data['linkname'] = self.conn.os.readlink(self.path)
+ elif stat.S_ISSOCK(mode): type = 'sock'
+ else: raise RPathException("Unknown type for %s" % self.path)
+ data['type'] = type
+ data['perms'] = stat.S_IMODE(mode)
+ data['uid'] = statblock[stat.ST_UID]
+ data['gid'] = statblock[stat.ST_GID]
+
+ if not (type == 'sym' or type == 'dev'):
+ # mtimes on symlinks and dev files don't work consistently
+ data['mtime'] = long(statblock[stat.ST_MTIME])
+
+ if Globals.preserve_atime and not type == 'sym':
+ data['atime'] = long(statblock[stat.ST_ATIME])
+ self.data = data
+
+ def check_consistency(self):
+ """Raise an error if consistency of rp broken
+
+ This is useful for debugging when the cache and disk get out
+ of sync and you need to find out where it happened.
+
+ """
+ temptype = self.data['type']
+ self.setdata()
+ assert temptype == self.data['type'], \
+ "\nName: %s\nOld: %s --> New: %s\n" % \
+ (self.path, temptype, self.data['type'])
+
+ def _getdevnums(self):
+ """Return tuple for special file (major, minor)"""
+ assert self.conn is Globals.local_connection
+ if Globals.exclude_device_files:
+ # No point in finding numbers because it will be excluded anyway
+ return ()
+ s = os.lstat(self.path).st_rdev
+ return (s >> 8, s & 0xff)
+
+ def chmod(self, permissions):
+ """Wrapper around os.chmod"""
+ self.conn.os.chmod(self.path, permissions)
+ self.data['perms'] = permissions
+
+ def settime(self, accesstime, modtime):
+ """Change file modification times"""
+ Log("Setting time of %s to %d" % (self.path, modtime), 7)
+ self.conn.os.utime(self.path, (accesstime, modtime))
+ self.data['atime'] = accesstime
+ self.data['mtime'] = modtime
+
+ def setmtime(self, modtime):
+ """Set only modtime (access time to present)"""
+ Log("Setting time of %s to %d" % (self.path, modtime), 7)
+ self.conn.os.utime(self.path, (time.time(), modtime))
+ self.data['mtime'] = modtime
+
+ def chown(self, uid, gid):
+ """Set file's uid and gid"""
+ self.conn.os.chown(self.path, uid, gid)
+ self.data['uid'] = uid
+ self.data['gid'] = gid
+
+ def mkdir(self):
+ Log("Making directory " + self.path, 6)
+ self.conn.os.mkdir(self.path)
+ self.setdata()
+
+ def rmdir(self):
+ Log("Removing directory " + self.path, 6)
+ self.conn.os.rmdir(self.path)
+ self.data = {'type': None}
+
+ def listdir(self):
+ """Return list of string paths returned by os.listdir"""
+ return self.conn.os.listdir(self.path)
+
+ def symlink(self, linktext):
+ """Make symlink at self.path pointing to linktext"""
+ self.conn.os.symlink(linktext, self.path)
+ self.setdata()
+ assert self.issym()
+
+ def mkfifo(self):
+ """Make a fifo at self.path"""
+ self.conn.os.mkfifo(self.path)
+ self.setdata()
+ assert self.isfifo()
+
+ def touch(self):
+ """Make sure file at self.path exists"""
+ Log("Touching " + self.path, 7)
+ self.conn.open(self.path, "w").close()
+ self.setdata()
+ assert self.isreg()
+
+ def hasfullperms(self):
+ """Return true if current process has full permissions on the file"""
+ if self.isowner(): return self.getperms() % 01000 >= 0700
+ elif self.isgroup(): return self.getperms() % 0100 >= 070
+ else: return self.getperms() % 010 >= 07
+
+ def readable(self):
+ """Return true if current process has read permissions on the file"""
+ if self.isowner(): return self.getperms() % 01000 >= 0400
+ elif self.isgroup(): return self.getperms() % 0100 >= 040
+ else: return self.getperms() % 010 >= 04
+
+ def executable(self):
+ """Return true if current process has execute permissions"""
+ if self.isowner(): return self.getperms() % 0200 >= 0100
+ elif self.isgroup(): return self.getperms() % 020 >= 010
+ else: return self.getperms() % 02 >= 01
+
+ def isowner(self):
+ """Return true if current process is owner of rp or root"""
+ uid = self.conn.Globals.get('process_uid')
+ return uid == 0 or uid == self.data['uid']
+
+ def isgroup(self):
+ """Return true if current process is in group of rp"""
+ return self.conn.Globals.get('process_gid') == self.data['gid']
+
+ def delete(self):
+ """Delete file at self.path
+
+ The destructive stepping allows this function to delete
+ directories even if they have files and we lack permissions.
+
+ """
+ Log("Deleting %s" % self.path, 7)
+ self.setdata()
+ if not self.lstat(): return # must have been deleted in meantime
+ elif self.isdir():
+ def helper(dsrp, base_init_output, branch_reduction):
+ if dsrp.isdir(): dsrp.rmdir()
+ else: dsrp.delete()
+ dsiter = DestructiveStepping.Iterate_from(self, None)
+ itm = IterTreeReducer(lambda x: None, lambda x,y: None, None,
+ helper)
+ for dsrp in dsiter: itm(dsrp)
+ itm.getresult()
+ else: self.conn.os.unlink(self.path)
+ self.setdata()
+
+ def quote(self):
+ """Return quoted self.path for use with os.system()"""
+ return '"%s"' % self.regex_chars_to_quote.sub(
+ lambda m: "\\"+m.group(0), self.path)
+
+ def normalize(self):
+ """Return RPath canonical version of self.path
+
+ This just means that redundant /'s will be removed, including
+ the trailing one, even for directories. ".." components will
+ be retained.
+
+ """
+ newpath = "/".join(filter(lambda x: x and x != ".",
+ self.path.split("/")))
+ if self.path[0] == "/": newpath = "/" + newpath
+ elif not newpath: newpath = "."
+ return self.__class__(self.conn, newpath, ())
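+
+ # For instance, an RPath whose path is "//usr/./local//" normalizes
+ # to one with path "/usr/local", and a path of "/" stays "/".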
+
+ def dirsplit(self):
+ """Returns a tuple of strings (dirname, basename)
+
+ Basename is never '' unless self is root, so it is unlike
+ os.path.basename. If path is just above root (so dirname is
+ root), then dirname is ''. In all other cases dirname is not
+ the empty string. Also, dirsplit depends on the format of
+ self, so basename could be ".." and dirname could be a
+ subdirectory. For an atomic relative path, dirname will be
+ '.'.
+
+ """
+ normed = self.normalize()
+ if normed.path.find("/") == -1: return (".", normed.path)
+ comps = normed.path.split("/")
+ return "/".join(comps[:-1]), comps[-1]
+
+ def append(self, ext):
+ """Return new RPath with same connection by adjoing ext"""
+ return self.__class__(self.conn, self.base, self.index + (ext,))
+
+ def new_index(self, index):
+ """Return similar RPath but with new index"""
+ return self.__class__(self.conn, self.base, index)
+
+ def open(self, mode):
+ """Return open file. Supports modes "w" and "r"."""
+ return self.conn.open(self.path, mode)
+
+ def write_from_fileobj(self, fp):
+ """Reads fp and writes to self.path. Closes both when done"""
+ Log("Writing file object to " + self.path, 7)
+ assert not self.lstat(), "File %s already exists" % self.path
+ outfp = self.open("wb")
+ RPath.copyfileobj(fp, outfp)
+ if fp.close() or outfp.close():
+ raise RPathException("Error closing file")
+ self.setdata()
+
+ def isincfile(self):
+ """Return true if path looks like an increment file"""
+ dotsplit = self.path.split(".")
+ if len(dotsplit) < 3: return None
+ timestring, ext = dotsplit[-2:]
+ if Time.stringtotime(timestring) is None: return None
+ return (ext == "snapshot" or ext == "dir" or
+ ext == "missing" or ext == "diff")
+
+ def getinctype(self):
+ """Return type of an increment file"""
+ return self.path.split(".")[-1]
+
+ def getinctime(self):
+ """Return timestring of an increment file"""
+ return self.path.split(".")[-2]
+
+ def getincbase(self):
+ """Return the base filename of an increment file in rp form"""
+ if self.index:
+ return self.__class__(self.conn, self.base, self.index[:-1] +
+ ((".".join(self.index[-1].split(".")[:-2])),))
+ else: return self.__class__(self.conn,
+ ".".join(self.base.split(".")[:-2]), ())
+
+ def getincbase_str(self):
+ """Return the base filename string of an increment file"""
+ return self.getincbase().dirsplit()[1]
+
+ def makedev(self, type, major, minor):
+ """Make a special file with specified type, and major/minor nums"""
+ cmdlist = ['mknod', self.path, type, str(major), str(minor)]
+ if self.conn.os.spawnvp(os.P_WAIT, 'mknod', cmdlist) != 0:
+ raise RPathException("Error running %s" % cmdlist)
+ assert type == 'c' or type == 'b', "Bad device type %s" % type
+ self.data = {'type': 'dev', 'devnums': (type, major, minor)} # match setdata()'s 'dev' type
+
+ def getRORPath(self, include_contents = None):
+ """Return read only version of self"""
+ rorp = RORPath(self.index, self.data)
+ if include_contents: rorp.setfile(self.open("rb"))
+ return rorp
+
+
+class RPathFileHook:
+ """Look like a file, but add closing hook"""
+ def __init__(self, file, closing_thunk):
+ self.file = file
+ self.closing_thunk = closing_thunk
+
+ def read(self, length = -1): return self.file.read(length)
+ def write(self, buf): return self.file.write(buf)
+
+ def close(self):
+ """Close file and then run closing thunk"""
+ result = self.file.close()
+ self.closing_thunk()
+ return result
diff --git a/rdiff-backup/src/setconnections.py b/rdiff-backup/src/setconnections.py
new file mode 100644
index 0000000..07c6893
--- /dev/null
+++ b/rdiff-backup/src/setconnections.py
@@ -0,0 +1,205 @@
+execfile("highlevel.py")
+
+#######################################################################
+#
+# setconnections - Parse initial arguments and establish connections
+#
+
+class SetConnectionsException(Exception): pass
+
+class SetConnections:
+ """Parse args and setup connections
+
+ The methods in this class are used once by Main to parse file
+ descriptions like bescoto@folly.stanford.edu::/usr/bin/ls and to
+ set up the related connections.
+
+ """
+ # This is the schema that determines how rdiff-backup will open a
+ # pipe to the remote system. If the file is given as A:B, %s will
+ # be substituted with A in the schema.
+ __cmd_schema = 'ssh %s rdiff-backup --server'
+
+ # This is a list of remote commands used to start the connections.
+ # The first is None because it is the local connection.
+ __conn_remote_cmds = [None]
+
+ def InitRPs(cls, arglist, remote_schema = None, remote_cmd = None):
+ """Map the given file descriptions into rpaths and return list"""
+ if remote_schema: cls.__cmd_schema = remote_schema
+ if not arglist: return []
+ desc_pairs = map(cls.parse_file_desc, arglist)
+
+ if filter(lambda x: x[0], desc_pairs): # True if any host_info found
+ if remote_cmd:
+ Log.FatalError("The --remote-cmd flag is not compatible "
+ "with remote file descriptions.")
+ elif remote_schema:
+ Log("Remote schema option ignored - no remote file "
+ "descriptions.", 2)
+
+ cmd_pairs = map(cls.desc2cmd_pairs, desc_pairs)
+ if remote_cmd: # last file description gets remote_cmd
+ cmd_pairs[-1] = (remote_cmd, cmd_pairs[-1][1])
+ return map(cls.cmdpair2rp, cmd_pairs)
+
+ def cmdpair2rp(cls, cmd_pair):
+ """Return RPath from cmd_pair (remote_cmd, filename)"""
+ cmd, filename = cmd_pair
+ if cmd: conn = cls.init_connection(cmd)
+ else: conn = Globals.local_connection
+ return RPath(conn, filename)
+
+ def desc2cmd_pairs(cls, desc_pair):
+ """Return pair (remote_cmd, filename) from desc_pair"""
+ host_info, filename = desc_pair
+ if not host_info: return (None, filename)
+ else: return (cls.fill_schema(host_info), filename)
+
+ def parse_file_desc(cls, file_desc):
+ """Parse file description returning pair (host_info, filename)
+
+ In other words, bescoto@folly.stanford.edu::/usr/bin/ls =>
+ ("bescoto@folly.stanford.edu", "/usr/bin/ls"). The
+ complication is that a ":" may be quoted with a backslash. If
+ the string contains no unquoted "::" separator, host_info is None.
+
+ """
+ def check_len(i):
+ if i >= len(file_desc):
+ raise SetConnectionsException(
+ "Unexpected end to file description %s" % file_desc)
+
+ host_info_list, i, last_was_quoted = [], 0, None
+ while 1:
+ if i == len(file_desc):
+ return (None, file_desc)
+
+ if file_desc[i] == '\\':
+ i = i+1
+ check_len(i)
+ last_was_quoted = 1
+ elif (file_desc[i] == ":" and i > 0 and file_desc[i-1] == ":"
+ and not last_was_quoted):
+ host_info_list.pop() # Remove last colon from name
+ break
+ else: last_was_quoted = None
+ host_info_list.append(file_desc[i])
+ i = i+1
+
+ check_len(i+1)
+ return ("".join(host_info_list), file_desc[i+1:])
+
+ def fill_schema(cls, host_info):
+ """Fills host_info into the schema and returns remote command"""
+ return cls.__cmd_schema % host_info
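+
+ # E.g. fill_schema("user@host") yields the remote command
+ # "ssh user@host rdiff-backup --server" under the default schema.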
+
+ def init_connection(cls, remote_cmd):
+ """Run remote_cmd, register connection, and then return it
+
+ If remote_cmd is None, then the local connection will be
+ returned. This also updates some settings on the remote side,
+ like global settings, its connection number, and verbosity.
+
+ """
+ if not remote_cmd: return Globals.local_connection
+
+ Log("Executing " + remote_cmd, 4)
+ stdin, stdout = os.popen2(remote_cmd)
+ conn_number = len(Globals.connections)
+ conn = PipeConnection(stdout, stdin, conn_number)
+
+ cls.check_connection_version(conn)
+ Log("Registering connection %d" % conn_number, 7)
+ cls.init_connection_routing(conn, conn_number, remote_cmd)
+ cls.init_connection_settings(conn)
+ return conn
+
+ def check_connection_version(cls, conn):
+ """Log warning if connection has different version"""
+ remote_version = conn.Globals.get('version')
+ if remote_version != Globals.version:
+ Log("Warning: Local version %s does not match remote version %s."
+ % (Globals.version, remote_version), 2)
+
+ def init_connection_routing(cls, conn, conn_number, remote_cmd):
+ """Called by init_connection, establish routing, conn dict"""
+ Globals.connection_dict[conn_number] = conn
+
+ conn.SetConnections.init_connection_remote(conn_number)
+ for other_remote_conn in Globals.connections[1:]:
+ conn.SetConnections.add_redirected_conn(
+ other_remote_conn.conn_number)
+ other_remote_conn.SetConnections.add_redirected_conn(conn_number)
+
+ Globals.connections.append(conn)
+ cls.__conn_remote_cmds.append(remote_cmd)
+
+ def init_connection_settings(cls, conn):
+ """Tell new conn about log settings and updated globals"""
+ conn.Log.setverbosity(Log.verbosity)
+ conn.Log.setterm_verbosity(Log.term_verbosity)
+ for setting_name in Globals.changed_settings:
+ conn.Globals.set(setting_name, Globals.get(setting_name))
+
+ def init_connection_remote(cls, conn_number):
+ """Run on server side to tell self that have given conn_number"""
+ Globals.connection_number = conn_number
+ Globals.local_connection.conn_number = conn_number
+ Globals.connection_dict[0] = Globals.connections[1]
+ Globals.connection_dict[conn_number] = Globals.local_connection
+
+ def add_redirected_conn(cls, conn_number):
+ """Run on server side - tell about redirected connection"""
+ Globals.connection_dict[conn_number] = \
+ RedirectedConnection(conn_number)
+
+ def UpdateGlobal(cls, setting_name, val):
+ """Update value of global variable across all connections"""
+ for conn in Globals.connections:
+ conn.Globals.set(setting_name, val)
+
+ def BackupInitConnections(cls, reading_conn, writing_conn):
+ """Backup specific connection initialization"""
+ reading_conn.Globals.set("isbackup_reader", 1)
+ writing_conn.Globals.set("isbackup_writer", 1)
+ cls.UpdateGlobal("backup_reader", reading_conn)
+ cls.UpdateGlobal("backup_writer", writing_conn)
+
+
+ def CloseConnections(cls):
+ """Close all connections. Run by client"""
+ assert not Globals.server
+ for conn in Globals.connections: conn.quit()
+ del Globals.connections[1:] # Only leave local connection
+ Globals.connection_dict = {0: Globals.local_connection}
+ Globals.backup_reader = Globals.isbackup_reader = \
+ Globals.backup_writer = Globals.isbackup_writer = None
+
+ def TestConnections(cls):
+ """Test connections, printing results"""
+ if len(Globals.connections) == 1:
+ print "No remote connections specified"
+ else:
+ for i in range(1, len(Globals.connections)):
+ cls.test_connection(i)
+
+ def test_connection(cls, conn_number):
+ """Test connection. conn_number 0 is the local connection"""
+ print "Testing server started by: ", \
+ cls.__conn_remote_cmds[conn_number]
+ conn = Globals.connections[conn_number]
+ try:
+ assert conn.pow(2,3) == 8
+ assert conn.os.path.join("a", "b") == "a/b"
+ version = conn.reval("lambda: Globals.version")
+ except:
+ sys.stderr.write("Server tests failed\n")
+ raise
+ if not version == Globals.version:
+ print """Server may work, but there is a version mismatch:
+Local version: %s
+Remote version: %s""" % (Globals.version, version)
+ else: print "Server OK"
+
+MakeClass(SetConnections)
diff --git a/rdiff-backup/src/static.py b/rdiff-backup/src/static.py
new file mode 100644
index 0000000..2e97cd0
--- /dev/null
+++ b/rdiff-backup/src/static.py
@@ -0,0 +1,30 @@
+execfile("globals.py")
+
+#######################################################################
+#
+# static - MakeStatic and MakeClass
+#
+# These functions are used to make all the instance methods in a class
+# into static or class methods.
+#
+
+class StaticMethodsError(Exception):
+ pass
+
+def MakeStatic(cls):
+ """turn instance methods into static ones
+
+ The methods (that don't begin with _) of any class that
+ subclasses this will be turned into static methods.
+
+ """
+ for name in dir(cls):
+ if name[0] != "_":
+ cls.__dict__[name] = staticmethod(cls.__dict__[name])
+
+
+def MakeClass(cls):
+ """Turn instance methods into classmethods. Ignore _ like above"""
+ for name in dir(cls):
+ if name[0] != "_":
+ cls.__dict__[name] = classmethod(cls.__dict__[name])
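+
+# Sketch: methods meant for MakeStatic are written without a self
+# argument, e.g. "def double(x): return 2*x" in class M is callable as
+# M.double(3) after MakeStatic(M); methods meant for MakeClass take
+# cls as their first argument instead.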
diff --git a/rdiff-backup/src/ttime.py b/rdiff-backup/src/ttime.py
new file mode 100644
index 0000000..c8bb58e
--- /dev/null
+++ b/rdiff-backup/src/ttime.py
@@ -0,0 +1,129 @@
+execfile("log.py")
+import time, types
+
+#######################################################################
+#
+# ttime - Provide Time class, which contains time related functions.
+#
+
+class TimeException(Exception): pass
+
+class Time:
+ """Functions which act on the time"""
+ _interval_conv_dict = {"s": 1, "m": 60, "h": 3600,
+ "D": 86400, "M": 30*86400, "Y": 365*86400}
+
+ def setcurtime(cls, curtime = None):
+ """Sets the current time in curtime and curtimestr on all systems"""
+ t = curtime or time.time()
+ for conn in Globals.connections:
+ conn.Time.setcurtime_local(t, cls.timetostring(t))
+
+ def setcurtime_local(cls, timeinseconds, timestr):
+ """Only set the current time locally"""
+ cls.curtime = timeinseconds
+ cls.curtimestr = timestr
+
+ def setprevtime(cls, timeinseconds):
+ """Sets the previous inc time in prevtime and prevtimestr"""
+ assert timeinseconds > 0, timeinseconds
+ timestr = cls.timetostring(timeinseconds)
+ for conn in Globals.connections:
+ conn.Time.setprevtime_local(timeinseconds, timestr)
+
+ def setprevtime_local(cls, timeinseconds, timestr):
+ """Like setprevtime but only set the local version"""
+ cls.prevtime = timeinseconds
+ cls.prevtimestr = timestr
+
+ def timetostring(cls, timeinseconds):
+ """Return w3 datetime compliant listing of timeinseconds"""
+ return time.strftime("%Y-%m-%dT%H" + Globals.time_separator +
+ "%M" + Globals.time_separator + "%S",
+ time.localtime(timeinseconds)) + cls.gettzd()
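+ # With the default ":" separator this produces strings like
+ # "2001-07-04T21:03:26-07:00" (the exact zone suffix depends on
+ # the local timezone settings)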
+
+ def stringtotime(cls, timestring):
+ """Return time in seconds from w3 timestring
+
+ If there is an error parsing the string, or it doesn't look
+ like a w3 datetime string, return None.
+
+ """
+ try:
+ date, daytime = timestring[:19].split("T")
+ year, month, day = map(int, date.split("-"))
+ hour, minute, second = map(int,
+ daytime.split(Globals.time_separator))
+ assert 1900 < year < 2100, year
+ assert 1 <= month <= 12
+ assert 1 <= day <= 31
+ assert 0 <= hour <= 23
+ assert 0 <= minute <= 59
+ assert 0 <= second <= 61 # leap seconds
+ timetuple = (year, month, day, hour, minute, second, -1, -1, -1)
+ if time.daylight:
+ utc_in_secs = time.mktime(timetuple) - time.altzone
+ else: utc_in_secs = time.mktime(timetuple) - time.timezone
+
+ return utc_in_secs + cls.tzdtoseconds(timestring[19:])
+ except (TypeError, ValueError, AssertionError): return None
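+ # e.g. stringtotime(Time.timetostring(t)) should round-trip t,
+ # while malformed input such as "2001/07/04" yields None rather
+ # than an exception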
+
+ def timetopretty(cls, timeinseconds):
+ """Return pretty version of time"""
+ return time.asctime(time.localtime(timeinseconds))
+
+ def stringtopretty(cls, timestring):
+ """Return pretty version of time given w3 time string"""
+ return cls.timetopretty(cls.stringtotime(timestring))
+
+ def intstringtoseconds(cls, interval_string):
+ """Convert a string expressing an interval to seconds"""
+ def error():
+ raise TimeException('Bad interval string "%s"' % interval_string)
+ if len(interval_string) < 2: error()
+ try: num, ext = int(interval_string[:-1]), interval_string[-1]
+ except ValueError: error()
+ if ext not in cls._interval_conv_dict or num < 0: error()
+ return num*cls._interval_conv_dict[ext]
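+ # e.g. intstringtoseconds("2h") == 7200 and
+ # intstringtoseconds("3M") == 90*86400, while "h2" and "-1h"
+ # raise TimeException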
+
+ def gettzd(cls):
+ """Return w3's timezone identification string.
+
+ Expressed as [+/-]hh:mm. For instance, PST is -08:00. The zone
+ coincides with what localtime(), etc., use.
+
+ """
+ if time.daylight: offset = -1 * time.altzone/60
+ else: offset = -1 * time.timezone/60
+ if offset > 0: prefix = "+"
+ elif offset < 0: prefix = "-"
+ else: return "Z" # time is already in UTC
+
+ # divmod of the absolute value keeps fractional-hour zones
+ # (e.g. -03:30) from coming out an hour off
+ hours, minutes = divmod(abs(offset), 60)
+ assert 0 <= hours <= 23
+ assert 0 <= minutes <= 59
+ return "%s%02d%s%02d" % (prefix, hours,
+ Globals.time_separator, minutes)
+
+ def tzdtoseconds(cls, tzd):
+ """Given w3 compliant TZD, return how far ahead UTC is"""
+ if tzd == "Z": return 0
+ assert len(tzd) == 6 # only accept forms like +08:00 for now
+ assert (tzd[0] == "-" or tzd[0] == "+") and \
+ tzd[3] == Globals.time_separator
+ # negate the minutes along with the hours for negative zones,
+ # so that e.g. "-08:30" means 510 minutes behind UTC
+ minutes = 60 * int(tzd[1:3]) + int(tzd[4:6])
+ if tzd[0] == "-": minutes = -minutes
+ return -60 * minutes
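+ # e.g. tzdtoseconds("Z") == 0 and tzdtoseconds("-08:00") == 28800,
+ # since UTC is eight hours ahead of a -08:00 zone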
+
+ def cmp(cls, time1, time2):
+ """Compare time1 and time2 and return -1, 0, or 1"""
+ if type(time1) is types.StringType:
+ time1 = cls.stringtotime(time1)
+ assert time1 is not None
+ if type(time2) is types.StringType:
+ time2 = cls.stringtotime(time2)
+ assert time2 is not None
+
+ if time1 < time2: return -1
+ elif time1 == time2: return 0
+ else: return 1
+
+MakeClass(Time)
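+
+# A short usage sketch (hypothetical values; assumes at least the local
+# connection is present in Globals.connections):
+#
+# Time.setcurtime()
+# print Time.curtimestr # e.g. 2002-03-21T23:22:43-08:00
+# assert Time.cmp("2001-01-01T00:00:00Z", Time.curtimestr) == -1
+# assert Time.intstringtoseconds("1D") == 86400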