diff options
Diffstat (limited to 'rdiff-backup/rdiff_backup/iterfile.py')
-rw-r--r-- | rdiff-backup/rdiff_backup/iterfile.py | 197 |
1 files changed, 174 insertions, 23 deletions
diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py index 9d52d2f..0ae998e 100644 --- a/rdiff-backup/rdiff_backup/iterfile.py +++ b/rdiff-backup/rdiff_backup/iterfile.py @@ -19,8 +19,8 @@ """Convert an iterator to a file object and vice-versa""" -import cPickle, array -import Globals, C, robust, log +import cPickle, array, types +import Globals, C, robust, log, rpath class IterFileException(Exception): pass @@ -129,7 +129,7 @@ class IterVirtualFile(UnwrapFile): def addtobuffer(self): """Read a chunk from the file and add it to the buffer""" assert self.iwf.currently_in_file - type, data = self._get() + type, data = self.iwf._get() if type == "e": self.iwf.currently_in_file = None raise data @@ -238,32 +238,183 @@ class FileWrappingIter: def close(self): self.closed = 1 -class BufferedRead: - """Buffer the .read() calls to the given file +class RORPIterFlush: + """Used to signal that a RORPIterToFile should flush buffer""" + pass + +class RORPIterFlushRepeat(RORPIterFlush): + """Flush, but then cause RORPIter to yield this same object - This is used to lessen overhead and latency when a file is sent - over a connection. Profiling said that arrays were faster than - strings here. + Thus if we put together a pipeline of these, one RORPIterContFlush + can cause all the segments to flush in sequence. """ - def __init__(self, file): - self.file = file - self.array_buf = array.array('c') - self.bufsize = Globals.conn_bufsize + pass + +class RORPIterToFile(FileWrappingIter): + """Take a RORPIter and give it a file-ish interface + + This is how we send signatures and diffs across the line. As + sending each one separately via a read() call would result in a + lot of latency, the read()'s are buffered - a read() call with no + arguments will return a variable length string (possibly empty). + + To flush the RORPIterToFile, have the iterator yield a + RORPIterFlush class. + + """ + def __init__(self, rpiter, max_buffer_bytes = None, max_buffer_rps = None): + """RORPIterToFile initializer + + max_buffer_bytes is the maximum size of the buffer in bytes. + max_buffer_rps is the maximum size of the buffer in rorps. - def read(self, l = -1): - array_buf = self.array_buf - if l < 0: # Read as much as possible - result = array_buf.tostring() + self.file.read() - del array_buf[:] + """ + self.max_buffer_bytes = max_buffer_bytes or Globals.conn_bufsize + self.max_buffer_rps = max_buffer_rps or Globals.pipeline_max_length + self.rorps_in_buffer = 0 + self.next_in_line = None + FileWrappingIter.__init__(self, rpiter) + + def read(self, length = None): + """Return some number of bytes, including 0""" + assert not self.closed + if length is None: + while (len(self.array_buf) < self.max_buffer_bytes and + self.rorps_in_buffer < self.max_buffer_rps): + if not self.addtobuffer(): break + + result = self.array_buf.tostring() + del self.array_buf[:] + self.rorps_in_buffer = 0 return result + else: + assert length >= 0 + read_buffer = self.read() + while len(read_buffer) < length: read_buffer += self.read() + self.array_buf.fromstring(read_buffer[length:]) + return read_buffer[length:] - if len(array_buf) < l: # Try to make buffer at least as long as l - array_buf.fromstring(self.file.read(max(self.bufsize, l))) - result = array_buf[:l].tostring() - del array_buf[:l] - return result + def addtobuffer(self): + """Add some number of bytes to the buffer. Return false if done""" + if self.currently_in_file: + self.addfromfile("c") + if not self.currently_in_file: self.rorps_in_buffer += 1 + else: + if self.next_in_line: + currentobj = self.next_in_line + self.next_in_line = 0 + else: + try: currentobj = self.iter.next() + except StopIteration: + self.addfinal() + return None - def close(self): return self.file.close() + if hasattr(currentobj, "read") and hasattr(currentobj, "close"): + self.currently_in_file = currentobj + self.addfromfile("f") + elif (type(currentobj) is types.ClassType and + issubclass(currentobj, iterfile.RORPIterFlush)): + if currentobj is iterfile.RORPIterFlushRepeat: + self.add_flush_repeater() + return None + else: self.addrorp(currentobj) + return 1 + + def add_flush_repeater(self): + """Add a RORPIterFlushRepeat object to the buffer""" + pickle = cPickle.dumps(iterfile.RORPIterFlushRepeat, 1) + self.array_buf.fromstring("o") + self.array_buf.fromstring(C.long2str(long(len(pickle)))) + self.array_buf.fromstring(pickle) + + def addrorp(self, rorp): + """Add a rorp to the buffer""" + if rorp.file: + pickle = cPickle.dumps((rorp.index, rorp.data, 1), 1) + self.next_in_line = rorp.file + else: + pickle = cPickle.dumps((rorp.index, rorp.data, 0), 1) + self.rorps_in_buffer += 1 + self.array_buf.fromstring("o") + self.array_buf.fromstring(C.long2str(long(len(pickle)))) + self.array_buf.fromstring(pickle) + + def addfinal(self): + """Signal the end of the iterator to the other end""" + self.array_buf.fromstring("z") + self.array_buf.fromstring(C.long2str(0L)) + + def close(self): self.closed = 1 +class FileToRORPIter(IterWrappingFile): + """Take a RORPIterToFile and turn it back into a RORPIter""" + def __init__(self, file): + IterWrappingFile.__init__(self, file) + self.buf = "" + + def __iter__(self): return self + + def next(self): + """Return next object in iter, or raise StopIteration""" + if self.currently_in_file: + self.currently_in_file.close() + type = None + while not type: type, data = self._get() + if type == "z": raise StopIteration + elif type == "o": + if data is iterfile.RORPIterFlushRepeat: return data + else: return self.get_rorp(data) + else: raise IterFileException("Bad file type %s" % (type,)) + + def get_rorp(self, pickled_tuple): + """Return rorp that data represents""" + index, data_dict, num_files = pickled_tuple + rorp = rpath.RORPath(index, data_dict) + if num_files: + assert num_files == 1, "Only one file accepted right now" + rorp.setfile(self.get_file()) + return rorp + + def get_file(self): + """Read file object from file""" + type, data = self._get() + if type == "f": + file = IterVirtualFile(self, data) + if data: self.currently_in_file = file + else: self.currently_in_file = None + return file + assert type == "e", "Expected type e, got %s" % (type,) + assert isinstance(data, Exception) + return ErrorFile(data) + + def _get(self): + """Return (type, data or object) pair + + This is like UnwrapFile._get() but reads in variable length + blocks. Also type "z" is allowed, which means end of + iterator. An empty read() is not considered to mark the end + of remote iter. + + """ + if not self.buf: self.buf += self.file.read() + if not self.buf: return None, None + + assert len(self.buf) >= 8, "Unexpected end of RORPIter file" + type, length = self.buf[0], C.str2long(self.buf[1:8]) + data = self.buf[8:8+length] + self.buf = self.buf[8+length:] + if type == "o" or type == "e": return type, cPickle.loads(data) + else: return type, data + + +class ErrorFile: + """File-like that just raises error (used by FileToRORPIter above)""" + def __init__(self, exc): + """Initialize new ErrorFile. exc is the exception to raise on read""" + self.exc = exc + def read(self, l=-1): raise self.exc + def close(self): return None + +import iterfile |