diff options
Diffstat (limited to 'bzrlib/pack.py')
-rw-r--r-- | bzrlib/pack.py | 537 |
1 files changed, 537 insertions, 0 deletions
diff --git a/bzrlib/pack.py b/bzrlib/pack.py new file mode 100644 index 0000000..adadb8f --- /dev/null +++ b/bzrlib/pack.py @@ -0,0 +1,537 @@ +# Copyright (C) 2007, 2009, 2010 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Container format for Bazaar data. + +"Containers" and "records" are described in +doc/developers/container-format.txt. +""" + +from __future__ import absolute_import + +from cStringIO import StringIO +import re + +from bzrlib import errors + + +FORMAT_ONE = "Bazaar pack format 1 (introduced in 0.18)" + + +_whitespace_re = re.compile('[\t\n\x0b\x0c\r ]') + + +def _check_name(name): + """Do some basic checking of 'name'. + + At the moment, this just checks that there are no whitespace characters in a + name. + + :raises InvalidRecordError: if name is not valid. + :seealso: _check_name_encoding + """ + if _whitespace_re.search(name) is not None: + raise errors.InvalidRecordError("%r is not a valid name." % (name,)) + + +def _check_name_encoding(name): + """Check that 'name' is valid UTF-8. + + This is separate from _check_name because UTF-8 decoding is relatively + expensive, and we usually want to avoid it. + + :raises InvalidRecordError: if name is not valid UTF-8. + """ + try: + name.decode('utf-8') + except UnicodeDecodeError, e: + raise errors.InvalidRecordError(str(e)) + + +class ContainerSerialiser(object): + """A helper class for serialising containers. + + It simply returns bytes from method calls to 'begin', 'end' and + 'bytes_record'. You may find ContainerWriter to be a more convenient + interface. + """ + + def begin(self): + """Return the bytes to begin a container.""" + return FORMAT_ONE + "\n" + + def end(self): + """Return the bytes to finish a container.""" + return "E" + + def bytes_header(self, length, names): + """Return the header for a Bytes record.""" + # Kind marker + byte_sections = ["B"] + # Length + byte_sections.append(str(length) + "\n") + # Names + for name_tuple in names: + # Make sure we're writing valid names. Note that we will leave a + # half-written record if a name is bad! + for name in name_tuple: + _check_name(name) + byte_sections.append('\x00'.join(name_tuple) + "\n") + # End of headers + byte_sections.append("\n") + return ''.join(byte_sections) + + def bytes_record(self, bytes, names): + """Return the bytes for a Bytes record with the given name and + contents. + + If the content may be large, construct the header separately and then + stream out the contents. + """ + return self.bytes_header(len(bytes), names) + bytes + + +class ContainerWriter(object): + """A class for writing containers to a file. + + :attribute records_written: The number of user records added to the + container. This does not count the prelude or suffix of the container + introduced by the begin() and end() methods. + """ + + # Join up headers with the body if writing fewer than this many bytes: + # trades off memory usage and copying to do less IO ops. + _JOIN_WRITES_THRESHOLD = 100000 + + def __init__(self, write_func): + """Constructor. + + :param write_func: a callable that will be called when this + ContainerWriter needs to write some bytes. + """ + self._write_func = write_func + self.current_offset = 0 + self.records_written = 0 + self._serialiser = ContainerSerialiser() + + def begin(self): + """Begin writing a container.""" + self.write_func(self._serialiser.begin()) + + def write_func(self, bytes): + self._write_func(bytes) + self.current_offset += len(bytes) + + def end(self): + """Finish writing a container.""" + self.write_func(self._serialiser.end()) + + def add_bytes_record(self, bytes, names): + """Add a Bytes record with the given names. + + :param bytes: The bytes to insert. + :param names: The names to give the inserted bytes. Each name is + a tuple of bytestrings. The bytestrings may not contain + whitespace. + :return: An offset, length tuple. The offset is the offset + of the record within the container, and the length is the + length of data that will need to be read to reconstitute the + record. These offset and length can only be used with the pack + interface - they might be offset by headers or other such details + and thus are only suitable for use by a ContainerReader. + """ + current_offset = self.current_offset + length = len(bytes) + if length < self._JOIN_WRITES_THRESHOLD: + self.write_func(self._serialiser.bytes_header(length, names) + + bytes) + else: + self.write_func(self._serialiser.bytes_header(length, names)) + self.write_func(bytes) + self.records_written += 1 + # return a memo of where we wrote data to allow random access. + return current_offset, self.current_offset - current_offset + + +class ReadVFile(object): + """Adapt a readv result iterator to a file like protocol. + + The readv result must support the iterator protocol returning (offset, + data_bytes) pairs. + """ + + # XXX: This could be a generic transport class, as other code may want to + # gradually consume the readv result. + + def __init__(self, readv_result): + """Construct a new ReadVFile wrapper. + + :seealso: make_readv_reader + + :param readv_result: the most recent readv result - list or generator + """ + # readv can return a sequence or an iterator, but we require an + # iterator to know how much has been consumed. + readv_result = iter(readv_result) + self.readv_result = readv_result + self._string = None + + def _next(self): + if (self._string is None or + self._string.tell() == self._string_length): + offset, data = self.readv_result.next() + self._string_length = len(data) + self._string = StringIO(data) + + def read(self, length): + self._next() + result = self._string.read(length) + if len(result) < length: + raise errors.BzrError('wanted %d bytes but next ' + 'hunk only contains %d: %r...' % + (length, len(result), result[:20])) + return result + + def readline(self): + """Note that readline will not cross readv segments.""" + self._next() + result = self._string.readline() + if self._string.tell() == self._string_length and result[-1] != '\n': + raise errors.BzrError('short readline in the readvfile hunk: %r' + % (result, )) + return result + + +def make_readv_reader(transport, filename, requested_records): + """Create a ContainerReader that will read selected records only. + + :param transport: The transport the pack file is located on. + :param filename: The filename of the pack file. + :param requested_records: The record offset, length tuples as returned + by add_bytes_record for the desired records. + """ + readv_blocks = [(0, len(FORMAT_ONE)+1)] + readv_blocks.extend(requested_records) + result = ContainerReader(ReadVFile( + transport.readv(filename, readv_blocks))) + return result + + +class BaseReader(object): + + def __init__(self, source_file): + """Constructor. + + :param source_file: a file-like object with `read` and `readline` + methods. + """ + self._source = source_file + + def reader_func(self, length=None): + return self._source.read(length) + + def _read_line(self): + line = self._source.readline() + if not line.endswith('\n'): + raise errors.UnexpectedEndOfContainerError() + return line.rstrip('\n') + + +class ContainerReader(BaseReader): + """A class for reading Bazaar's container format.""" + + def iter_records(self): + """Iterate over the container, yielding each record as it is read. + + Each yielded record will be a 2-tuple of (names, callable), where names + is a ``list`` and bytes is a function that takes one argument, + ``max_length``. + + You **must not** call the callable after advancing the iterator to the + next record. That is, this code is invalid:: + + record_iter = container.iter_records() + names1, callable1 = record_iter.next() + names2, callable2 = record_iter.next() + bytes1 = callable1(None) + + As it will give incorrect results and invalidate the state of the + ContainerReader. + + :raises ContainerError: if any sort of container corruption is + detected, e.g. UnknownContainerFormatError is the format of the + container is unrecognised. + :seealso: ContainerReader.read + """ + self._read_format() + return self._iter_records() + + def iter_record_objects(self): + """Iterate over the container, yielding each record as it is read. + + Each yielded record will be an object with ``read`` and ``validate`` + methods. Like with iter_records, it is not safe to use a record object + after advancing the iterator to yield next record. + + :raises ContainerError: if any sort of container corruption is + detected, e.g. UnknownContainerFormatError is the format of the + container is unrecognised. + :seealso: iter_records + """ + self._read_format() + return self._iter_record_objects() + + def _iter_records(self): + for record in self._iter_record_objects(): + yield record.read() + + def _iter_record_objects(self): + while True: + record_kind = self.reader_func(1) + if record_kind == 'B': + # Bytes record. + reader = BytesRecordReader(self._source) + yield reader + elif record_kind == 'E': + # End marker. There are no more records. + return + elif record_kind == '': + # End of stream encountered, but no End Marker record seen, so + # this container is incomplete. + raise errors.UnexpectedEndOfContainerError() + else: + # Unknown record type. + raise errors.UnknownRecordTypeError(record_kind) + + def _read_format(self): + format = self._read_line() + if format != FORMAT_ONE: + raise errors.UnknownContainerFormatError(format) + + def validate(self): + """Validate this container and its records. + + Validating consumes the data stream just like iter_records and + iter_record_objects, so you cannot call it after + iter_records/iter_record_objects. + + :raises ContainerError: if something is invalid. + """ + all_names = set() + for record_names, read_bytes in self.iter_records(): + read_bytes(None) + for name_tuple in record_names: + for name in name_tuple: + _check_name_encoding(name) + # Check that the name is unique. Note that Python will refuse + # to decode non-shortest forms of UTF-8 encoding, so there is no + # risk that the same unicode string has been encoded two + # different ways. + if name_tuple in all_names: + raise errors.DuplicateRecordNameError(name_tuple[0]) + all_names.add(name_tuple) + excess_bytes = self.reader_func(1) + if excess_bytes != '': + raise errors.ContainerHasExcessDataError(excess_bytes) + + +class BytesRecordReader(BaseReader): + + def read(self): + """Read this record. + + You can either validate or read a record, you can't do both. + + :returns: A tuple of (names, callable). The callable can be called + repeatedly to obtain the bytes for the record, with a max_length + argument. If max_length is None, returns all the bytes. Because + records can be arbitrarily large, using None is not recommended + unless you have reason to believe the content will fit in memory. + """ + # Read the content length. + length_line = self._read_line() + try: + length = int(length_line) + except ValueError: + raise errors.InvalidRecordError( + "%r is not a valid length." % (length_line,)) + + # Read the list of names. + names = [] + while True: + name_line = self._read_line() + if name_line == '': + break + name_tuple = tuple(name_line.split('\x00')) + for name in name_tuple: + _check_name(name) + names.append(name_tuple) + + self._remaining_length = length + return names, self._content_reader + + def _content_reader(self, max_length): + if max_length is None: + length_to_read = self._remaining_length + else: + length_to_read = min(max_length, self._remaining_length) + self._remaining_length -= length_to_read + bytes = self.reader_func(length_to_read) + if len(bytes) != length_to_read: + raise errors.UnexpectedEndOfContainerError() + return bytes + + def validate(self): + """Validate this record. + + You can either validate or read, you can't do both. + + :raises ContainerError: if this record is invalid. + """ + names, read_bytes = self.read() + for name_tuple in names: + for name in name_tuple: + _check_name_encoding(name) + read_bytes(None) + + +class ContainerPushParser(object): + """A "push" parser for container format 1. + + It accepts bytes via the ``accept_bytes`` method, and parses them into + records which can be retrieved via the ``read_pending_records`` method. + """ + + def __init__(self): + self._buffer = '' + self._state_handler = self._state_expecting_format_line + self._parsed_records = [] + self._reset_current_record() + self.finished = False + + def _reset_current_record(self): + self._current_record_length = None + self._current_record_names = [] + + def accept_bytes(self, bytes): + self._buffer += bytes + # Keep iterating the state machine until it stops consuming bytes from + # the buffer. + last_buffer_length = None + cur_buffer_length = len(self._buffer) + last_state_handler = None + while (cur_buffer_length != last_buffer_length + or last_state_handler != self._state_handler): + last_buffer_length = cur_buffer_length + last_state_handler = self._state_handler + self._state_handler() + cur_buffer_length = len(self._buffer) + + def read_pending_records(self, max=None): + if max: + records = self._parsed_records[:max] + del self._parsed_records[:max] + return records + else: + records = self._parsed_records + self._parsed_records = [] + return records + + def _consume_line(self): + """Take a line out of the buffer, and return the line. + + If a newline byte is not found in the buffer, the buffer is + unchanged and this returns None instead. + """ + newline_pos = self._buffer.find('\n') + if newline_pos != -1: + line = self._buffer[:newline_pos] + self._buffer = self._buffer[newline_pos+1:] + return line + else: + return None + + def _state_expecting_format_line(self): + line = self._consume_line() + if line is not None: + if line != FORMAT_ONE: + raise errors.UnknownContainerFormatError(line) + self._state_handler = self._state_expecting_record_type + + def _state_expecting_record_type(self): + if len(self._buffer) >= 1: + record_type = self._buffer[0] + self._buffer = self._buffer[1:] + if record_type == 'B': + self._state_handler = self._state_expecting_length + elif record_type == 'E': + self.finished = True + self._state_handler = self._state_expecting_nothing + else: + raise errors.UnknownRecordTypeError(record_type) + + def _state_expecting_length(self): + line = self._consume_line() + if line is not None: + try: + self._current_record_length = int(line) + except ValueError: + raise errors.InvalidRecordError( + "%r is not a valid length." % (line,)) + self._state_handler = self._state_expecting_name + + def _state_expecting_name(self): + encoded_name_parts = self._consume_line() + if encoded_name_parts == '': + self._state_handler = self._state_expecting_body + elif encoded_name_parts: + name_parts = tuple(encoded_name_parts.split('\x00')) + for name_part in name_parts: + _check_name(name_part) + self._current_record_names.append(name_parts) + + def _state_expecting_body(self): + if len(self._buffer) >= self._current_record_length: + body_bytes = self._buffer[:self._current_record_length] + self._buffer = self._buffer[self._current_record_length:] + record = (self._current_record_names, body_bytes) + self._parsed_records.append(record) + self._reset_current_record() + self._state_handler = self._state_expecting_record_type + + def _state_expecting_nothing(self): + pass + + def read_size_hint(self): + hint = 16384 + if self._state_handler == self._state_expecting_body: + remaining = self._current_record_length - len(self._buffer) + if remaining < 0: + remaining = 0 + return max(hint, remaining) + return hint + + +def iter_records_from_file(source_file): + parser = ContainerPushParser() + while True: + bytes = source_file.read(parser.read_size_hint()) + parser.accept_bytes(bytes) + for record in parser.read_pending_records(): + yield record + if parser.finished: + break + |