summary refs log tree commit diff
path: root/rdiff-backup/rdiff_backup/iterfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'rdiff-backup/rdiff_backup/iterfile.py')
-rw-r--r-- rdiff-backup/rdiff_backup/iterfile.py 197
1 files changed, 174 insertions, 23 deletions
diff --git a/rdiff-backup/rdiff_backup/iterfile.py b/rdiff-backup/rdiff_backup/iterfile.py
index 9d52d2f..0ae998e 100644
--- a/rdiff-backup/rdiff_backup/iterfile.py
+++ b/rdiff-backup/rdiff_backup/iterfile.py
@@ -19,8 +19,8 @@
"""Convert an iterator to a file object and vice-versa"""
-import cPickle, array
-import Globals, C, robust, log
+import cPickle, array, types
+import Globals, C, robust, log, rpath
class IterFileException(Exception): pass
@@ -129,7 +129,7 @@ class IterVirtualFile(UnwrapFile):
def addtobuffer(self):
"""Read a chunk from the file and add it to the buffer"""
assert self.iwf.currently_in_file
- type, data = self._get()
+ type, data = self.iwf._get()
if type == "e":
self.iwf.currently_in_file = None
raise data
@@ -238,32 +238,183 @@ class FileWrappingIter:
def close(self): self.closed = 1
-class BufferedRead:
- """Buffer the .read() calls to the given file
+class RORPIterFlush:
+ """Used to signal that a RORPIterToFile should flush buffer"""
+ pass
+
+class RORPIterFlushRepeat(RORPIterFlush):
+ """Flush, but then cause RORPIter to yield this same object
- This is used to lessen overhead and latency when a file is sent
- over a connection. Profiling said that arrays were faster than
- strings here.
+ Thus if we put together a pipeline of these, one RORPIterFlushRepeat
+ can cause all the segments to flush in sequence.
"""
- def __init__(self, file):
- self.file = file
- self.array_buf = array.array('c')
- self.bufsize = Globals.conn_bufsize
+ pass
+
+class RORPIterToFile(FileWrappingIter):
+ """Take a RORPIter and give it a file-ish interface
+
+ This is how we send signatures and diffs across the line. As
+ sending each one separately via a read() call would result in a
+ lot of latency, the read()'s are buffered - a read() call with no
+ arguments will return a variable length string (possibly empty).
+
+ To flush the RORPIterToFile, have the iterator yield a
+ RORPIterFlush class.
+
+ """
+ def __init__(self, rpiter, max_buffer_bytes = None, max_buffer_rps = None):
+ """RORPIterToFile initializer
+
+ max_buffer_bytes is the maximum size of the buffer in bytes.
+ max_buffer_rps is the maximum size of the buffer in rorps.
- def read(self, l = -1):
- array_buf = self.array_buf
- if l < 0: # Read as much as possible
- result = array_buf.tostring() + self.file.read()
- del array_buf[:]
+ """
+ self.max_buffer_bytes = max_buffer_bytes or Globals.conn_bufsize
+ self.max_buffer_rps = max_buffer_rps or Globals.pipeline_max_length
+ self.rorps_in_buffer = 0
+ self.next_in_line = None
+ FileWrappingIter.__init__(self, rpiter)
+
+    def read(self, length = None):
+        """Return buffered bytes (maybe none), or exactly length bytes if given"""
+        assert not self.closed
+        if length is None:
+            while (len(self.array_buf) < self.max_buffer_bytes and
+                   self.rorps_in_buffer < self.max_buffer_rps):
+                if not self.addtobuffer(): break  # flush requested or iter done
+
+            result = self.array_buf.tostring()
+            del self.array_buf[:]
+            self.rorps_in_buffer = 0
             return result
+        else:
+            assert length >= 0
+            read_buffer = self.read()
+            while len(read_buffer) < length: read_buffer += self.read()
+            self.array_buf.fromstring(read_buffer[length:])  # keep surplus for next call
+            return read_buffer[:length]  # was [length:], which returned the surplus instead
- if len(array_buf) < l: # Try to make buffer at least as long as l
- array_buf.fromstring(self.file.read(max(self.bufsize, l)))
- result = array_buf[:l].tostring()
- del array_buf[:l]
- return result
+ def addtobuffer(self):
+ """Add some number of bytes to the buffer. Return false if done"""
+ if self.currently_in_file:
+ self.addfromfile("c")
+ if not self.currently_in_file: self.rorps_in_buffer += 1
+ else:
+ if self.next_in_line:
+ currentobj = self.next_in_line
+ self.next_in_line = 0
+ else:
+ try: currentobj = self.iter.next()
+ except StopIteration:
+ self.addfinal()
+ return None
- def close(self): return self.file.close()
+ if hasattr(currentobj, "read") and hasattr(currentobj, "close"):
+ self.currently_in_file = currentobj
+ self.addfromfile("f")
+ elif (type(currentobj) is types.ClassType and
+ issubclass(currentobj, iterfile.RORPIterFlush)):
+ if currentobj is iterfile.RORPIterFlushRepeat:
+ self.add_flush_repeater()
+ return None
+ else: self.addrorp(currentobj)
+ return 1
+
+ def add_flush_repeater(self):
+ """Add a RORPIterFlushRepeat object to the buffer"""
+ pickle = cPickle.dumps(iterfile.RORPIterFlushRepeat, 1)
+ self.array_buf.fromstring("o")
+ self.array_buf.fromstring(C.long2str(long(len(pickle))))
+ self.array_buf.fromstring(pickle)
+
+ def addrorp(self, rorp):
+ """Add a rorp to the buffer"""
+ if rorp.file:
+ pickle = cPickle.dumps((rorp.index, rorp.data, 1), 1)
+ self.next_in_line = rorp.file
+ else:
+ pickle = cPickle.dumps((rorp.index, rorp.data, 0), 1)
+ self.rorps_in_buffer += 1
+ self.array_buf.fromstring("o")
+ self.array_buf.fromstring(C.long2str(long(len(pickle))))
+ self.array_buf.fromstring(pickle)
+
+ def addfinal(self):
+ """Signal the end of the iterator to the other end"""
+ self.array_buf.fromstring("z")
+ self.array_buf.fromstring(C.long2str(0L))
+
+ def close(self): self.closed = 1
+class FileToRORPIter(IterWrappingFile):
+ """Take a RORPIterToFile and turn it back into a RORPIter"""
+ def __init__(self, file):
+ IterWrappingFile.__init__(self, file)
+ self.buf = ""
+
+ def __iter__(self): return self
+
+ def next(self):
+ """Return next object in iter, or raise StopIteration"""
+ if self.currently_in_file:
+ self.currently_in_file.close()
+ type = None
+ while not type: type, data = self._get()
+ if type == "z": raise StopIteration
+ elif type == "o":
+ if data is iterfile.RORPIterFlushRepeat: return data
+ else: return self.get_rorp(data)
+ else: raise IterFileException("Bad file type %s" % (type,))
+
+ def get_rorp(self, pickled_tuple):
+ """Return rorp that data represents"""
+ index, data_dict, num_files = pickled_tuple
+ rorp = rpath.RORPath(index, data_dict)
+ if num_files:
+ assert num_files == 1, "Only one file accepted right now"
+ rorp.setfile(self.get_file())
+ return rorp
+
+ def get_file(self):
+ """Read file object from file"""
+ type, data = self._get()
+ if type == "f":
+ file = IterVirtualFile(self, data)
+ if data: self.currently_in_file = file
+ else: self.currently_in_file = None
+ return file
+ assert type == "e", "Expected type e, got %s" % (type,)
+ assert isinstance(data, Exception)
+ return ErrorFile(data)
+
+ def _get(self):
+ """Return (type, data or object) pair
+
+ This is like UnwrapFile._get() but reads in variable length
+ blocks. Also type "z" is allowed, which means end of
+ iterator. An empty read() is not considered to mark the end
+ of remote iter.
+
+ """
+ if not self.buf: self.buf += self.file.read()
+ if not self.buf: return None, None
+
+ assert len(self.buf) >= 8, "Unexpected end of RORPIter file"
+ type, length = self.buf[0], C.str2long(self.buf[1:8])
+ data = self.buf[8:8+length]
+ self.buf = self.buf[8+length:]
+ if type == "o" or type == "e": return type, cPickle.loads(data)
+ else: return type, data
+
+
+class ErrorFile:
+ """File-like that just raises error (used by FileToRORPIter above)"""
+ def __init__(self, exc):
+ """Initialize new ErrorFile. exc is the exception to raise on read"""
+ self.exc = exc
+ def read(self, l=-1): raise self.exc
+ def close(self): return None
+
+import iterfile