Diffstat (limited to 'rdiff-backup')
-rw-r--r--  rdiff-backup/CHANGELOG                   |   4
-rw-r--r--  rdiff-backup/TODO                        |   5
-rw-r--r--  rdiff-backup/rdiff_backup/Globals.py     |   2
-rw-r--r--  rdiff-backup/rdiff_backup/Main.py        |   3
-rw-r--r--  rdiff-backup/rdiff_backup/backup.py      |  67
-rw-r--r--  rdiff-backup/rdiff_backup/connection.py  |  11
-rw-r--r--  rdiff-backup/rdiff_backup/iterfile.py    | 197
-rw-r--r--  rdiff-backup/rdiff_backup/rorpiter.py    |  42
-rw-r--r--  rdiff-backup/testing/iterfiletest.py     |  99
9 files changed, 323 insertions, 107 deletions
diff --git a/rdiff-backup/CHANGELOG b/rdiff-backup/CHANGELOG
index 2c83469..6c6e9f7 100644
--- a/rdiff-backup/CHANGELOG
+++ b/rdiff-backup/CHANGELOG
@@ -1,4 +1,4 @@
-New in v0.11.3 (2003/04/01)
+New in v0.11.3 (2003/03/04)
---------------------------
Fixed a number of bugs reported by Olivier Mueller:
@@ -13,6 +13,8 @@ Fixed a number of bugs reported by Olivier Mueller:
--print-statistics option works again (before it would be silently
ignored).
+ Fixed cache pipeline overflow bug. This error could appear on
+ large remote backups when many files have not changed.
New in v0.11.2 (2003/03/01)
diff --git a/rdiff-backup/TODO b/rdiff-backup/TODO
index 0f18b34..ca48674 100644
--- a/rdiff-backup/TODO
+++ b/rdiff-backup/TODO
@@ -1,8 +1,3 @@
----------[ Short term (next version) ]-------------------------
-
-Fix --print-statistics
-
-
---------[ Medium term ]---------------------------------------
Look at Kent Borg's suggestion for restore options and digests.
diff --git a/rdiff-backup/rdiff_backup/Globals.py b/rdiff-backup/rdiff_backup/Globals.py
index b9045f7..359e351 100644
--- a/rdiff-backup/rdiff_backup/Globals.py
+++ b/rdiff-backup/rdiff_backup/Globals.py
@@ -40,7 +40,7 @@ conn_bufsize = 98304
# This is used in rorpiter.CacheIndexable. The number represents the
# number of rpaths which may be stuck in buffers when moving over a
# remote connection.
-pipeline_max_length = int(conn_bufsize / 150)*2
+pipeline_max_length = 500
# True if script is running as a server
server = None
diff --git a/rdiff-backup/rdiff_backup/Main.py b/rdiff-backup/rdiff_backup/Main.py
index 5f6cbbc..391e848 100644
--- a/rdiff-backup/rdiff_backup/Main.py
+++ b/rdiff-backup/rdiff_backup/Main.py
@@ -601,5 +601,6 @@ def checkdest_if_necessary(dest_rp):
"""
need_check = checkdest_need_check(dest_rp)
if need_check == 1:
- Log("Previous backup seems to have failed, checking now.", 2)
+ Log("Previous backup seems to have failed, regressing "
+ "destination now.", 2)
dest_rp.conn.regress.Regress(dest_rp)
diff --git a/rdiff-backup/rdiff_backup/backup.py b/rdiff-backup/rdiff_backup/backup.py
index 0ee782c..0ff5ace 100644
--- a/rdiff-backup/rdiff_backup/backup.py
+++ b/rdiff-backup/rdiff_backup/backup.py
@@ -22,7 +22,7 @@
from __future__ import generators
import errno
import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \
- rpath, static, log, selection, Time, Rdiff, statistics
+ rpath, static, log, selection, Time, Rdiff, statistics, iterfile
def Mirror(src_rpath, dest_rpath):
"""Turn dest_rpath into a copy of src_rpath"""
@@ -65,7 +65,7 @@ class SourceStruct:
sel = selection.Select(rpath)
sel.ParseArgs(tuplelist, filelists)
sel.set_iter()
- cache_size = Globals.pipeline_max_length * 2 # 2 because to and from
+ cache_size = Globals.pipeline_max_length * 3 # to and from+leeway
cls.source_select = rorpiter.CacheIndexable(sel, cache_size)
def get_source_select(cls):
@@ -96,6 +96,9 @@ class SourceStruct:
diff_rorp.set_attached_filetype('snapshot')
for dest_sig in dest_sigiter:
+ if dest_sig is iterfile.RORPIterFlushRepeat:
+ yield iterfile.RORPIterFlush # Flush buffer when get_sigs does
+ continue
src_rp = (source_rps.get(dest_sig.index) or
rpath.RORPath(dest_sig.index))
diff_rorp = src_rp.getRORPath()
@@ -139,29 +142,48 @@ class DestinationStruct:
"""
dest_iter = cls.get_dest_select(baserp, for_increment)
collated = rorpiter.Collate2Iters(source_iter, dest_iter)
- cls.CCPP = CacheCollatedPostProcess(collated,
- Globals.pipeline_max_length*2)
+ cls.CCPP = CacheCollatedPostProcess(
+ collated, Globals.pipeline_max_length*4)
+ # pipeline len adds some leeway over just *3 (to and from and back)
def get_sigs(cls, dest_base_rpath):
- """Yield signatures of any changed destination files"""
+ """Yield signatures of any changed destination files
+
+ If we are backing up across a pipe, we must flush the pipeline
+ every so often so it doesn't get congested on destination end.
+
+ """
+ flush_threshold = int(Globals.pipeline_max_length/2)
+ num_rorps_skipped = 0
for src_rorp, dest_rorp in cls.CCPP:
if (src_rorp and dest_rorp and src_rorp == dest_rorp and
(not Globals.preserve_hardlinks or
- Hardlink.rorp_eq(src_rorp, dest_rorp))): continue
- index = src_rorp and src_rorp.index or dest_rorp.index
- cls.CCPP.flag_changed(index)
- if (Globals.preserve_hardlinks and
- Hardlink.islinked(src_rorp or dest_rorp)):
- dest_sig = rpath.RORPath(index)
- dest_sig.flaglinked(Hardlink.get_link_index(dest_sig))
- elif dest_rorp:
- dest_sig = dest_rorp.getRORPath()
- if dest_rorp.isreg():
- dest_rp = dest_base_rpath.new_index(index)
- assert dest_rp.isreg()
- dest_sig.setfile(Rdiff.get_signature(dest_rp))
- else: dest_sig = rpath.RORPath(index)
- yield dest_sig
+ Hardlink.rorp_eq(src_rorp, dest_rorp))):
+ num_rorps_skipped += 1
+ if (Globals.backup_reader is not Globals.backup_writer and
+ num_rorps_skipped > flush_threshold):
+ num_rorps_skipped = 0
+ yield iterfile.RORPIterFlushRepeat
+ else:
+ index = src_rorp and src_rorp.index or dest_rorp.index
+ cls.CCPP.flag_changed(index)
+ yield cls.get_one_sig(dest_base_rpath, index,
+ src_rorp, dest_rorp)
+
+ def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp):
+ """Return a signature given source and destination rorps"""
+ if (Globals.preserve_hardlinks and
+ Hardlink.islinked(src_rorp or dest_rorp)):
+ dest_sig = rpath.RORPath(index)
+ dest_sig.flaglinked(Hardlink.get_link_index(dest_sig))
+ elif dest_rorp:
+ dest_sig = dest_rorp.getRORPath()
+ if dest_rorp.isreg():
+ dest_rp = dest_base_rpath.new_index(index)
+ assert dest_rp.isreg()
+ dest_sig.setfile(Rdiff.get_signature(dest_rp))
+ else: dest_sig = rpath.RORPath(index)
+ return dest_sig
def patch(cls, dest_rpath, source_diffiter, start_index = ()):
"""Patch dest_rpath with an rorpiter of diffs"""
@@ -301,6 +323,9 @@ class CacheCollatedPostProcess:
def get_source_rorp(self, index):
"""Retrieve source_rorp with given index from cache"""
+ assert index >= self.cache_indicies[0], \
+ ("CCPP index out of order: %s %s" %
+ (repr(index), repr(self.cache_indicies[0])))
return self.cache_dict[index][0]
def get_mirror_rorp(self, index):
@@ -388,7 +413,7 @@ class PatchITRB(rorpiter.ITRBranch):
"""
if not new_rp.isreg(): return 1
cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index)
- if cached_rorp.equal_loose(new_rp): return 1
+ if cached_rorp and cached_rorp.equal_loose(new_rp): return 1
log.ErrorLog.write_if_open("UpdateError", diff_rorp, "Updated mirror "
"temp file %s does not match source" % (new_rp.path,))
return 0
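
The get_sigs() change above is the heart of the pipeline-overflow fix: when source and destination agree on a file, no signature or diff flows back, so on large remote backups with mostly unchanged files the sender's buffer can fill up unnoticed. The hunk counts skipped rorps and, once the count passes flush_threshold, yields RORPIterFlushRepeat so the buffering sender drains. A minimal stand-alone sketch of that counting pattern (plain Python; FLUSH_MARKER and sigs_with_flushing are hypothetical stand-ins, not rdiff-backup names):

    FLUSH_MARKER = object()        # stands in for iterfile.RORPIterFlushRepeat

    def sigs_with_flushing(pairs, flush_threshold=250, remote=True):
        """Yield a fake signature per changed pair, a flush marker otherwise.

        pairs is any iterable of (src, dest) items; src == dest means the
        file is unchanged and would otherwise add nothing to the pipe.
        """
        skipped = 0
        for src, dest in pairs:
            if src == dest:                      # unchanged: nothing flows back
                skipped += 1
                if remote and skipped > flush_threshold:
                    skipped = 0
                    yield FLUSH_MARKER           # ask the sending side to drain
            else:
                yield ("sig", dest if dest is not None else src)

    # 600 unchanged files with a threshold of 250 produce exactly two markers.
    markers = [m for m in sigs_with_flushing((i, i) for i in range(600))
               if m is FLUSH_MARKER]
    assert len(markers) == 2
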
diff --git a/rdiff-backup/rdiff_backup/connection.py b/rdiff-backup/rdiff_backup/connection.py
index c1d2f70..05aef20 100644
--- a/rdiff-backup/rdiff_backup/connection.py
+++ b/rdiff-backup/rdiff_backup/connection.py
@@ -144,7 +144,8 @@ class LowLevelPipeConnection(Connection):
def _putiter(self, iterator, req_num):
"""Put an iterator through the pipe"""
- self._write("i", str(VirtualFile.new(rorpiter.ToFile(iterator))),
+ self._write("i",
+ str(VirtualFile.new(iterfile.RORPIterToFile(iterator))),
req_num)
def _putrpath(self, rpath, req_num):
@@ -226,8 +227,7 @@ class LowLevelPipeConnection(Connection):
elif format_string == "b": result = data
elif format_string == "f": result = VirtualFile(self, int(data))
elif format_string == "i":
- result = rorpiter.FromFile(iterfile.BufferedRead(
- VirtualFile(self, int(data))))
+ result = iterfile.FileToRORPIter(VirtualFile(self, int(data)))
elif format_string == "r": result = self._getrorpath(data)
elif format_string == "R": result = self._getrpath(data)
else:
@@ -456,7 +456,8 @@ class VirtualFile:
getbyid = classmethod(getbyid)
def readfromid(cls, id, length):
- return cls.vfiles[id].read(length)
+ if length is None: return cls.vfiles[id].read()
+ else: return cls.vfiles[id].read(length)
readfromid = classmethod(readfromid)
def readlinefromid(cls, id):
@@ -487,7 +488,7 @@ class VirtualFile:
self.connection = connection
self.id = id
- def read(self, length = -1):
+ def read(self, length = None):
return self.connection.VirtualFile.readfromid(self.id, length)
def readline(self):
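
The connection.py hunks change what a length-less read() means: VirtualFile.read(None) now asks the remote side for whatever its RORPIterToFile buffer currently holds (possibly an empty string) instead of a fixed-size chunk, and readfromid() dispatches on None accordingly. A small local sketch of that dispatch, with Bufferish as a hypothetical stand-in for the object behind VirtualFile.readfromid():

    class Bufferish:
        """Hypothetical stand-in for the server-side file behind VirtualFile."""
        def __init__(self, chunks):
            self.chunks = list(chunks)
        def read(self, length=None):
            if length is None:                # drain one buffered chunk, maybe b""
                return self.chunks.pop(0) if self.chunks else b""
            data = b"".join(self.chunks)      # ordinary sized read
            self.chunks = [data[length:]] if data[length:] else []
            return data[:length]

    def readfromid_like(vfile, length):
        # Mirrors the dispatch VirtualFile.readfromid now performs.
        if length is None:
            return vfile.read()
        return vfile.read(length)

    f = Bufferish([b"abc", b"def"])
    assert readfromid_like(f, None) == b"abc"   # whatever is buffered right now
    assert readfromid_like(f, 2) == b"de"       # classic file semantics
    assert readfromid_like(f, None) == b"f"     # remainder is still available
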
diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py
index 9d52d2f..0ae998e 100644
--- a/rdiff-backup/rdiff_backup/iterfile.py
+++ b/rdiff-backup/rdiff_backup/iterfile.py
@@ -19,8 +19,8 @@
"""Convert an iterator to a file object and vice-versa"""
-import cPickle, array
-import Globals, C, robust, log
+import cPickle, array, types
+import Globals, C, robust, log, rpath
class IterFileException(Exception): pass
@@ -129,7 +129,7 @@ class IterVirtualFile(UnwrapFile):
def addtobuffer(self):
"""Read a chunk from the file and add it to the buffer"""
assert self.iwf.currently_in_file
- type, data = self._get()
+ type, data = self.iwf._get()
if type == "e":
self.iwf.currently_in_file = None
raise data
@@ -238,32 +238,183 @@ class FileWrappingIter:
def close(self): self.closed = 1
-class BufferedRead:
- """Buffer the .read() calls to the given file
+class RORPIterFlush:
+ """Used to signal that a RORPIterToFile should flush buffer"""
+ pass
+
+class RORPIterFlushRepeat(RORPIterFlush):
+ """Flush, but then cause RORPIter to yield this same object
- This is used to lessen overhead and latency when a file is sent
- over a connection. Profiling said that arrays were faster than
- strings here.
+ Thus if we put together a pipeline of these, one RORPIterFlushRepeat
+ can cause all the segments to flush in sequence.
"""
- def __init__(self, file):
- self.file = file
- self.array_buf = array.array('c')
- self.bufsize = Globals.conn_bufsize
+ pass
+
+class RORPIterToFile(FileWrappingIter):
+ """Take a RORPIter and give it a file-ish interface
+
+ This is how we send signatures and diffs across the line. As
+ sending each one separately via a read() call would result in a
+ lot of latency, the read()'s are buffered - a read() call with no
+ arguments will return a variable length string (possibly empty).
+
+ To flush the RORPIterToFile, have the iterator yield a
+ RORPIterFlush class.
+
+ """
+ def __init__(self, rpiter, max_buffer_bytes = None, max_buffer_rps = None):
+ """RORPIterToFile initializer
+
+ max_buffer_bytes is the maximum size of the buffer in bytes.
+ max_buffer_rps is the maximum size of the buffer in rorps.
- def read(self, l = -1):
- array_buf = self.array_buf
- if l < 0: # Read as much as possible
- result = array_buf.tostring() + self.file.read()
- del array_buf[:]
+ """
+ self.max_buffer_bytes = max_buffer_bytes or Globals.conn_bufsize
+ self.max_buffer_rps = max_buffer_rps or Globals.pipeline_max_length
+ self.rorps_in_buffer = 0
+ self.next_in_line = None
+ FileWrappingIter.__init__(self, rpiter)
+
+ def read(self, length = None):
+ """Return some number of bytes, including 0"""
+ assert not self.closed
+ if length is None:
+ while (len(self.array_buf) < self.max_buffer_bytes and
+ self.rorps_in_buffer < self.max_buffer_rps):
+ if not self.addtobuffer(): break
+
+ result = self.array_buf.tostring()
+ del self.array_buf[:]
+ self.rorps_in_buffer = 0
return result
+ else:
+ assert length >= 0
+ read_buffer = self.read()
+ while len(read_buffer) < length: read_buffer += self.read()
+ self.array_buf.fromstring(read_buffer[length:])
+ return read_buffer[:length]
- if len(array_buf) < l: # Try to make buffer at least as long as l
- array_buf.fromstring(self.file.read(max(self.bufsize, l)))
- result = array_buf[:l].tostring()
- del array_buf[:l]
- return result
+ def addtobuffer(self):
+ """Add some number of bytes to the buffer. Return false if done"""
+ if self.currently_in_file:
+ self.addfromfile("c")
+ if not self.currently_in_file: self.rorps_in_buffer += 1
+ else:
+ if self.next_in_line:
+ currentobj = self.next_in_line
+ self.next_in_line = 0
+ else:
+ try: currentobj = self.iter.next()
+ except StopIteration:
+ self.addfinal()
+ return None
- def close(self): return self.file.close()
+ if hasattr(currentobj, "read") and hasattr(currentobj, "close"):
+ self.currently_in_file = currentobj
+ self.addfromfile("f")
+ elif (type(currentobj) is types.ClassType and
+ issubclass(currentobj, iterfile.RORPIterFlush)):
+ if currentobj is iterfile.RORPIterFlushRepeat:
+ self.add_flush_repeater()
+ return None
+ else: self.addrorp(currentobj)
+ return 1
+
+ def add_flush_repeater(self):
+ """Add a RORPIterFlushRepeat object to the buffer"""
+ pickle = cPickle.dumps(iterfile.RORPIterFlushRepeat, 1)
+ self.array_buf.fromstring("o")
+ self.array_buf.fromstring(C.long2str(long(len(pickle))))
+ self.array_buf.fromstring(pickle)
+
+ def addrorp(self, rorp):
+ """Add a rorp to the buffer"""
+ if rorp.file:
+ pickle = cPickle.dumps((rorp.index, rorp.data, 1), 1)
+ self.next_in_line = rorp.file
+ else:
+ pickle = cPickle.dumps((rorp.index, rorp.data, 0), 1)
+ self.rorps_in_buffer += 1
+ self.array_buf.fromstring("o")
+ self.array_buf.fromstring(C.long2str(long(len(pickle))))
+ self.array_buf.fromstring(pickle)
+
+ def addfinal(self):
+ """Signal the end of the iterator to the other end"""
+ self.array_buf.fromstring("z")
+ self.array_buf.fromstring(C.long2str(0L))
+
+ def close(self): self.closed = 1
+class FileToRORPIter(IterWrappingFile):
+ """Take a RORPIterToFile and turn it back into a RORPIter"""
+ def __init__(self, file):
+ IterWrappingFile.__init__(self, file)
+ self.buf = ""
+
+ def __iter__(self): return self
+
+ def next(self):
+ """Return next object in iter, or raise StopIteration"""
+ if self.currently_in_file:
+ self.currently_in_file.close()
+ type = None
+ while not type: type, data = self._get()
+ if type == "z": raise StopIteration
+ elif type == "o":
+ if data is iterfile.RORPIterFlushRepeat: return data
+ else: return self.get_rorp(data)
+ else: raise IterFileException("Bad file type %s" % (type,))
+
+ def get_rorp(self, pickled_tuple):
+ """Return rorp that data represents"""
+ index, data_dict, num_files = pickled_tuple
+ rorp = rpath.RORPath(index, data_dict)
+ if num_files:
+ assert num_files == 1, "Only one file accepted right now"
+ rorp.setfile(self.get_file())
+ return rorp
+
+ def get_file(self):
+ """Read file object from file"""
+ type, data = self._get()
+ if type == "f":
+ file = IterVirtualFile(self, data)
+ if data: self.currently_in_file = file
+ else: self.currently_in_file = None
+ return file
+ assert type == "e", "Expected type e, got %s" % (type,)
+ assert isinstance(data, Exception)
+ return ErrorFile(data)
+
+ def _get(self):
+ """Return (type, data or object) pair
+
+ This is like UnwrapFile._get() but reads in variable length
+ blocks. Also type "z" is allowed, which means end of
+ iterator. An empty read() is not considered to mark the end
+ of remote iter.
+
+ """
+ if not self.buf: self.buf += self.file.read()
+ if not self.buf: return None, None
+
+ assert len(self.buf) >= 8, "Unexpected end of RORPIter file"
+ type, length = self.buf[0], C.str2long(self.buf[1:8])
+ data = self.buf[8:8+length]
+ self.buf = self.buf[8+length:]
+ if type == "o" or type == "e": return type, cPickle.loads(data)
+ else: return type, data
+
+
+class ErrorFile:
+ """File-like that just raises error (used by FileToRORPIter above)"""
+ def __init__(self, exc):
+ """Initialize new ErrorFile. exc is the exception to raise on read"""
+ self.exc = exc
+ def read(self, l=-1): raise self.exc
+ def close(self): return None
+
+import iterfile
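
The new RORPIterToFile/FileToRORPIter pair replaces rorpiter.ToFile/FromFile plus BufferedRead with a single framed stream: each record is a one-byte type code ("o" for a pickled (index, data, num_files) tuple, "f"/"c" for file data, "e" for an exception, "z" for end of iterator) followed by a 7-byte length produced by C.long2str and then the payload. A stand-alone sketch of that framing in modern Python, using struct in place of the C module (pack_record and unpack_records are illustrative helpers, not module functions):

    import pickle, struct

    def pack_record(type_code, payload):
        """Type byte + 7-byte big-endian length (like C.long2str) + payload."""
        assert len(type_code) == 1
        return type_code + struct.pack(">Q", len(payload))[1:] + payload

    def unpack_records(buf):
        """Yield (type_code, payload) pairs; stop after the 'z' end marker."""
        while buf:
            type_code = buf[0:1]
            (length,) = struct.unpack(">Q", b"\x00" + buf[1:8])
            payload, buf = buf[8:8 + length], buf[8 + length:]
            yield type_code, payload
            if type_code == b"z":
                return

    # One pickled rorp-like tuple, then the end-of-iterator record.
    stream = pack_record(b"o", pickle.dumps((("some", "index"), {"type": "reg"}, 0)))
    stream += pack_record(b"z", b"")
    records = list(unpack_records(stream))
    assert [t for t, _ in records] == [b"o", b"z"]
    assert pickle.loads(records[0][1])[2] == 0    # num_files field of the tuple
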
diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py
index fda06e5..7db4fea 100644
--- a/rdiff-backup/rdiff_backup/rorpiter.py
+++ b/rdiff-backup/rdiff_backup/rorpiter.py
@@ -33,48 +33,6 @@ import os, tempfile, UserList, types
import Globals, rpath, iterfile
-class RORPIterException(Exception): pass
-
-def ToRaw(rorp_iter):
- """Convert a rorp iterator to raw form"""
- for rorp in rorp_iter:
- if rorp.file:
- yield (rorp.index, rorp.data, 1)
- yield rorp.file
- else: yield (rorp.index, rorp.data, 0)
-
-def FromRaw(raw_iter):
- """Convert raw rorp iter back to standard form"""
- for index, data, num_files in raw_iter:
- rorp = rpath.RORPath(index, data)
- if num_files:
- assert num_files == 1, "Only one file accepted right now"
- rorp.setfile(get_next_file(raw_iter))
- yield rorp
-
-class ErrorFile:
- """Used by get_next_file below, file-like that just raises error"""
- def __init__(self, exc):
- """Initialize new ErrorFile. exc is the exception to raise on read"""
- self.exc = exc
- def read(self, l=-1): raise self.exc
- def close(self): return None
-
-def get_next_file(iter):
- """Return the next element of an iterator, raising error if none"""
- try: next = iter.next()
- except StopIteration: raise RORPIterException("Unexpected end to iter")
- if isinstance(next, Exception): return ErrorFile(next)
- return next
-
-def ToFile(rorp_iter):
- """Return file version of iterator"""
- return iterfile.FileWrappingIter(ToRaw(rorp_iter))
-
-def FromFile(fileobj):
- """Recover rorp iterator from file interface"""
- return FromRaw(iterfile.IterWrappingFile(fileobj))
-
def CollateIterators(*rorp_iters):
"""Collate RORPath iterators by index
diff --git a/rdiff-backup/testing/iterfiletest.py b/rdiff-backup/testing/iterfiletest.py
index c8c77c6..2c357ef 100644
--- a/rdiff-backup/testing/iterfiletest.py
+++ b/rdiff-backup/testing/iterfiletest.py
@@ -52,15 +52,98 @@ class testIterFile(unittest.TestCase):
assert new_iter.next() == "foo"
self.assertRaises(StopIteration, new_iter.next)
+
+class testRORPIters(unittest.TestCase):
+ """Test sending rorpiter back and forth"""
+ def setUp(self):
+ """Make testfiles/output directory and a few files"""
+ Myrm("testfiles/output")
+ self.outputrp = rpath.RPath(Globals.local_connection,
+ "testfiles/output")
+ self.regfile1 = self.outputrp.append("reg1")
+ self.regfile2 = self.outputrp.append("reg2")
+ self.regfile3 = self.outputrp.append("reg3")
+
+ self.outputrp.mkdir()
+
+ fp = self.regfile1.open("wb")
+ fp.write("hello")
+ fp.close()
+ self.regfile1.setfile(self.regfile1.open("rb"))
+
+ self.regfile2.touch()
+ self.regfile2.setfile(self.regfile2.open("rb"))
+
+ fp = self.regfile3.open("wb")
+ fp.write("goodbye")
+ fp.close()
+ self.regfile3.setfile(self.regfile3.open("rb"))
+
+ self.regfile1.setdata()
+ self.regfile2.setdata()
+ self.regfile3.setdata()
-class testBufferedRead(unittest.TestCase):
- def testBuffering(self):
- """Test buffering a StringIO"""
- fp = StringIO.StringIO("12345678"*10000)
- bfp = BufferedRead(fp)
- assert bfp.read(5) == "12345"
- assert bfp.read(4) == "6781"
- assert len(bfp.read(75000)) == 75000
+ def print_RORPIterFile(self, rpiter_file):
+ """Print the given rorpiter file"""
+ while 1:
+ buf = rpiter_file.read()
+ sys.stdout.write(buf)
+ if buf[0] == "z": break
+
+ def testBasic(self):
+ """Test basic conversion"""
+ l = [self.outputrp, self.regfile1, self.regfile2, self.regfile3]
+ i_out = FileToRORPIter(RORPIterToFile(iter(l)))
+
+ out1 = i_out.next()
+ assert out1 == self.outputrp
+
+ out2 = i_out.next()
+ assert out2 == self.regfile1
+ fp = out2.open("rb")
+ assert fp.read() == "hello"
+ assert not fp.close()
+
+ out3 = i_out.next()
+ assert out3 == self.regfile2
+ fp = out3.open("rb")
+ assert fp.read() == ""
+ assert not fp.close()
+
+ i_out.next()
+ self.assertRaises(StopIteration, i_out.next)
+
+ def testFlush(self):
+ """Test flushing property of RORPIterToFile"""
+ l = [self.outputrp, RORPIterFlush, self.outputrp]
+ filelike = RORPIterToFile(iter(l))
+ new_filelike = StringIO.StringIO((filelike.read() + "z" +
+ C.long2str(0L)))
+
+ i_out = FileToRORPIter(new_filelike)
+ assert i_out.next() == self.outputrp
+ self.assertRaises(StopIteration, i_out.next)
+
+ i_out2 = FileToRORPIter(filelike)
+ assert i_out2.next() == self.outputrp
+ self.assertRaises(StopIteration, i_out2.next)
+
+ def testFlushRepeat(self):
+ """Test flushing like above, but have Flush obj emerge from iter"""
+ l = [self.outputrp, RORPIterFlushRepeat, self.outputrp]
+ filelike = RORPIterToFile(iter(l))
+ new_filelike = StringIO.StringIO((filelike.read() + "z" +
+ C.long2str(0L)))
+
+ i_out = FileToRORPIter(new_filelike)
+ assert i_out.next() == self.outputrp
+ assert i_out.next() is RORPIterFlushRepeat
+ self.assertRaises(StopIteration, i_out.next)
+
+ i_out2 = FileToRORPIter(filelike)
+ assert i_out2.next() == self.outputrp
+ self.assertRaises(StopIteration, i_out2.next)
+
if __name__ == "__main__": unittest.main()