summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_pack.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_pack.py')
-rw-r--r--bzrlib/tests/test_pack.py741
1 files changed, 741 insertions, 0 deletions
diff --git a/bzrlib/tests/test_pack.py b/bzrlib/tests/test_pack.py
new file mode 100644
index 0000000..33c8176
--- /dev/null
+++ b/bzrlib/tests/test_pack.py
@@ -0,0 +1,741 @@
+# Copyright (C) 2007 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Tests for bzrlib.pack."""
+
+
+from cStringIO import StringIO
+
+from bzrlib import pack, errors, tests
+
+
+class TestContainerSerialiser(tests.TestCase):
+ """Tests for the ContainerSerialiser class."""
+
+ def test_construct(self):
+ """Test constructing a ContainerSerialiser."""
+ pack.ContainerSerialiser()
+
+ def test_begin(self):
+ serialiser = pack.ContainerSerialiser()
+ self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
+ serialiser.begin())
+
+ def test_end(self):
+ serialiser = pack.ContainerSerialiser()
+ self.assertEqual('E', serialiser.end())
+
+ def test_bytes_record_no_name(self):
+ serialiser = pack.ContainerSerialiser()
+ record = serialiser.bytes_record('bytes', [])
+ self.assertEqual('B5\n\nbytes', record)
+
+ def test_bytes_record_one_name_with_one_part(self):
+ serialiser = pack.ContainerSerialiser()
+ record = serialiser.bytes_record('bytes', [('name',)])
+ self.assertEqual('B5\nname\n\nbytes', record)
+
+ def test_bytes_record_one_name_with_two_parts(self):
+ serialiser = pack.ContainerSerialiser()
+ record = serialiser.bytes_record('bytes', [('part1', 'part2')])
+ self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
+
+ def test_bytes_record_two_names(self):
+ serialiser = pack.ContainerSerialiser()
+ record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
+ self.assertEqual('B5\nname1\nname2\n\nbytes', record)
+
+ def test_bytes_record_whitespace_in_name_part(self):
+ serialiser = pack.ContainerSerialiser()
+ self.assertRaises(
+ errors.InvalidRecordError,
+ serialiser.bytes_record, 'bytes', [('bad name',)])
+
+ def test_bytes_record_header(self):
+ serialiser = pack.ContainerSerialiser()
+ record = serialiser.bytes_header(32, [('name1',), ('name2',)])
+ self.assertEqual('B32\nname1\nname2\n\n', record)
+
+
+class TestContainerWriter(tests.TestCase):
+
+ def setUp(self):
+ tests.TestCase.setUp(self)
+ self.output = StringIO()
+ self.writer = pack.ContainerWriter(self.output.write)
+
+ def assertOutput(self, expected_output):
+ """Assert that the output of self.writer ContainerWriter is equal to
+ expected_output.
+ """
+ self.assertEqual(expected_output, self.output.getvalue())
+
+ def test_construct(self):
+ """Test constructing a ContainerWriter.
+
+ This uses None as the output stream to show that the constructor
+ doesn't try to use the output stream.
+ """
+ writer = pack.ContainerWriter(None)
+
+ def test_begin(self):
+ """The begin() method writes the container format marker line."""
+ self.writer.begin()
+ self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
+
+ def test_zero_records_written_after_begin(self):
+ """After begin is written, 0 records have been written."""
+ self.writer.begin()
+ self.assertEqual(0, self.writer.records_written)
+
+ def test_end(self):
+ """The end() method writes an End Marker record."""
+ self.writer.begin()
+ self.writer.end()
+ self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
+
+ def test_empty_end_does_not_add_a_record_to_records_written(self):
+ """The end() method does not count towards the records written."""
+ self.writer.begin()
+ self.writer.end()
+ self.assertEqual(0, self.writer.records_written)
+
+ def test_non_empty_end_does_not_add_a_record_to_records_written(self):
+ """The end() method does not count towards the records written."""
+ self.writer.begin()
+ self.writer.add_bytes_record('foo', names=[])
+ self.writer.end()
+ self.assertEqual(1, self.writer.records_written)
+
+ def test_add_bytes_record_no_name(self):
+ """Add a bytes record with no name."""
+ self.writer.begin()
+ offset, length = self.writer.add_bytes_record('abc', names=[])
+ self.assertEqual((42, 7), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
+
+ def test_add_bytes_record_one_name(self):
+ """Add a bytes record with one name."""
+ self.writer.begin()
+
+ offset, length = self.writer.add_bytes_record(
+ 'abc', names=[('name1', )])
+ self.assertEqual((42, 13), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\n'
+ 'B3\nname1\n\nabc')
+
+ def test_add_bytes_record_split_writes(self):
+ """Write a large record which does multiple IOs"""
+
+ writes = []
+ real_write = self.writer.write_func
+
+ def record_writes(bytes):
+ writes.append(bytes)
+ return real_write(bytes)
+
+ self.writer.write_func = record_writes
+ self.writer._JOIN_WRITES_THRESHOLD = 2
+
+ self.writer.begin()
+ offset, length = self.writer.add_bytes_record(
+ 'abcabc', names=[('name1', )])
+ self.assertEqual((42, 16), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\n'
+ 'B6\nname1\n\nabcabc')
+
+ self.assertEquals([
+ 'Bazaar pack format 1 (introduced in 0.18)\n',
+ 'B6\nname1\n\n',
+ 'abcabc'],
+ writes)
+
+ def test_add_bytes_record_two_names(self):
+ """Add a bytes record with two names."""
+ self.writer.begin()
+ offset, length = self.writer.add_bytes_record(
+ 'abc', names=[('name1', ), ('name2', )])
+ self.assertEqual((42, 19), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\n'
+ 'B3\nname1\nname2\n\nabc')
+
+ def test_add_bytes_record_two_names(self):
+ """Add a bytes record with two names."""
+ self.writer.begin()
+ offset, length = self.writer.add_bytes_record(
+ 'abc', names=[('name1', ), ('name2', )])
+ self.assertEqual((42, 19), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\n'
+ 'B3\nname1\nname2\n\nabc')
+
+ def test_add_bytes_record_two_element_name(self):
+ """Add a bytes record with a two-element name."""
+ self.writer.begin()
+ offset, length = self.writer.add_bytes_record(
+ 'abc', names=[('name1', 'name2')])
+ self.assertEqual((42, 19), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\n'
+ 'B3\nname1\x00name2\n\nabc')
+
+ def test_add_second_bytes_record_gets_higher_offset(self):
+ self.writer.begin()
+ self.writer.add_bytes_record('abc', names=[])
+ offset, length = self.writer.add_bytes_record('abc', names=[])
+ self.assertEqual((49, 7), (offset, length))
+ self.assertOutput(
+ 'Bazaar pack format 1 (introduced in 0.18)\n'
+ 'B3\n\nabc'
+ 'B3\n\nabc')
+
+ def test_add_bytes_record_invalid_name(self):
+ """Adding a Bytes record with a name with whitespace in it raises
+ InvalidRecordError.
+ """
+ self.writer.begin()
+ self.assertRaises(
+ errors.InvalidRecordError,
+ self.writer.add_bytes_record, 'abc', names=[('bad name', )])
+
+ def test_add_bytes_records_add_to_records_written(self):
+ """Adding a Bytes record increments the records_written counter."""
+ self.writer.begin()
+ self.writer.add_bytes_record('foo', names=[])
+ self.assertEqual(1, self.writer.records_written)
+ self.writer.add_bytes_record('foo', names=[])
+ self.assertEqual(2, self.writer.records_written)
+
+
+class TestContainerReader(tests.TestCase):
+ """Tests for the ContainerReader.
+
+ The ContainerReader reads format 1 containers, so these tests explicitly
+ test how it reacts to format 1 data. If a new version of the format is
+ added, then separate tests for that format should be added.
+ """
+
+ def get_reader_for(self, bytes):
+ stream = StringIO(bytes)
+ reader = pack.ContainerReader(stream)
+ return reader
+
+ def test_construct(self):
+ """Test constructing a ContainerReader.
+
+ This uses None as the output stream to show that the constructor
+ doesn't try to use the input stream.
+ """
+ reader = pack.ContainerReader(None)
+
+ def test_empty_container(self):
+ """Read an empty container."""
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nE")
+ self.assertEqual([], list(reader.iter_records()))
+
+ def test_unknown_format(self):
+ """Unrecognised container formats raise UnknownContainerFormatError."""
+ reader = self.get_reader_for("unknown format\n")
+ self.assertRaises(
+ errors.UnknownContainerFormatError, reader.iter_records)
+
+ def test_unexpected_end_of_container(self):
+ """Containers that don't end with an End Marker record should cause
+ UnexpectedEndOfContainerError to be raised.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\n")
+ iterator = reader.iter_records()
+ self.assertRaises(
+ errors.UnexpectedEndOfContainerError, iterator.next)
+
+ def test_unknown_record_type(self):
+ """Unknown record types cause UnknownRecordTypeError to be raised."""
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nX")
+ iterator = reader.iter_records()
+ self.assertRaises(
+ errors.UnknownRecordTypeError, iterator.next)
+
+ def test_container_with_one_unnamed_record(self):
+ """Read a container with one Bytes record.
+
+ Parsing Bytes records is more thoroughly exercised by
+ TestBytesRecordReader. This test is here to ensure that
+ ContainerReader's integration with BytesRecordReader is working.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
+ expected_records = [([], 'aaaaa')]
+ self.assertEqual(
+ expected_records,
+ [(names, read_bytes(None))
+ for (names, read_bytes) in reader.iter_records()])
+
+ def test_validate_empty_container(self):
+ """validate does not raise an error for a container with no records."""
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nE")
+ # No exception raised
+ reader.validate()
+
+ def test_validate_non_empty_valid_container(self):
+ """validate does not raise an error for a container with a valid record.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
+ # No exception raised
+ reader.validate()
+
+ def test_validate_bad_format(self):
+ """validate raises an error for unrecognised format strings.
+
+ It may raise either UnexpectedEndOfContainerError or
+ UnknownContainerFormatError, depending on exactly what the string is.
+ """
+ inputs = ["", "x", "Bazaar pack format 1 (introduced in 0.18)", "bad\n"]
+ for input in inputs:
+ reader = self.get_reader_for(input)
+ self.assertRaises(
+ (errors.UnexpectedEndOfContainerError,
+ errors.UnknownContainerFormatError),
+ reader.validate)
+
+ def test_validate_bad_record_marker(self):
+ """validate raises UnknownRecordTypeError for unrecognised record
+ types.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nX")
+ self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
+
+ def test_validate_data_after_end_marker(self):
+ """validate raises ContainerHasExcessDataError if there are any bytes
+ after the end of the container.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nEcrud")
+ self.assertRaises(
+ errors.ContainerHasExcessDataError, reader.validate)
+
+ def test_validate_no_end_marker(self):
+ """validate raises UnexpectedEndOfContainerError if there's no end of
+ container marker, even if the container up to this point has been valid.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\n")
+ self.assertRaises(
+ errors.UnexpectedEndOfContainerError, reader.validate)
+
+ def test_validate_duplicate_name(self):
+ """validate raises DuplicateRecordNameError if the same name occurs
+ multiple times in the container.
+ """
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\n"
+ "B0\nname\n\n"
+ "B0\nname\n\n"
+ "E")
+ self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
+
+ def test_validate_undecodeable_name(self):
+ """Names that aren't valid UTF-8 cause validate to fail."""
+ reader = self.get_reader_for(
+ "Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
+ self.assertRaises(errors.InvalidRecordError, reader.validate)
+
+
+class TestBytesRecordReader(tests.TestCase):
+ """Tests for reading and validating Bytes records with
+ BytesRecordReader.
+
+ Like TestContainerReader, this explicitly tests the reading of format 1
+ data. If a new version of the format is added, then a separate set of
+ tests for reading that format should be added.
+ """
+
+ def get_reader_for(self, bytes):
+ stream = StringIO(bytes)
+ reader = pack.BytesRecordReader(stream)
+ return reader
+
+ def test_record_with_no_name(self):
+ """Reading a Bytes record with no name returns an empty list of
+ names.
+ """
+ reader = self.get_reader_for("5\n\naaaaa")
+ names, get_bytes = reader.read()
+ self.assertEqual([], names)
+ self.assertEqual('aaaaa', get_bytes(None))
+
+ def test_record_with_one_name(self):
+ """Reading a Bytes record with one name returns a list of just that
+ name.
+ """
+ reader = self.get_reader_for("5\nname1\n\naaaaa")
+ names, get_bytes = reader.read()
+ self.assertEqual([('name1', )], names)
+ self.assertEqual('aaaaa', get_bytes(None))
+
+ def test_record_with_two_names(self):
+ """Reading a Bytes record with two names returns a list of both names.
+ """
+ reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
+ names, get_bytes = reader.read()
+ self.assertEqual([('name1', ), ('name2', )], names)
+ self.assertEqual('aaaaa', get_bytes(None))
+
+ def test_record_with_two_part_names(self):
+ """Reading a Bytes record with a two_part name reads both."""
+ reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
+ names, get_bytes = reader.read()
+ self.assertEqual([('name1', 'name2', )], names)
+ self.assertEqual('aaaaa', get_bytes(None))
+
+ def test_invalid_length(self):
+ """If the length-prefix is not a number, parsing raises
+ InvalidRecordError.
+ """
+ reader = self.get_reader_for("not a number\n")
+ self.assertRaises(errors.InvalidRecordError, reader.read)
+
+ def test_early_eof(self):
+ """Tests for premature EOF occuring during parsing Bytes records with
+ BytesRecordReader.
+
+ A incomplete container might be interrupted at any point. The
+ BytesRecordReader needs to cope with the input stream running out no
+ matter where it is in the parsing process.
+
+ In all cases, UnexpectedEndOfContainerError should be raised.
+ """
+ complete_record = "6\nname\n\nabcdef"
+ for count in range(0, len(complete_record)):
+ incomplete_record = complete_record[:count]
+ reader = self.get_reader_for(incomplete_record)
+ # We don't use assertRaises to make diagnosing failures easier
+ # (assertRaises doesn't allow a custom failure message).
+ try:
+ names, read_bytes = reader.read()
+ read_bytes(None)
+ except errors.UnexpectedEndOfContainerError:
+ pass
+ else:
+ self.fail(
+ "UnexpectedEndOfContainerError not raised when parsing %r"
+ % (incomplete_record,))
+
+ def test_initial_eof(self):
+ """EOF before any bytes read at all."""
+ reader = self.get_reader_for("")
+ self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
+
+ def test_eof_after_length(self):
+ """EOF after reading the length and before reading name(s)."""
+ reader = self.get_reader_for("123\n")
+ self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
+
+ def test_eof_during_name(self):
+ """EOF during reading a name."""
+ reader = self.get_reader_for("123\nname")
+ self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
+
+ def test_read_invalid_name_whitespace(self):
+ """Names must have no whitespace."""
+ # A name with a space.
+ reader = self.get_reader_for("0\nbad name\n\n")
+ self.assertRaises(errors.InvalidRecordError, reader.read)
+
+ # A name with a tab.
+ reader = self.get_reader_for("0\nbad\tname\n\n")
+ self.assertRaises(errors.InvalidRecordError, reader.read)
+
+ # A name with a vertical tab.
+ reader = self.get_reader_for("0\nbad\vname\n\n")
+ self.assertRaises(errors.InvalidRecordError, reader.read)
+
+ def test_validate_whitespace_in_name(self):
+ """Names must have no whitespace."""
+ reader = self.get_reader_for("0\nbad name\n\n")
+ self.assertRaises(errors.InvalidRecordError, reader.validate)
+
+ def test_validate_interrupted_prelude(self):
+ """EOF during reading a record's prelude causes validate to fail."""
+ reader = self.get_reader_for("")
+ self.assertRaises(
+ errors.UnexpectedEndOfContainerError, reader.validate)
+
+ def test_validate_interrupted_body(self):
+ """EOF during reading a record's body causes validate to fail."""
+ reader = self.get_reader_for("1\n\n")
+ self.assertRaises(
+ errors.UnexpectedEndOfContainerError, reader.validate)
+
+ def test_validate_unparseable_length(self):
+ """An unparseable record length causes validate to fail."""
+ reader = self.get_reader_for("\n\n")
+ self.assertRaises(
+ errors.InvalidRecordError, reader.validate)
+
+ def test_validate_undecodeable_name(self):
+ """Names that aren't valid UTF-8 cause validate to fail."""
+ reader = self.get_reader_for("0\n\xcc\n\n")
+ self.assertRaises(errors.InvalidRecordError, reader.validate)
+
+ def test_read_max_length(self):
+ """If the max_length passed to the callable returned by read is not
+ None, then no more than that many bytes will be read.
+ """
+ reader = self.get_reader_for("6\n\nabcdef")
+ names, get_bytes = reader.read()
+ self.assertEqual('abc', get_bytes(3))
+
+ def test_read_no_max_length(self):
+ """If the max_length passed to the callable returned by read is None,
+ then all the bytes in the record will be read.
+ """
+ reader = self.get_reader_for("6\n\nabcdef")
+ names, get_bytes = reader.read()
+ self.assertEqual('abcdef', get_bytes(None))
+
+ def test_repeated_read_calls(self):
+ """Repeated calls to the callable returned from BytesRecordReader.read
+ will not read beyond the end of the record.
+ """
+ reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
+ names, get_bytes = reader.read()
+ self.assertEqual('abcdef', get_bytes(None))
+ self.assertEqual('', get_bytes(None))
+ self.assertEqual('', get_bytes(99))
+
+
+class TestMakeReadvReader(tests.TestCaseWithTransport):
+
+ def test_read_skipping_records(self):
+ pack_data = StringIO()
+ writer = pack.ContainerWriter(pack_data.write)
+ writer.begin()
+ memos = []
+ memos.append(writer.add_bytes_record('abc', names=[]))
+ memos.append(writer.add_bytes_record('def', names=[('name1', )]))
+ memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
+ memos.append(writer.add_bytes_record('jkl', names=[]))
+ writer.end()
+ transport = self.get_transport()
+ transport.put_bytes('mypack', pack_data.getvalue())
+ requested_records = [memos[0], memos[2]]
+ reader = pack.make_readv_reader(transport, 'mypack', requested_records)
+ result = []
+ for names, reader_func in reader.iter_records():
+ result.append((names, reader_func(None)))
+ self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
+
+
+class TestReadvFile(tests.TestCaseWithTransport):
+ """Tests of the ReadVFile class.
+
+ Error cases are deliberately undefined: this code adapts the underlying
+ transport interface to a single 'streaming read' interface as
+ ContainerReader needs.
+ """
+
+ def test_read_bytes(self):
+ """Test reading of both single bytes and all bytes in a hunk."""
+ transport = self.get_transport()
+ transport.put_bytes('sample', '0123456789')
+ f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
+ results = []
+ results.append(f.read(1))
+ results.append(f.read(2))
+ results.append(f.read(1))
+ results.append(f.read(1))
+ results.append(f.read(1))
+ self.assertEqual(['0', '12', '4', '6', '7'], results)
+
+ def test_readline(self):
+ """Test using readline() as ContainerReader does.
+
+ This is always within a readv hunk, never across it.
+ """
+ transport = self.get_transport()
+ transport.put_bytes('sample', '0\n2\n4\n')
+ f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
+ results = []
+ results.append(f.readline())
+ results.append(f.readline())
+ results.append(f.readline())
+ self.assertEqual(['0\n', '2\n', '4\n'], results)
+
+ def test_readline_and_read(self):
+ """Test exercising one byte reads, readline, and then read again."""
+ transport = self.get_transport()
+ transport.put_bytes('sample', '0\n2\n4\n')
+ f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
+ results = []
+ results.append(f.read(1))
+ results.append(f.readline())
+ results.append(f.read(4))
+ self.assertEqual(['0', '\n', '2\n4\n'], results)
+
+
+class PushParserTestCase(tests.TestCase):
+ """Base class for TestCases involving ContainerPushParser."""
+
+ def make_parser_expecting_record_type(self):
+ parser = pack.ContainerPushParser()
+ parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
+ return parser
+
+ def make_parser_expecting_bytes_record(self):
+ parser = pack.ContainerPushParser()
+ parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
+ return parser
+
+ def assertRecordParsing(self, expected_record, bytes):
+ """Assert that 'bytes' is parsed as a given bytes record.
+
+ :param expected_record: A tuple of (names, bytes).
+ """
+ parser = self.make_parser_expecting_bytes_record()
+ parser.accept_bytes(bytes)
+ parsed_records = parser.read_pending_records()
+ self.assertEqual([expected_record], parsed_records)
+
+
+class TestContainerPushParser(PushParserTestCase):
+ """Tests for ContainerPushParser.
+
+ The ContainerPushParser reads format 1 containers, so these tests
+ explicitly test how it reacts to format 1 data. If a new version of the
+ format is added, then separate tests for that format should be added.
+ """
+
+ def test_construct(self):
+ """ContainerPushParser can be constructed."""
+ pack.ContainerPushParser()
+
+ def test_multiple_records_at_once(self):
+ """If multiple records worth of data are fed to the parser in one
+ string, the parser will correctly parse all the records.
+
+ (A naive implementation might stop after parsing the first record.)
+ """
+ parser = self.make_parser_expecting_record_type()
+ parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
+ self.assertEqual(
+ [([('name1',)], 'body1'), ([('name2',)], 'body2')],
+ parser.read_pending_records())
+
+ def test_multiple_empty_records_at_once(self):
+ """If multiple empty records worth of data are fed to the parser in one
+ string, the parser will correctly parse all the records.
+
+ (A naive implementation might stop after parsing the first empty
+ record, because the buffer size had not changed.)
+ """
+ parser = self.make_parser_expecting_record_type()
+ parser.accept_bytes("B0\nname1\n\nB0\nname2\n\n")
+ self.assertEqual(
+ [([('name1',)], ''), ([('name2',)], '')],
+ parser.read_pending_records())
+
+
+class TestContainerPushParserBytesParsing(PushParserTestCase):
+ """Tests for reading Bytes records with ContainerPushParser.
+
+ The ContainerPushParser reads format 1 containers, so these tests
+ explicitly test how it reacts to format 1 data. If a new version of the
+ format is added, then separate tests for that format should be added.
+ """
+
+ def test_record_with_no_name(self):
+ """Reading a Bytes record with no name returns an empty list of
+ names.
+ """
+ self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
+
+ def test_record_with_one_name(self):
+ """Reading a Bytes record with one name returns a list of just that
+ name.
+ """
+ self.assertRecordParsing(
+ ([('name1', )], 'aaaaa'),
+ "5\nname1\n\naaaaa")
+
+ def test_record_with_two_names(self):
+ """Reading a Bytes record with two names returns a list of both names.
+ """
+ self.assertRecordParsing(
+ ([('name1', ), ('name2', )], 'aaaaa'),
+ "5\nname1\nname2\n\naaaaa")
+
+ def test_record_with_two_part_names(self):
+ """Reading a Bytes record with a two_part name reads both."""
+ self.assertRecordParsing(
+ ([('name1', 'name2')], 'aaaaa'),
+ "5\nname1\x00name2\n\naaaaa")
+
+ def test_invalid_length(self):
+ """If the length-prefix is not a number, parsing raises
+ InvalidRecordError.
+ """
+ parser = self.make_parser_expecting_bytes_record()
+ self.assertRaises(
+ errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
+
+ def test_incomplete_record(self):
+ """If the bytes seen so far don't form a complete record, then there
+ will be nothing returned by read_pending_records.
+ """
+ parser = self.make_parser_expecting_bytes_record()
+ parser.accept_bytes("5\n\nabcd")
+ self.assertEqual([], parser.read_pending_records())
+
+ def test_accept_nothing(self):
+ """The edge case of parsing an empty string causes no error."""
+ parser = self.make_parser_expecting_bytes_record()
+ parser.accept_bytes("")
+
+ def assertInvalidRecord(self, bytes):
+ """Assert that parsing the given bytes will raise an
+ InvalidRecordError.
+ """
+ parser = self.make_parser_expecting_bytes_record()
+ self.assertRaises(
+ errors.InvalidRecordError, parser.accept_bytes, bytes)
+
+ def test_read_invalid_name_whitespace(self):
+ """Names must have no whitespace."""
+ # A name with a space.
+ self.assertInvalidRecord("0\nbad name\n\n")
+
+ # A name with a tab.
+ self.assertInvalidRecord("0\nbad\tname\n\n")
+
+ # A name with a vertical tab.
+ self.assertInvalidRecord("0\nbad\vname\n\n")
+
+ def test_repeated_read_pending_records(self):
+ """read_pending_records will not return the same record twice."""
+ parser = self.make_parser_expecting_bytes_record()
+ parser.accept_bytes("6\n\nabcdef")
+ self.assertEqual([([], 'abcdef')], parser.read_pending_records())
+ self.assertEqual([], parser.read_pending_records())