From 1d98cd8af3761cb473fe4ebbbdee60109a1b757a Mon Sep 17 00:00:00 2001 From: bescoto Date: Tue, 4 Mar 2003 21:10:38 +0000 Subject: Final checkin for 0.11.3 (fixed pipeline flushing problem) git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@292 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/CHANGELOG | 4 +- rdiff-backup/TODO | 5 - rdiff-backup/rdiff_backup/Globals.py | 2 +- rdiff-backup/rdiff_backup/Main.py | 3 +- rdiff-backup/rdiff_backup/backup.py | 67 +++++++---- rdiff-backup/rdiff_backup/connection.py | 11 +- rdiff-backup/rdiff_backup/iterfile.py | 197 ++++++++++++++++++++++++++++---- rdiff-backup/rdiff_backup/rorpiter.py | 42 ------- rdiff-backup/testing/iterfiletest.py | 99 ++++++++++++++-- 9 files changed, 323 insertions(+), 107 deletions(-) diff --git a/rdiff-backup/CHANGELOG b/rdiff-backup/CHANGELOG index 2c83469..6c6e9f7 100644 --- a/rdiff-backup/CHANGELOG +++ b/rdiff-backup/CHANGELOG @@ -1,4 +1,4 @@ -New in v0.11.3 (2003/04/01) +New in v0.11.3 (2003/03/04) --------------------------- Fixed a number of bugs reported by Olivier Mueller: @@ -13,6 +13,8 @@ Fixed a number of bugs reported by Olivier Mueller: --print-statistics option works again (before it would silently ignored). + Fixed cache pipeline overflow bug. This error could appear on + large remote backups when many files have not changed. New in v0.11.2 (2003/03/01) diff --git a/rdiff-backup/TODO b/rdiff-backup/TODO index 0f18b34..ca48674 100644 --- a/rdiff-backup/TODO +++ b/rdiff-backup/TODO @@ -1,8 +1,3 @@ ----------[ Short term (next version) ]------------------------- - -Fix --print-statistics - - ---------[ Medium term ]--------------------------------------- Look at Kent Borg's suggestion for restore options and digests. 
diff --git a/rdiff-backup/rdiff_backup/Globals.py b/rdiff-backup/rdiff_backup/Globals.py index b9045f7..359e351 100644 --- a/rdiff-backup/rdiff_backup/Globals.py +++ b/rdiff-backup/rdiff_backup/Globals.py @@ -40,7 +40,7 @@ conn_bufsize = 98304 # This is used in rorpiter.CacheIndexable. The number represents the # number of rpaths which may be stuck in buffers when moving over a # remote connection. -pipeline_max_length = int(conn_bufsize / 150)*2 +pipeline_max_length = 500 # True if script is running as a server server = None diff --git a/rdiff-backup/rdiff_backup/Main.py b/rdiff-backup/rdiff_backup/Main.py index 5f6cbbc..391e848 100644 --- a/rdiff-backup/rdiff_backup/Main.py +++ b/rdiff-backup/rdiff_backup/Main.py @@ -601,5 +601,6 @@ def checkdest_if_necessary(dest_rp): """ need_check = checkdest_need_check(dest_rp) if need_check == 1: - Log("Previous backup seems to have failed, checking now.", 2) + Log("Previous backup seems to have failed, regressing " + "destination now.", 2) dest_rp.conn.regress.Regress(dest_rp) diff --git a/rdiff-backup/rdiff_backup/backup.py b/rdiff-backup/rdiff_backup/backup.py index 0ee782c..0ff5ace 100644 --- a/rdiff-backup/rdiff_backup/backup.py +++ b/rdiff-backup/rdiff_backup/backup.py @@ -22,7 +22,7 @@ from __future__ import generators import errno import Globals, metadata, rorpiter, TempFile, Hardlink, robust, increment, \ - rpath, static, log, selection, Time, Rdiff, statistics + rpath, static, log, selection, Time, Rdiff, statistics, iterfile def Mirror(src_rpath, dest_rpath): """Turn dest_rpath into a copy of src_rpath""" @@ -65,7 +65,7 @@ class SourceStruct: sel = selection.Select(rpath) sel.ParseArgs(tuplelist, filelists) sel.set_iter() - cache_size = Globals.pipeline_max_length * 2 # 2 because to and from + cache_size = Globals.pipeline_max_length * 3 # to and from+leeway cls.source_select = rorpiter.CacheIndexable(sel, cache_size) def get_source_select(cls): @@ -96,6 +96,9 @@ class SourceStruct: 
diff_rorp.set_attached_filetype('snapshot') for dest_sig in dest_sigiter: + if dest_sig is iterfile.RORPIterFlushRepeat: + yield iterfile.RORPIterFlush # Flush buffer when get_sigs does + continue src_rp = (source_rps.get(dest_sig.index) or rpath.RORPath(dest_sig.index)) diff_rorp = src_rp.getRORPath() @@ -139,29 +142,48 @@ class DestinationStruct: """ dest_iter = cls.get_dest_select(baserp, for_increment) collated = rorpiter.Collate2Iters(source_iter, dest_iter) - cls.CCPP = CacheCollatedPostProcess(collated, - Globals.pipeline_max_length*2) + cls.CCPP = CacheCollatedPostProcess( + collated, Globals.pipeline_max_length*4) + # pipeline len adds some leeway over just*3 (to and from and back) def get_sigs(cls, dest_base_rpath): - """Yield signatures of any changed destination files""" + """Yield signatures of any changed destination files + + If we are backing up across a pipe, we must flush the pipeline + every so often so it doesn't get congested on destination end. + + """ + flush_threshold = int(Globals.pipeline_max_length/2) + num_rorps_skipped = 0 for src_rorp, dest_rorp in cls.CCPP: if (src_rorp and dest_rorp and src_rorp == dest_rorp and (not Globals.preserve_hardlinks or - Hardlink.rorp_eq(src_rorp, dest_rorp))): continue - index = src_rorp and src_rorp.index or dest_rorp.index - cls.CCPP.flag_changed(index) - if (Globals.preserve_hardlinks and - Hardlink.islinked(src_rorp or dest_rorp)): - dest_sig = rpath.RORPath(index) - dest_sig.flaglinked(Hardlink.get_link_index(dest_sig)) - elif dest_rorp: - dest_sig = dest_rorp.getRORPath() - if dest_rorp.isreg(): - dest_rp = dest_base_rpath.new_index(index) - assert dest_rp.isreg() - dest_sig.setfile(Rdiff.get_signature(dest_rp)) - else: dest_sig = rpath.RORPath(index) - yield dest_sig + Hardlink.rorp_eq(src_rorp, dest_rorp))): + num_rorps_skipped += 1 + if (Globals.backup_reader is not Globals.backup_writer and + num_rorps_skipped > flush_threshold): + num_rorps_skipped = 0 + yield iterfile.RORPIterFlushRepeat + 
else: + index = src_rorp and src_rorp.index or dest_rorp.index + cls.CCPP.flag_changed(index) + yield cls.get_one_sig(dest_base_rpath, index, + src_rorp, dest_rorp) + + def get_one_sig(cls, dest_base_rpath, index, src_rorp, dest_rorp): + """Return a signature given source and destination rorps""" + if (Globals.preserve_hardlinks and + Hardlink.islinked(src_rorp or dest_rorp)): + dest_sig = rpath.RORPath(index) + dest_sig.flaglinked(Hardlink.get_link_index(dest_sig)) + elif dest_rorp: + dest_sig = dest_rorp.getRORPath() + if dest_rorp.isreg(): + dest_rp = dest_base_rpath.new_index(index) + assert dest_rp.isreg() + dest_sig.setfile(Rdiff.get_signature(dest_rp)) + else: dest_sig = rpath.RORPath(index) + return dest_sig def patch(cls, dest_rpath, source_diffiter, start_index = ()): """Patch dest_rpath with an rorpiter of diffs""" @@ -301,6 +323,9 @@ class CacheCollatedPostProcess: def get_source_rorp(self, index): """Retrieve source_rorp with given index from cache""" + assert index >= self.cache_indicies[0], \ + ("CCPP index out of order: %s %s" % + (repr(index), repr(self.cache_indicies[0]))) return self.cache_dict[index][0] def get_mirror_rorp(self, index): @@ -388,7 +413,7 @@ class PatchITRB(rorpiter.ITRBranch): """ if not new_rp.isreg(): return 1 cached_rorp = self.CCPP.get_source_rorp(diff_rorp.index) - if cached_rorp.equal_loose(new_rp): return 1 + if cached_rorp and cached_rorp.equal_loose(new_rp): return 1 log.ErrorLog.write_if_open("UpdateError", diff_rorp, "Updated mirror " "temp file %s does not match source" % (new_rp.path,)) return 0 diff --git a/rdiff-backup/rdiff_backup/connection.py b/rdiff-backup/rdiff_backup/connection.py index c1d2f70..05aef20 100644 --- a/rdiff-backup/rdiff_backup/connection.py +++ b/rdiff-backup/rdiff_backup/connection.py @@ -144,7 +144,8 @@ class LowLevelPipeConnection(Connection): def _putiter(self, iterator, req_num): """Put an iterator through the pipe""" - self._write("i", str(VirtualFile.new(rorpiter.ToFile(iterator))), + 
self._write("i", + str(VirtualFile.new(iterfile.RORPIterToFile(iterator))), req_num) def _putrpath(self, rpath, req_num): @@ -226,8 +227,7 @@ class LowLevelPipeConnection(Connection): elif format_string == "b": result = data elif format_string == "f": result = VirtualFile(self, int(data)) elif format_string == "i": - result = rorpiter.FromFile(iterfile.BufferedRead( - VirtualFile(self, int(data)))) + result = iterfile.FileToRORPIter(VirtualFile(self, int(data))) elif format_string == "r": result = self._getrorpath(data) elif format_string == "R": result = self._getrpath(data) else: @@ -456,7 +456,8 @@ class VirtualFile: getbyid = classmethod(getbyid) def readfromid(cls, id, length): - return cls.vfiles[id].read(length) + if length is None: return cls.vfiles[id].read() + else: return cls.vfiles[id].read(length) readfromid = classmethod(readfromid) def readlinefromid(cls, id): @@ -487,7 +488,7 @@ class VirtualFile: self.connection = connection self.id = id - def read(self, length = -1): + def read(self, length = None): return self.connection.VirtualFile.readfromid(self.id, length) def readline(self): diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py index 9d52d2f..0ae998e 100644 --- a/rdiff-backup/rdiff_backup/iterfile.py +++ b/rdiff-backup/rdiff_backup/iterfile.py @@ -19,8 +19,8 @@ """Convert an iterator to a file object and vice-versa""" -import cPickle, array -import Globals, C, robust, log +import cPickle, array, types +import Globals, C, robust, log, rpath class IterFileException(Exception): pass @@ -129,7 +129,7 @@ class IterVirtualFile(UnwrapFile): def addtobuffer(self): """Read a chunk from the file and add it to the buffer""" assert self.iwf.currently_in_file - type, data = self._get() + type, data = self.iwf._get() if type == "e": self.iwf.currently_in_file = None raise data @@ -238,32 +238,183 @@ class FileWrappingIter: def close(self): self.closed = 1 -class BufferedRead: - """Buffer the .read() calls to the given 
file +class RORPIterFlush: + """Used to signal that a RORPIterToFile should flush buffer""" + pass + +class RORPIterFlushRepeat(RORPIterFlush): + """Flush, but then cause RORPIter to yield this same object - This is used to lessen overhead and latency when a file is sent - over a connection. Profiling said that arrays were faster than - strings here. + Thus if we put together a pipeline of these, one RORPIterFlushRepeat + can cause all the segments to flush in sequence. """ - def __init__(self, file): - self.file = file - self.array_buf = array.array('c') - self.bufsize = Globals.conn_bufsize + pass + +class RORPIterToFile(FileWrappingIter): + """Take a RORPIter and give it a file-ish interface + + This is how we send signatures and diffs across the line. As + sending each one separately via a read() call would result in a + lot of latency, the read()'s are buffered - a read() call with no + arguments will return a variable length string (possibly empty). + + To flush the RORPIterToFile, have the iterator yield a + RORPIterFlush class. + + """ + def __init__(self, rpiter, max_buffer_bytes = None, max_buffer_rps = None): + """RORPIterToFile initializer + + max_buffer_bytes is the maximum size of the buffer in bytes. + max_buffer_rps is the maximum size of the buffer in rorps. 
- def read(self, l = -1): - array_buf = self.array_buf - if l < 0: # Read as much as possible - result = array_buf.tostring() + self.file.read() - del array_buf[:] + """ + self.max_buffer_bytes = max_buffer_bytes or Globals.conn_bufsize + self.max_buffer_rps = max_buffer_rps or Globals.pipeline_max_length + self.rorps_in_buffer = 0 + self.next_in_line = None + FileWrappingIter.__init__(self, rpiter) + + def read(self, length = None): + """Return some number of bytes, including 0""" + assert not self.closed + if length is None: + while (len(self.array_buf) < self.max_buffer_bytes and + self.rorps_in_buffer < self.max_buffer_rps): + if not self.addtobuffer(): break + + result = self.array_buf.tostring() + del self.array_buf[:] + self.rorps_in_buffer = 0 return result + else: + assert length >= 0 + read_buffer = self.read() + while len(read_buffer) < length: read_buffer += self.read() + self.array_buf.fromstring(read_buffer[length:]) + return read_buffer[length:] - if len(array_buf) < l: # Try to make buffer at least as long as l - array_buf.fromstring(self.file.read(max(self.bufsize, l))) - result = array_buf[:l].tostring() - del array_buf[:l] - return result + def addtobuffer(self): + """Add some number of bytes to the buffer. 
Return false if done""" + if self.currently_in_file: + self.addfromfile("c") + if not self.currently_in_file: self.rorps_in_buffer += 1 + else: + if self.next_in_line: + currentobj = self.next_in_line + self.next_in_line = 0 + else: + try: currentobj = self.iter.next() + except StopIteration: + self.addfinal() + return None - def close(self): return self.file.close() + if hasattr(currentobj, "read") and hasattr(currentobj, "close"): + self.currently_in_file = currentobj + self.addfromfile("f") + elif (type(currentobj) is types.ClassType and + issubclass(currentobj, iterfile.RORPIterFlush)): + if currentobj is iterfile.RORPIterFlushRepeat: + self.add_flush_repeater() + return None + else: self.addrorp(currentobj) + return 1 + + def add_flush_repeater(self): + """Add a RORPIterFlushRepeat object to the buffer""" + pickle = cPickle.dumps(iterfile.RORPIterFlushRepeat, 1) + self.array_buf.fromstring("o") + self.array_buf.fromstring(C.long2str(long(len(pickle)))) + self.array_buf.fromstring(pickle) + + def addrorp(self, rorp): + """Add a rorp to the buffer""" + if rorp.file: + pickle = cPickle.dumps((rorp.index, rorp.data, 1), 1) + self.next_in_line = rorp.file + else: + pickle = cPickle.dumps((rorp.index, rorp.data, 0), 1) + self.rorps_in_buffer += 1 + self.array_buf.fromstring("o") + self.array_buf.fromstring(C.long2str(long(len(pickle)))) + self.array_buf.fromstring(pickle) + + def addfinal(self): + """Signal the end of the iterator to the other end""" + self.array_buf.fromstring("z") + self.array_buf.fromstring(C.long2str(0L)) + + def close(self): self.closed = 1 +class FileToRORPIter(IterWrappingFile): + """Take a RORPIterToFile and turn it back into a RORPIter""" + def __init__(self, file): + IterWrappingFile.__init__(self, file) + self.buf = "" + + def __iter__(self): return self + + def next(self): + """Return next object in iter, or raise StopIteration""" + if self.currently_in_file: + self.currently_in_file.close() + type = None + while not type: type, data = 
self._get() + if type == "z": raise StopIteration + elif type == "o": + if data is iterfile.RORPIterFlushRepeat: return data + else: return self.get_rorp(data) + else: raise IterFileException("Bad file type %s" % (type,)) + + def get_rorp(self, pickled_tuple): + """Return rorp that data represents""" + index, data_dict, num_files = pickled_tuple + rorp = rpath.RORPath(index, data_dict) + if num_files: + assert num_files == 1, "Only one file accepted right now" + rorp.setfile(self.get_file()) + return rorp + + def get_file(self): + """Read file object from file""" + type, data = self._get() + if type == "f": + file = IterVirtualFile(self, data) + if data: self.currently_in_file = file + else: self.currently_in_file = None + return file + assert type == "e", "Expected type e, got %s" % (type,) + assert isinstance(data, Exception) + return ErrorFile(data) + + def _get(self): + """Return (type, data or object) pair + + This is like UnwrapFile._get() but reads in variable length + blocks. Also type "z" is allowed, which means end of + iterator. An empty read() is not considered to mark the end + of remote iter. + + """ + if not self.buf: self.buf += self.file.read() + if not self.buf: return None, None + + assert len(self.buf) >= 8, "Unexpected end of RORPIter file" + type, length = self.buf[0], C.str2long(self.buf[1:8]) + data = self.buf[8:8+length] + self.buf = self.buf[8+length:] + if type == "o" or type == "e": return type, cPickle.loads(data) + else: return type, data + + +class ErrorFile: + """File-like that just raises error (used by FileToRORPIter above)""" + def __init__(self, exc): + """Initialize new ErrorFile. 
exc is the exception to raise on read""" + self.exc = exc + def read(self, l=-1): raise self.exc + def close(self): return None + +import iterfile diff --git a/rdiff-backup/rdiff_backup/rorpiter.py b/rdiff-backup/rdiff_backup/rorpiter.py index fda06e5..7db4fea 100644 --- a/rdiff-backup/rdiff_backup/rorpiter.py +++ b/rdiff-backup/rdiff_backup/rorpiter.py @@ -33,48 +33,6 @@ import os, tempfile, UserList, types import Globals, rpath, iterfile -class RORPIterException(Exception): pass - -def ToRaw(rorp_iter): - """Convert a rorp iterator to raw form""" - for rorp in rorp_iter: - if rorp.file: - yield (rorp.index, rorp.data, 1) - yield rorp.file - else: yield (rorp.index, rorp.data, 0) - -def FromRaw(raw_iter): - """Convert raw rorp iter back to standard form""" - for index, data, num_files in raw_iter: - rorp = rpath.RORPath(index, data) - if num_files: - assert num_files == 1, "Only one file accepted right now" - rorp.setfile(get_next_file(raw_iter)) - yield rorp - -class ErrorFile: - """Used by get_next_file below, file-like that just raises error""" - def __init__(self, exc): - """Initialize new ErrorFile. 
exc is the exception to raise on read""" - self.exc = exc - def read(self, l=-1): raise self.exc - def close(self): return None - -def get_next_file(iter): - """Return the next element of an iterator, raising error if none""" - try: next = iter.next() - except StopIteration: raise RORPIterException("Unexpected end to iter") - if isinstance(next, Exception): return ErrorFile(next) - return next - -def ToFile(rorp_iter): - """Return file version of iterator""" - return iterfile.FileWrappingIter(ToRaw(rorp_iter)) - -def FromFile(fileobj): - """Recover rorp iterator from file interface""" - return FromRaw(iterfile.IterWrappingFile(fileobj)) - def CollateIterators(*rorp_iters): """Collate RORPath iterators by index diff --git a/rdiff-backup/testing/iterfiletest.py b/rdiff-backup/testing/iterfiletest.py index c8c77c6..2c357ef 100644 --- a/rdiff-backup/testing/iterfiletest.py +++ b/rdiff-backup/testing/iterfiletest.py @@ -52,15 +52,98 @@ class testIterFile(unittest.TestCase): assert new_iter.next() == "foo" self.assertRaises(StopIteration, new_iter.next) + +class testRORPIters(unittest.TestCase): + """Test sending rorpiter back and forth""" + def setUp(self): + """Make testfiles/output directory and a few files""" + Myrm("testfiles/output") + self.outputrp = rpath.RPath(Globals.local_connection, + "testfiles/output") + self.regfile1 = self.outputrp.append("reg1") + self.regfile2 = self.outputrp.append("reg2") + self.regfile3 = self.outputrp.append("reg3") + + self.outputrp.mkdir() + + fp = self.regfile1.open("wb") + fp.write("hello") + fp.close() + self.regfile1.setfile(self.regfile1.open("rb")) + + self.regfile2.touch() + self.regfile2.setfile(self.regfile2.open("rb")) + + fp = self.regfile3.open("wb") + fp.write("goodbye") + fp.close() + self.regfile3.setfile(self.regfile3.open("rb")) + + self.regfile1.setdata() + self.regfile2.setdata() + self.regfile3.setdata() -class testBufferedRead(unittest.TestCase): - def testBuffering(self): - """Test buffering a StringIO""" - 
fp = StringIO.StringIO("12345678"*10000) - bfp = BufferedRead(fp) - assert bfp.read(5) == "12345" - assert bfp.read(4) == "6781" - assert len(bfp.read(75000)) == 75000 + def print_RORPIterFile(self, rpiter_file): + """Print the given rorpiter file""" + while 1: + buf = rpiter_file.read() + sys.stdout.write(buf) + if buf[0] == "z": break + + def testBasic(self): + """Test basic conversion""" + l = [self.outputrp, self.regfile1, self.regfile2, self.regfile3] + i_out = FileToRORPIter(RORPIterToFile(iter(l))) + + out1 = i_out.next() + assert out1 == self.outputrp + + out2 = i_out.next() + assert out2 == self.regfile1 + fp = out2.open("rb") + assert fp.read() == "hello" + assert not fp.close() + + out3 = i_out.next() + assert out3 == self.regfile2 + fp = out3.open("rb") + assert fp.read() == "" + assert not fp.close() + + i_out.next() + self.assertRaises(StopIteration, i_out.next) + + def testFlush(self): + """Test flushing property of RORPIterToFile""" + l = [self.outputrp, RORPIterFlush, self.outputrp] + filelike = RORPIterToFile(iter(l)) + new_filelike = StringIO.StringIO((filelike.read() + "z" + + C.long2str(0L))) + + i_out = FileToRORPIter(new_filelike) + assert i_out.next() == self.outputrp + self.assertRaises(StopIteration, i_out.next) + + i_out2 = FileToRORPIter(filelike) + assert i_out2.next() == self.outputrp + self.assertRaises(StopIteration, i_out2.next) + + def testFlushRepeat(self): + """Test flushing like above, but have Flush obj emerge from iter""" + l = [self.outputrp, RORPIterFlushRepeat, self.outputrp] + filelike = RORPIterToFile(iter(l)) + new_filelike = StringIO.StringIO((filelike.read() + "z" + + C.long2str(0L))) + + i_out = FileToRORPIter(new_filelike) + assert i_out.next() == self.outputrp + assert i_out.next() is RORPIterFlushRepeat + self.assertRaises(StopIteration, i_out.next) + + i_out2 = FileToRORPIter(filelike) + assert i_out2.next() == self.outputrp + self.assertRaises(StopIteration, i_out2.next) + if __name__ == "__main__": 
unittest.main() -- cgit v1.2.1