summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_store.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_store.py')
-rw-r--r--bzrlib/tests/test_store.py449
1 files changed, 449 insertions, 0 deletions
diff --git a/bzrlib/tests/test_store.py b/bzrlib/tests/test_store.py
new file mode 100644
index 0000000..9a403e9
--- /dev/null
+++ b/bzrlib/tests/test_store.py
@@ -0,0 +1,449 @@
+# Copyright (C) 2005-2009, 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Test Store implementations."""
+
+from cStringIO import StringIO
+import os
+import gzip
+
+import bzrlib.errors as errors
+from bzrlib.errors import BzrError
+from bzrlib.store import TransportStore
+from bzrlib.store.text import TextStore
+from bzrlib.store.versioned import VersionedFileStore
+from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
+import bzrlib.transactions as transactions
+import bzrlib.transport as transport
+from bzrlib.transport.memory import MemoryTransport
+from bzrlib.weave import WeaveFile
+
+
+class TestStores(object):
+ """Mixin template class that provides some common tests for stores"""
+
+ def check_content(self, store, fileid, value):
+ f = store.get(fileid)
+ self.assertEqual(f.read(), value)
+
+ def fill_store(self, store):
+ store.add(StringIO('hello'), 'a')
+ store.add(StringIO('other'), 'b')
+ store.add(StringIO('something'), 'c')
+ store.add(StringIO('goodbye'), '123123')
+
+ def test_copy_all(self):
+ """Test copying"""
+ os.mkdir('a')
+ store_a = self.get_store('a')
+ store_a.add(StringIO('foo'), '1')
+ os.mkdir('b')
+ store_b = self.get_store('b')
+ store_b.copy_all_ids(store_a)
+ self.assertEqual(store_a.get('1').read(), 'foo')
+ self.assertEqual(store_b.get('1').read(), 'foo')
+ # TODO: Switch the exception form UnlistableStore to
+ # or make Stores throw UnlistableStore if their
+ # Transport doesn't support listing
+ # store_c = RemoteStore('http://example.com/')
+ # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
+
+ def test_get(self):
+ store = self.get_store()
+ self.fill_store(store)
+
+ self.check_content(store, 'a', 'hello')
+ self.check_content(store, 'b', 'other')
+ self.check_content(store, 'c', 'something')
+
+ # Make sure that requesting a non-existing file fails
+ self.assertRaises(KeyError, self.check_content, store, 'd', None)
+
+ def test_multiple_add(self):
+ """Multiple add with same ID should raise a BzrError"""
+ store = self.get_store()
+ self.fill_store(store)
+ self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
+
+
+class TestCompressedTextStore(TestCaseInTempDir, TestStores):
+
+ def get_store(self, path=u'.'):
+ t = transport.get_transport_from_path(path)
+ return TextStore(t, compressed=True)
+
+ def test_total_size(self):
+ store = self.get_store(u'.')
+ store.register_suffix('dsc')
+ store.add(StringIO('goodbye'), '123123')
+ store.add(StringIO('goodbye2'), '123123', 'dsc')
+ # these get gzipped - content should be stable
+ self.assertEqual(store.total_size(), (2, 55))
+
+ def test__relpath_suffixed(self):
+ my_store = TextStore(MockTransport(),
+ prefixed=True, compressed=True)
+ my_store.register_suffix('dsc')
+ self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
+
+
+class TestMemoryStore(TestCase):
+
+ def get_store(self):
+ return TextStore(MemoryTransport())
+
+ def test_add_and_retrieve(self):
+ store = self.get_store()
+ store.add(StringIO('hello'), 'aa')
+ self.assertNotEqual(store.get('aa'), None)
+ self.assertEqual(store.get('aa').read(), 'hello')
+ store.add(StringIO('hello world'), 'bb')
+ self.assertNotEqual(store.get('bb'), None)
+ self.assertEqual(store.get('bb').read(), 'hello world')
+
+ def test_missing_is_absent(self):
+ store = self.get_store()
+ self.assertFalse('aa' in store)
+
+ def test_adding_fails_when_present(self):
+ my_store = self.get_store()
+ my_store.add(StringIO('hello'), 'aa')
+ self.assertRaises(BzrError,
+ my_store.add, StringIO('hello'), 'aa')
+
+ def test_total_size(self):
+ store = self.get_store()
+ store.add(StringIO('goodbye'), '123123')
+ store.add(StringIO('goodbye2'), '123123.dsc')
+ self.assertEqual(store.total_size(), (2, 15))
+ # TODO: Switch the exception form UnlistableStore to
+ # or make Stores throw UnlistableStore if their
+ # Transport doesn't support listing
+ # store_c = RemoteStore('http://example.com/')
+ # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
+
+
+class TestTextStore(TestCaseInTempDir, TestStores):
+
+ def get_store(self, path=u'.'):
+ t = transport.get_transport_from_path(path)
+ return TextStore(t, compressed=False)
+
+ def test_total_size(self):
+ store = self.get_store()
+ store.add(StringIO('goodbye'), '123123')
+ store.add(StringIO('goodbye2'), '123123.dsc')
+ self.assertEqual(store.total_size(), (2, 15))
+ # TODO: Switch the exception form UnlistableStore to
+ # or make Stores throw UnlistableStore if their
+ # Transport doesn't support listing
+ # store_c = RemoteStore('http://example.com/')
+ # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
+
+
+class TestMixedTextStore(TestCaseInTempDir, TestStores):
+
+ def get_store(self, path=u'.', compressed=True):
+ t = transport.get_transport_from_path(path)
+ return TextStore(t, compressed=compressed)
+
+ def test_get_mixed(self):
+ cs = self.get_store(u'.', compressed=True)
+ s = self.get_store(u'.', compressed=False)
+ cs.add(StringIO('hello there'), 'a')
+
+ self.assertPathExists('a.gz')
+ self.assertFalse(os.path.lexists('a'))
+
+ self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
+
+ self.assertEquals(cs.has_id('a'), True)
+ self.assertEquals(s.has_id('a'), True)
+ self.assertEquals(cs.get('a').read(), 'hello there')
+ self.assertEquals(s.get('a').read(), 'hello there')
+
+ self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
+
+ s.add(StringIO('goodbye'), 'b')
+ self.assertPathExists('b')
+ self.assertFalse(os.path.lexists('b.gz'))
+ self.assertEquals(open('b').read(), 'goodbye')
+
+ self.assertEquals(cs.has_id('b'), True)
+ self.assertEquals(s.has_id('b'), True)
+ self.assertEquals(cs.get('b').read(), 'goodbye')
+ self.assertEquals(s.get('b').read(), 'goodbye')
+
+ self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
+
+class MockTransport(transport.Transport):
+ """A fake transport for testing with."""
+
+ def has(self, filename):
+ return False
+
+ def __init__(self, url=None):
+ if url is None:
+ url = "http://example.com"
+ super(MockTransport, self).__init__(url)
+
+ def mkdir(self, filename):
+ return
+
+
+class InstrumentedTransportStore(TransportStore):
+ """An instrumented TransportStore.
+
+ Here we replace template method worker methods with calls that record the
+ expected results.
+ """
+
+ def _add(self, filename, file):
+ self._calls.append(("_add", filename, file))
+
+ def __init__(self, transport, prefixed=False):
+ super(InstrumentedTransportStore, self).__init__(transport, prefixed)
+ self._calls = []
+
+
+class TestInstrumentedTransportStore(TestCase):
+
+ def test__add_records(self):
+ my_store = InstrumentedTransportStore(MockTransport())
+ my_store._add("filename", "file")
+ self.assertEqual([("_add", "filename", "file")], my_store._calls)
+
+
+class TestMockTransport(TestCase):
+
+ def test_isinstance(self):
+ self.assertIsInstance(MockTransport(), transport.Transport)
+
+ def test_has(self):
+ self.assertEqual(False, MockTransport().has('foo'))
+
+ def test_mkdir(self):
+ MockTransport().mkdir('45')
+
+
+class TestTransportStore(TestCase):
+
+ def test__relpath_invalid(self):
+ my_store = TransportStore(MockTransport())
+ self.assertRaises(ValueError, my_store._relpath, '/foo')
+ self.assertRaises(ValueError, my_store._relpath, 'foo/')
+
+ def test_register_invalid_suffixes(self):
+ my_store = TransportStore(MockTransport())
+ self.assertRaises(ValueError, my_store.register_suffix, '/')
+ self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
+
+ def test__relpath_unregister_suffixes(self):
+ my_store = TransportStore(MockTransport())
+ self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
+ self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
+
+ def test__relpath_simple(self):
+ my_store = TransportStore(MockTransport())
+ self.assertEqual("foo", my_store._relpath('foo'))
+
+ def test__relpath_prefixed(self):
+ my_store = TransportStore(MockTransport(), True)
+ self.assertEqual('45/foo', my_store._relpath('foo'))
+
+ def test__relpath_simple_suffixed(self):
+ my_store = TransportStore(MockTransport())
+ my_store.register_suffix('bar')
+ my_store.register_suffix('baz')
+ self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
+ self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
+
+ def test__relpath_prefixed_suffixed(self):
+ my_store = TransportStore(MockTransport(), True)
+ my_store.register_suffix('bar')
+ my_store.register_suffix('baz')
+ self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
+ self.assertEqual('45/foo.bar.baz',
+ my_store._relpath('foo', ['bar', 'baz']))
+
+ def test_add_simple(self):
+ stream = StringIO("content")
+ my_store = InstrumentedTransportStore(MockTransport())
+ my_store.add(stream, "foo")
+ self.assertEqual([("_add", "foo", stream)], my_store._calls)
+
+ def test_add_prefixed(self):
+ stream = StringIO("content")
+ my_store = InstrumentedTransportStore(MockTransport(), True)
+ my_store.add(stream, "foo")
+ self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
+
+ def test_add_simple_suffixed(self):
+ stream = StringIO("content")
+ my_store = InstrumentedTransportStore(MockTransport())
+ my_store.register_suffix('dsc')
+ my_store.add(stream, "foo", 'dsc')
+ self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
+
+ def test_add_simple_suffixed(self):
+ stream = StringIO("content")
+ my_store = InstrumentedTransportStore(MockTransport(), True)
+ my_store.register_suffix('dsc')
+ my_store.add(stream, "foo", 'dsc')
+ self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
+
+ def get_populated_store(self, prefixed=False,
+ store_class=TextStore, compressed=False):
+ my_store = store_class(MemoryTransport(), prefixed,
+ compressed=compressed)
+ my_store.register_suffix('sig')
+ stream = StringIO("signature")
+ my_store.add(stream, "foo", 'sig')
+ stream = StringIO("content")
+ my_store.add(stream, "foo")
+ stream = StringIO("signature for missing base")
+ my_store.add(stream, "missing", 'sig')
+ return my_store
+
+ def test_has_simple(self):
+ my_store = self.get_populated_store()
+ self.assertEqual(True, my_store.has_id('foo'))
+ my_store = self.get_populated_store(True)
+ self.assertEqual(True, my_store.has_id('foo'))
+
+ def test_has_suffixed(self):
+ my_store = self.get_populated_store()
+ self.assertEqual(True, my_store.has_id('foo', 'sig'))
+ my_store = self.get_populated_store(True)
+ self.assertEqual(True, my_store.has_id('foo', 'sig'))
+
+ def test_has_suffixed_no_base(self):
+ my_store = self.get_populated_store()
+ self.assertEqual(False, my_store.has_id('missing'))
+ my_store = self.get_populated_store(True)
+ self.assertEqual(False, my_store.has_id('missing'))
+
+ def test_get_simple(self):
+ my_store = self.get_populated_store()
+ self.assertEqual('content', my_store.get('foo').read())
+ my_store = self.get_populated_store(True)
+ self.assertEqual('content', my_store.get('foo').read())
+
+ def test_get_suffixed(self):
+ my_store = self.get_populated_store()
+ self.assertEqual('signature', my_store.get('foo', 'sig').read())
+ my_store = self.get_populated_store(True)
+ self.assertEqual('signature', my_store.get('foo', 'sig').read())
+
+ def test_get_suffixed_no_base(self):
+ my_store = self.get_populated_store()
+ self.assertEqual('signature for missing base',
+ my_store.get('missing', 'sig').read())
+ my_store = self.get_populated_store(True)
+ self.assertEqual('signature for missing base',
+ my_store.get('missing', 'sig').read())
+
+ def test___iter__no_suffix(self):
+ my_store = TextStore(MemoryTransport(),
+ prefixed=False, compressed=False)
+ stream = StringIO("content")
+ my_store.add(stream, "foo")
+ self.assertEqual(set(['foo']),
+ set(my_store.__iter__()))
+
+ def test___iter__(self):
+ self.assertEqual(set(['foo']),
+ set(self.get_populated_store().__iter__()))
+ self.assertEqual(set(['foo']),
+ set(self.get_populated_store(True).__iter__()))
+
+ def test___iter__compressed(self):
+ self.assertEqual(set(['foo']),
+ set(self.get_populated_store(
+ compressed=True).__iter__()))
+ self.assertEqual(set(['foo']),
+ set(self.get_populated_store(
+ True, compressed=True).__iter__()))
+
+ def test___len__(self):
+ self.assertEqual(1, len(self.get_populated_store()))
+
+ def test_copy_suffixes(self):
+ from_store = self.get_populated_store()
+ to_store = TextStore(MemoryTransport(),
+ prefixed=True, compressed=True)
+ to_store.register_suffix('sig')
+ to_store.copy_all_ids(from_store)
+ self.assertEqual(1, len(to_store))
+ self.assertEqual(set(['foo']), set(to_store.__iter__()))
+ self.assertEqual('content', to_store.get('foo').read())
+ self.assertEqual('signature', to_store.get('foo', 'sig').read())
+ self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
+
+ def test_relpath_escaped(self):
+ my_store = TransportStore(MemoryTransport())
+ self.assertEqual('%25', my_store._relpath('%'))
+
+ def test_escaped_uppercase(self):
+ """Uppercase letters are escaped for safety on Windows"""
+ my_store = TransportStore(MemoryTransport(), prefixed=True,
+ escaped=True)
+ # a particularly perverse file-id! :-)
+ self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
+
+
+class TestVersionFileStore(TestCaseWithTransport):
+
+ def get_scope(self):
+ return self._transaction
+
+ def setUp(self):
+ super(TestVersionFileStore, self).setUp()
+ self.vfstore = VersionedFileStore(MemoryTransport(),
+ versionedfile_class=WeaveFile)
+ self.vfstore.get_scope = self.get_scope
+ self._transaction = None
+
+ def test_get_weave_registers_dirty_in_write(self):
+ self._transaction = transactions.WriteTransaction()
+ vf = self.vfstore.get_weave_or_empty('id', self._transaction)
+ self._transaction.finish()
+ self._transaction = None
+ self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
+ self._transaction = transactions.WriteTransaction()
+ vf = self.vfstore.get_weave('id', self._transaction)
+ self._transaction.finish()
+ self._transaction = None
+ self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
+
+ def test_get_weave_readonly_cant_write(self):
+ self._transaction = transactions.WriteTransaction()
+ vf = self.vfstore.get_weave_or_empty('id', self._transaction)
+ self._transaction.finish()
+ self._transaction = transactions.ReadOnlyTransaction()
+ vf = self.vfstore.get_weave_or_empty('id', self._transaction)
+ self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
+
+ def test___iter__escaped(self):
+ self.vfstore = VersionedFileStore(MemoryTransport(),
+ prefixed=True, escaped=True, versionedfile_class=WeaveFile)
+ self.vfstore.get_scope = self.get_scope
+ self._transaction = transactions.WriteTransaction()
+ vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
+ vf.add_lines('a', [], [])
+ del vf
+ self._transaction.finish()
+ self.assertEqual([' '], list(self.vfstore))