diff options
Diffstat (limited to 'bzrlib/tests/test_store.py')
-rw-r--r-- | bzrlib/tests/test_store.py | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/bzrlib/tests/test_store.py b/bzrlib/tests/test_store.py new file mode 100644 index 0000000..9a403e9 --- /dev/null +++ b/bzrlib/tests/test_store.py @@ -0,0 +1,449 @@ +# Copyright (C) 2005-2009, 2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Test Store implementations.""" + +from cStringIO import StringIO +import os +import gzip + +import bzrlib.errors as errors +from bzrlib.errors import BzrError +from bzrlib.store import TransportStore +from bzrlib.store.text import TextStore +from bzrlib.store.versioned import VersionedFileStore +from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport +import bzrlib.transactions as transactions +import bzrlib.transport as transport +from bzrlib.transport.memory import MemoryTransport +from bzrlib.weave import WeaveFile + + +class TestStores(object): + """Mixin template class that provides some common tests for stores""" + + def check_content(self, store, fileid, value): + f = store.get(fileid) + self.assertEqual(f.read(), value) + + def fill_store(self, store): + store.add(StringIO('hello'), 'a') + store.add(StringIO('other'), 'b') + store.add(StringIO('something'), 'c') + store.add(StringIO('goodbye'), '123123') + + def test_copy_all(self): + """Test copying""" + os.mkdir('a') + store_a = self.get_store('a') + store_a.add(StringIO('foo'), '1') + os.mkdir('b') + store_b = self.get_store('b') + store_b.copy_all_ids(store_a) + self.assertEqual(store_a.get('1').read(), 'foo') + self.assertEqual(store_b.get('1').read(), 'foo') + # TODO: Switch the exception form UnlistableStore to + # or make Stores throw UnlistableStore if their + # Transport doesn't support listing + # store_c = RemoteStore('http://example.com/') + # self.assertRaises(UnlistableStore, copy_all, store_c, store_b) + + def test_get(self): + store = self.get_store() + self.fill_store(store) + + self.check_content(store, 'a', 'hello') + self.check_content(store, 'b', 'other') + self.check_content(store, 'c', 'something') + + # Make sure that requesting a non-existing file fails + self.assertRaises(KeyError, self.check_content, store, 'd', None) + + def test_multiple_add(self): + """Multiple add with same ID should raise a BzrError""" + store = self.get_store() + self.fill_store(store) + self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123') + + +class TestCompressedTextStore(TestCaseInTempDir, TestStores): + + def get_store(self, path=u'.'): + t = transport.get_transport_from_path(path) + return TextStore(t, compressed=True) + + def test_total_size(self): + store = self.get_store(u'.') + store.register_suffix('dsc') + store.add(StringIO('goodbye'), '123123') + store.add(StringIO('goodbye2'), '123123', 'dsc') + # these get gzipped - content should be stable + self.assertEqual(store.total_size(), (2, 55)) + + def test__relpath_suffixed(self): + my_store = TextStore(MockTransport(), + prefixed=True, compressed=True) + my_store.register_suffix('dsc') + self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc'])) + + +class TestMemoryStore(TestCase): + + def get_store(self): + return TextStore(MemoryTransport()) + + def test_add_and_retrieve(self): + store = self.get_store() + store.add(StringIO('hello'), 'aa') + self.assertNotEqual(store.get('aa'), None) + self.assertEqual(store.get('aa').read(), 'hello') + store.add(StringIO('hello world'), 'bb') + self.assertNotEqual(store.get('bb'), None) + self.assertEqual(store.get('bb').read(), 'hello world') + + def test_missing_is_absent(self): + store = self.get_store() + self.assertFalse('aa' in store) + + def test_adding_fails_when_present(self): + my_store = self.get_store() + my_store.add(StringIO('hello'), 'aa') + self.assertRaises(BzrError, + my_store.add, StringIO('hello'), 'aa') + + def test_total_size(self): + store = self.get_store() + store.add(StringIO('goodbye'), '123123') + store.add(StringIO('goodbye2'), '123123.dsc') + self.assertEqual(store.total_size(), (2, 15)) + # TODO: Switch the exception form UnlistableStore to + # or make Stores throw UnlistableStore if their + # Transport doesn't support listing + # store_c = RemoteStore('http://example.com/') + # self.assertRaises(UnlistableStore, copy_all, store_c, store_b) + + +class TestTextStore(TestCaseInTempDir, TestStores): + + def get_store(self, path=u'.'): + t = transport.get_transport_from_path(path) + return TextStore(t, compressed=False) + + def test_total_size(self): + store = self.get_store() + store.add(StringIO('goodbye'), '123123') + store.add(StringIO('goodbye2'), '123123.dsc') + self.assertEqual(store.total_size(), (2, 15)) + # TODO: Switch the exception form UnlistableStore to + # or make Stores throw UnlistableStore if their + # Transport doesn't support listing + # store_c = RemoteStore('http://example.com/') + # self.assertRaises(UnlistableStore, copy_all, store_c, store_b) + + +class TestMixedTextStore(TestCaseInTempDir, TestStores): + + def get_store(self, path=u'.', compressed=True): + t = transport.get_transport_from_path(path) + return TextStore(t, compressed=compressed) + + def test_get_mixed(self): + cs = self.get_store(u'.', compressed=True) + s = self.get_store(u'.', compressed=False) + cs.add(StringIO('hello there'), 'a') + + self.assertPathExists('a.gz') + self.assertFalse(os.path.lexists('a')) + + self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there') + + self.assertEquals(cs.has_id('a'), True) + self.assertEquals(s.has_id('a'), True) + self.assertEquals(cs.get('a').read(), 'hello there') + self.assertEquals(s.get('a').read(), 'hello there') + + self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a') + + s.add(StringIO('goodbye'), 'b') + self.assertPathExists('b') + self.assertFalse(os.path.lexists('b.gz')) + self.assertEquals(open('b').read(), 'goodbye') + + self.assertEquals(cs.has_id('b'), True) + self.assertEquals(s.has_id('b'), True) + self.assertEquals(cs.get('b').read(), 'goodbye') + self.assertEquals(s.get('b').read(), 'goodbye') + + self.assertRaises(BzrError, cs.add, StringIO('again'), 'b') + +class MockTransport(transport.Transport): + """A fake transport for testing with.""" + + def has(self, filename): + return False + + def __init__(self, url=None): + if url is None: + url = "http://example.com" + super(MockTransport, self).__init__(url) + + def mkdir(self, filename): + return + + +class InstrumentedTransportStore(TransportStore): + """An instrumented TransportStore. + + Here we replace template method worker methods with calls that record the + expected results. + """ + + def _add(self, filename, file): + self._calls.append(("_add", filename, file)) + + def __init__(self, transport, prefixed=False): + super(InstrumentedTransportStore, self).__init__(transport, prefixed) + self._calls = [] + + +class TestInstrumentedTransportStore(TestCase): + + def test__add_records(self): + my_store = InstrumentedTransportStore(MockTransport()) + my_store._add("filename", "file") + self.assertEqual([("_add", "filename", "file")], my_store._calls) + + +class TestMockTransport(TestCase): + + def test_isinstance(self): + self.assertIsInstance(MockTransport(), transport.Transport) + + def test_has(self): + self.assertEqual(False, MockTransport().has('foo')) + + def test_mkdir(self): + MockTransport().mkdir('45') + + +class TestTransportStore(TestCase): + + def test__relpath_invalid(self): + my_store = TransportStore(MockTransport()) + self.assertRaises(ValueError, my_store._relpath, '/foo') + self.assertRaises(ValueError, my_store._relpath, 'foo/') + + def test_register_invalid_suffixes(self): + my_store = TransportStore(MockTransport()) + self.assertRaises(ValueError, my_store.register_suffix, '/') + self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar') + + def test__relpath_unregister_suffixes(self): + my_store = TransportStore(MockTransport()) + self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz']) + self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz']) + + def test__relpath_simple(self): + my_store = TransportStore(MockTransport()) + self.assertEqual("foo", my_store._relpath('foo')) + + def test__relpath_prefixed(self): + my_store = TransportStore(MockTransport(), True) + self.assertEqual('45/foo', my_store._relpath('foo')) + + def test__relpath_simple_suffixed(self): + my_store = TransportStore(MockTransport()) + my_store.register_suffix('bar') + my_store.register_suffix('baz') + self.assertEqual('foo.baz', my_store._relpath('foo', ['baz'])) + self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz'])) + + def test__relpath_prefixed_suffixed(self): + my_store = TransportStore(MockTransport(), True) + my_store.register_suffix('bar') + my_store.register_suffix('baz') + self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz'])) + self.assertEqual('45/foo.bar.baz', + my_store._relpath('foo', ['bar', 'baz'])) + + def test_add_simple(self): + stream = StringIO("content") + my_store = InstrumentedTransportStore(MockTransport()) + my_store.add(stream, "foo") + self.assertEqual([("_add", "foo", stream)], my_store._calls) + + def test_add_prefixed(self): + stream = StringIO("content") + my_store = InstrumentedTransportStore(MockTransport(), True) + my_store.add(stream, "foo") + self.assertEqual([("_add", "45/foo", stream)], my_store._calls) + + def test_add_simple_suffixed(self): + stream = StringIO("content") + my_store = InstrumentedTransportStore(MockTransport()) + my_store.register_suffix('dsc') + my_store.add(stream, "foo", 'dsc') + self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls) + + def test_add_simple_suffixed(self): + stream = StringIO("content") + my_store = InstrumentedTransportStore(MockTransport(), True) + my_store.register_suffix('dsc') + my_store.add(stream, "foo", 'dsc') + self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls) + + def get_populated_store(self, prefixed=False, + store_class=TextStore, compressed=False): + my_store = store_class(MemoryTransport(), prefixed, + compressed=compressed) + my_store.register_suffix('sig') + stream = StringIO("signature") + my_store.add(stream, "foo", 'sig') + stream = StringIO("content") + my_store.add(stream, "foo") + stream = StringIO("signature for missing base") + my_store.add(stream, "missing", 'sig') + return my_store + + def test_has_simple(self): + my_store = self.get_populated_store() + self.assertEqual(True, my_store.has_id('foo')) + my_store = self.get_populated_store(True) + self.assertEqual(True, my_store.has_id('foo')) + + def test_has_suffixed(self): + my_store = self.get_populated_store() + self.assertEqual(True, my_store.has_id('foo', 'sig')) + my_store = self.get_populated_store(True) + self.assertEqual(True, my_store.has_id('foo', 'sig')) + + def test_has_suffixed_no_base(self): + my_store = self.get_populated_store() + self.assertEqual(False, my_store.has_id('missing')) + my_store = self.get_populated_store(True) + self.assertEqual(False, my_store.has_id('missing')) + + def test_get_simple(self): + my_store = self.get_populated_store() + self.assertEqual('content', my_store.get('foo').read()) + my_store = self.get_populated_store(True) + self.assertEqual('content', my_store.get('foo').read()) + + def test_get_suffixed(self): + my_store = self.get_populated_store() + self.assertEqual('signature', my_store.get('foo', 'sig').read()) + my_store = self.get_populated_store(True) + self.assertEqual('signature', my_store.get('foo', 'sig').read()) + + def test_get_suffixed_no_base(self): + my_store = self.get_populated_store() + self.assertEqual('signature for missing base', + my_store.get('missing', 'sig').read()) + my_store = self.get_populated_store(True) + self.assertEqual('signature for missing base', + my_store.get('missing', 'sig').read()) + + def test___iter__no_suffix(self): + my_store = TextStore(MemoryTransport(), + prefixed=False, compressed=False) + stream = StringIO("content") + my_store.add(stream, "foo") + self.assertEqual(set(['foo']), + set(my_store.__iter__())) + + def test___iter__(self): + self.assertEqual(set(['foo']), + set(self.get_populated_store().__iter__())) + self.assertEqual(set(['foo']), + set(self.get_populated_store(True).__iter__())) + + def test___iter__compressed(self): + self.assertEqual(set(['foo']), + set(self.get_populated_store( + compressed=True).__iter__())) + self.assertEqual(set(['foo']), + set(self.get_populated_store( + True, compressed=True).__iter__())) + + def test___len__(self): + self.assertEqual(1, len(self.get_populated_store())) + + def test_copy_suffixes(self): + from_store = self.get_populated_store() + to_store = TextStore(MemoryTransport(), + prefixed=True, compressed=True) + to_store.register_suffix('sig') + to_store.copy_all_ids(from_store) + self.assertEqual(1, len(to_store)) + self.assertEqual(set(['foo']), set(to_store.__iter__())) + self.assertEqual('content', to_store.get('foo').read()) + self.assertEqual('signature', to_store.get('foo', 'sig').read()) + self.assertRaises(KeyError, to_store.get, 'missing', 'sig') + + def test_relpath_escaped(self): + my_store = TransportStore(MemoryTransport()) + self.assertEqual('%25', my_store._relpath('%')) + + def test_escaped_uppercase(self): + """Uppercase letters are escaped for safety on Windows""" + my_store = TransportStore(MemoryTransport(), prefixed=True, + escaped=True) + # a particularly perverse file-id! :-) + self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e') + + +class TestVersionFileStore(TestCaseWithTransport): + + def get_scope(self): + return self._transaction + + def setUp(self): + super(TestVersionFileStore, self).setUp() + self.vfstore = VersionedFileStore(MemoryTransport(), + versionedfile_class=WeaveFile) + self.vfstore.get_scope = self.get_scope + self._transaction = None + + def test_get_weave_registers_dirty_in_write(self): + self._transaction = transactions.WriteTransaction() + vf = self.vfstore.get_weave_or_empty('id', self._transaction) + self._transaction.finish() + self._transaction = None + self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], []) + self._transaction = transactions.WriteTransaction() + vf = self.vfstore.get_weave('id', self._transaction) + self._transaction.finish() + self._transaction = None + self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], []) + + def test_get_weave_readonly_cant_write(self): + self._transaction = transactions.WriteTransaction() + vf = self.vfstore.get_weave_or_empty('id', self._transaction) + self._transaction.finish() + self._transaction = transactions.ReadOnlyTransaction() + vf = self.vfstore.get_weave_or_empty('id', self._transaction) + self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], []) + + def test___iter__escaped(self): + self.vfstore = VersionedFileStore(MemoryTransport(), + prefixed=True, escaped=True, versionedfile_class=WeaveFile) + self.vfstore.get_scope = self.get_scope + self._transaction = transactions.WriteTransaction() + vf = self.vfstore.get_weave_or_empty(' ', self._transaction) + vf.add_lines('a', [], []) + del vf + self._transaction.finish() + self.assertEqual([' '], list(self.vfstore)) |